##// END OF EJS Templates
bundle2: seek part back during iteration...
Durham Goode -
r33889:891118dc default
parent child Browse files
Show More
@@ -1,1895 +1,1899
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 phases,
161 phases,
162 pushkey,
162 pushkey,
163 pycompat,
163 pycompat,
164 tags,
164 tags,
165 url,
165 url,
166 util,
166 util,
167 )
167 )
168
168
169 urlerr = util.urlerr
169 urlerr = util.urlerr
170 urlreq = util.urlreq
170 urlreq = util.urlreq
171
171
172 _pack = struct.pack
172 _pack = struct.pack
173 _unpack = struct.unpack
173 _unpack = struct.unpack
174
174
175 _fstreamparamsize = '>i'
175 _fstreamparamsize = '>i'
176 _fpartheadersize = '>i'
176 _fpartheadersize = '>i'
177 _fparttypesize = '>B'
177 _fparttypesize = '>B'
178 _fpartid = '>I'
178 _fpartid = '>I'
179 _fpayloadsize = '>i'
179 _fpayloadsize = '>i'
180 _fpartparamcount = '>BB'
180 _fpartparamcount = '>BB'
181
181
182 _fphasesentry = '>i20s'
182 _fphasesentry = '>i20s'
183
183
184 preferedchunksize = 4096
184 preferedchunksize = 4096
185
185
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187
187
188 def outdebug(ui, message):
188 def outdebug(ui, message):
189 """debug regarding output stream (bundling)"""
189 """debug regarding output stream (bundling)"""
190 if ui.configbool('devel', 'bundle2.debug'):
190 if ui.configbool('devel', 'bundle2.debug'):
191 ui.debug('bundle2-output: %s\n' % message)
191 ui.debug('bundle2-output: %s\n' % message)
192
192
193 def indebug(ui, message):
193 def indebug(ui, message):
194 """debug on input stream (unbundling)"""
194 """debug on input stream (unbundling)"""
195 if ui.configbool('devel', 'bundle2.debug'):
195 if ui.configbool('devel', 'bundle2.debug'):
196 ui.debug('bundle2-input: %s\n' % message)
196 ui.debug('bundle2-input: %s\n' % message)
197
197
198 def validateparttype(parttype):
198 def validateparttype(parttype):
199 """raise ValueError if a parttype contains invalid character"""
199 """raise ValueError if a parttype contains invalid character"""
200 if _parttypeforbidden.search(parttype):
200 if _parttypeforbidden.search(parttype):
201 raise ValueError(parttype)
201 raise ValueError(parttype)
202
202
203 def _makefpartparamsizes(nbparams):
203 def _makefpartparamsizes(nbparams):
204 """return a struct format to read part parameter sizes
204 """return a struct format to read part parameter sizes
205
205
206 The number parameters is variable so we need to build that format
206 The number parameters is variable so we need to build that format
207 dynamically.
207 dynamically.
208 """
208 """
209 return '>'+('BB'*nbparams)
209 return '>'+('BB'*nbparams)
210
210
211 parthandlermapping = {}
211 parthandlermapping = {}
212
212
213 def parthandler(parttype, params=()):
213 def parthandler(parttype, params=()):
214 """decorator that register a function as a bundle2 part handler
214 """decorator that register a function as a bundle2 part handler
215
215
216 eg::
216 eg::
217
217
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 def myparttypehandler(...):
219 def myparttypehandler(...):
220 '''process a part of type "my part".'''
220 '''process a part of type "my part".'''
221 ...
221 ...
222 """
222 """
223 validateparttype(parttype)
223 validateparttype(parttype)
224 def _decorator(func):
224 def _decorator(func):
225 lparttype = parttype.lower() # enforce lower case matching.
225 lparttype = parttype.lower() # enforce lower case matching.
226 assert lparttype not in parthandlermapping
226 assert lparttype not in parthandlermapping
227 parthandlermapping[lparttype] = func
227 parthandlermapping[lparttype] = func
228 func.params = frozenset(params)
228 func.params = frozenset(params)
229 return func
229 return func
230 return _decorator
230 return _decorator
231
231
232 class unbundlerecords(object):
232 class unbundlerecords(object):
233 """keep record of what happens during and unbundle
233 """keep record of what happens during and unbundle
234
234
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 category of record and obj is an arbitrary object.
236 category of record and obj is an arbitrary object.
237
237
238 `records['cat']` will return all entries of this category 'cat'.
238 `records['cat']` will return all entries of this category 'cat'.
239
239
240 Iterating on the object itself will yield `('category', obj)` tuples
240 Iterating on the object itself will yield `('category', obj)` tuples
241 for all entries.
241 for all entries.
242
242
243 All iterations happens in chronological order.
243 All iterations happens in chronological order.
244 """
244 """
245
245
246 def __init__(self):
246 def __init__(self):
247 self._categories = {}
247 self._categories = {}
248 self._sequences = []
248 self._sequences = []
249 self._replies = {}
249 self._replies = {}
250
250
251 def add(self, category, entry, inreplyto=None):
251 def add(self, category, entry, inreplyto=None):
252 """add a new record of a given category.
252 """add a new record of a given category.
253
253
254 The entry can then be retrieved in the list returned by
254 The entry can then be retrieved in the list returned by
255 self['category']."""
255 self['category']."""
256 self._categories.setdefault(category, []).append(entry)
256 self._categories.setdefault(category, []).append(entry)
257 self._sequences.append((category, entry))
257 self._sequences.append((category, entry))
258 if inreplyto is not None:
258 if inreplyto is not None:
259 self.getreplies(inreplyto).add(category, entry)
259 self.getreplies(inreplyto).add(category, entry)
260
260
261 def getreplies(self, partid):
261 def getreplies(self, partid):
262 """get the records that are replies to a specific part"""
262 """get the records that are replies to a specific part"""
263 return self._replies.setdefault(partid, unbundlerecords())
263 return self._replies.setdefault(partid, unbundlerecords())
264
264
265 def __getitem__(self, cat):
265 def __getitem__(self, cat):
266 return tuple(self._categories.get(cat, ()))
266 return tuple(self._categories.get(cat, ()))
267
267
268 def __iter__(self):
268 def __iter__(self):
269 return iter(self._sequences)
269 return iter(self._sequences)
270
270
271 def __len__(self):
271 def __len__(self):
272 return len(self._sequences)
272 return len(self._sequences)
273
273
274 def __nonzero__(self):
274 def __nonzero__(self):
275 return bool(self._sequences)
275 return bool(self._sequences)
276
276
277 __bool__ = __nonzero__
277 __bool__ = __nonzero__
278
278
279 class bundleoperation(object):
279 class bundleoperation(object):
280 """an object that represents a single bundling process
280 """an object that represents a single bundling process
281
281
282 Its purpose is to carry unbundle-related objects and states.
282 Its purpose is to carry unbundle-related objects and states.
283
283
284 A new object should be created at the beginning of each bundle processing.
284 A new object should be created at the beginning of each bundle processing.
285 The object is to be returned by the processing function.
285 The object is to be returned by the processing function.
286
286
287 The object has very little content now it will ultimately contain:
287 The object has very little content now it will ultimately contain:
288 * an access to the repo the bundle is applied to,
288 * an access to the repo the bundle is applied to,
289 * a ui object,
289 * a ui object,
290 * a way to retrieve a transaction to add changes to the repo,
290 * a way to retrieve a transaction to add changes to the repo,
291 * a way to record the result of processing each part,
291 * a way to record the result of processing each part,
292 * a way to construct a bundle response when applicable.
292 * a way to construct a bundle response when applicable.
293 """
293 """
294
294
295 def __init__(self, repo, transactiongetter, captureoutput=True):
295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 self.repo = repo
296 self.repo = repo
297 self.ui = repo.ui
297 self.ui = repo.ui
298 self.records = unbundlerecords()
298 self.records = unbundlerecords()
299 self.reply = None
299 self.reply = None
300 self.captureoutput = captureoutput
300 self.captureoutput = captureoutput
301 self.hookargs = {}
301 self.hookargs = {}
302 self._gettransaction = transactiongetter
302 self._gettransaction = transactiongetter
303
303
304 def gettransaction(self):
304 def gettransaction(self):
305 transaction = self._gettransaction()
305 transaction = self._gettransaction()
306
306
307 if self.hookargs:
307 if self.hookargs:
308 # the ones added to the transaction supercede those added
308 # the ones added to the transaction supercede those added
309 # to the operation.
309 # to the operation.
310 self.hookargs.update(transaction.hookargs)
310 self.hookargs.update(transaction.hookargs)
311 transaction.hookargs = self.hookargs
311 transaction.hookargs = self.hookargs
312
312
313 # mark the hookargs as flushed. further attempts to add to
313 # mark the hookargs as flushed. further attempts to add to
314 # hookargs will result in an abort.
314 # hookargs will result in an abort.
315 self.hookargs = None
315 self.hookargs = None
316
316
317 return transaction
317 return transaction
318
318
319 def addhookargs(self, hookargs):
319 def addhookargs(self, hookargs):
320 if self.hookargs is None:
320 if self.hookargs is None:
321 raise error.ProgrammingError('attempted to add hookargs to '
321 raise error.ProgrammingError('attempted to add hookargs to '
322 'operation after transaction started')
322 'operation after transaction started')
323 self.hookargs.update(hookargs)
323 self.hookargs.update(hookargs)
324
324
325 class TransactionUnavailable(RuntimeError):
325 class TransactionUnavailable(RuntimeError):
326 pass
326 pass
327
327
328 def _notransaction():
328 def _notransaction():
329 """default method to get a transaction while processing a bundle
329 """default method to get a transaction while processing a bundle
330
330
331 Raise an exception to highlight the fact that no transaction was expected
331 Raise an exception to highlight the fact that no transaction was expected
332 to be created"""
332 to be created"""
333 raise TransactionUnavailable()
333 raise TransactionUnavailable()
334
334
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 # transform me into unbundler.apply() as soon as the freeze is lifted
336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 if isinstance(unbundler, unbundle20):
337 if isinstance(unbundler, unbundle20):
338 tr.hookargs['bundle2'] = '1'
338 tr.hookargs['bundle2'] = '1'
339 if source is not None and 'source' not in tr.hookargs:
339 if source is not None and 'source' not in tr.hookargs:
340 tr.hookargs['source'] = source
340 tr.hookargs['source'] = source
341 if url is not None and 'url' not in tr.hookargs:
341 if url is not None and 'url' not in tr.hookargs:
342 tr.hookargs['url'] = url
342 tr.hookargs['url'] = url
343 return processbundle(repo, unbundler, lambda: tr)
343 return processbundle(repo, unbundler, lambda: tr)
344 else:
344 else:
345 # the transactiongetter won't be used, but we might as well set it
345 # the transactiongetter won't be used, but we might as well set it
346 op = bundleoperation(repo, lambda: tr)
346 op = bundleoperation(repo, lambda: tr)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 return op
348 return op
349
349
350 def processbundle(repo, unbundler, transactiongetter=None, op=None):
350 def processbundle(repo, unbundler, transactiongetter=None, op=None):
351 """This function process a bundle, apply effect to/from a repo
351 """This function process a bundle, apply effect to/from a repo
352
352
353 It iterates over each part then searches for and uses the proper handling
353 It iterates over each part then searches for and uses the proper handling
354 code to process the part. Parts are processed in order.
354 code to process the part. Parts are processed in order.
355
355
356 Unknown Mandatory part will abort the process.
356 Unknown Mandatory part will abort the process.
357
357
358 It is temporarily possible to provide a prebuilt bundleoperation to the
358 It is temporarily possible to provide a prebuilt bundleoperation to the
359 function. This is used to ensure output is properly propagated in case of
359 function. This is used to ensure output is properly propagated in case of
360 an error during the unbundling. This output capturing part will likely be
360 an error during the unbundling. This output capturing part will likely be
361 reworked and this ability will probably go away in the process.
361 reworked and this ability will probably go away in the process.
362 """
362 """
363 if op is None:
363 if op is None:
364 if transactiongetter is None:
364 if transactiongetter is None:
365 transactiongetter = _notransaction
365 transactiongetter = _notransaction
366 op = bundleoperation(repo, transactiongetter)
366 op = bundleoperation(repo, transactiongetter)
367 # todo:
367 # todo:
368 # - replace this is a init function soon.
368 # - replace this is a init function soon.
369 # - exception catching
369 # - exception catching
370 unbundler.params
370 unbundler.params
371 if repo.ui.debugflag:
371 if repo.ui.debugflag:
372 msg = ['bundle2-input-bundle:']
372 msg = ['bundle2-input-bundle:']
373 if unbundler.params:
373 if unbundler.params:
374 msg.append(' %i params' % len(unbundler.params))
374 msg.append(' %i params' % len(unbundler.params))
375 if op._gettransaction is None or op._gettransaction is _notransaction:
375 if op._gettransaction is None or op._gettransaction is _notransaction:
376 msg.append(' no-transaction')
376 msg.append(' no-transaction')
377 else:
377 else:
378 msg.append(' with-transaction')
378 msg.append(' with-transaction')
379 msg.append('\n')
379 msg.append('\n')
380 repo.ui.debug(''.join(msg))
380 repo.ui.debug(''.join(msg))
381 iterparts = enumerate(unbundler.iterparts())
381 iterparts = enumerate(unbundler.iterparts())
382 part = None
382 part = None
383 nbpart = 0
383 nbpart = 0
384 try:
384 try:
385 for nbpart, part in iterparts:
385 for nbpart, part in iterparts:
386 _processpart(op, part)
386 _processpart(op, part)
387 except Exception as exc:
387 except Exception as exc:
388 # Any exceptions seeking to the end of the bundle at this point are
388 # Any exceptions seeking to the end of the bundle at this point are
389 # almost certainly related to the underlying stream being bad.
389 # almost certainly related to the underlying stream being bad.
390 # And, chances are that the exception we're handling is related to
390 # And, chances are that the exception we're handling is related to
391 # getting in that bad state. So, we swallow the seeking error and
391 # getting in that bad state. So, we swallow the seeking error and
392 # re-raise the original error.
392 # re-raise the original error.
393 seekerror = False
393 seekerror = False
394 try:
394 try:
395 for nbpart, part in iterparts:
395 for nbpart, part in iterparts:
396 # consume the bundle content
396 # consume the bundle content
397 part.seek(0, 2)
397 part.seek(0, 2)
398 except Exception:
398 except Exception:
399 seekerror = True
399 seekerror = True
400
400
401 # Small hack to let caller code distinguish exceptions from bundle2
401 # Small hack to let caller code distinguish exceptions from bundle2
402 # processing from processing the old format. This is mostly
402 # processing from processing the old format. This is mostly
403 # needed to handle different return codes to unbundle according to the
403 # needed to handle different return codes to unbundle according to the
404 # type of bundle. We should probably clean up or drop this return code
404 # type of bundle. We should probably clean up or drop this return code
405 # craziness in a future version.
405 # craziness in a future version.
406 exc.duringunbundle2 = True
406 exc.duringunbundle2 = True
407 salvaged = []
407 salvaged = []
408 replycaps = None
408 replycaps = None
409 if op.reply is not None:
409 if op.reply is not None:
410 salvaged = op.reply.salvageoutput()
410 salvaged = op.reply.salvageoutput()
411 replycaps = op.reply.capabilities
411 replycaps = op.reply.capabilities
412 exc._replycaps = replycaps
412 exc._replycaps = replycaps
413 exc._bundle2salvagedoutput = salvaged
413 exc._bundle2salvagedoutput = salvaged
414
414
415 # Re-raising from a variable loses the original stack. So only use
415 # Re-raising from a variable loses the original stack. So only use
416 # that form if we need to.
416 # that form if we need to.
417 if seekerror:
417 if seekerror:
418 raise exc
418 raise exc
419 else:
419 else:
420 raise
420 raise
421 finally:
421 finally:
422 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
422 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
423
423
424 return op
424 return op
425
425
426 def _processchangegroup(op, cg, tr, source, url, **kwargs):
426 def _processchangegroup(op, cg, tr, source, url, **kwargs):
427 ret = cg.apply(op.repo, tr, source, url, **kwargs)
427 ret = cg.apply(op.repo, tr, source, url, **kwargs)
428 op.records.add('changegroup', {
428 op.records.add('changegroup', {
429 'return': ret,
429 'return': ret,
430 })
430 })
431 return ret
431 return ret
432
432
433 def _processpart(op, part):
433 def _processpart(op, part):
434 """process a single part from a bundle
434 """process a single part from a bundle
435
435
436 The part is guaranteed to have been fully consumed when the function exits
436 The part is guaranteed to have been fully consumed when the function exits
437 (even if an exception is raised)."""
437 (even if an exception is raised)."""
438 status = 'unknown' # used by debug output
438 status = 'unknown' # used by debug output
439 hardabort = False
439 hardabort = False
440 try:
440 try:
441 try:
441 try:
442 handler = parthandlermapping.get(part.type)
442 handler = parthandlermapping.get(part.type)
443 if handler is None:
443 if handler is None:
444 status = 'unsupported-type'
444 status = 'unsupported-type'
445 raise error.BundleUnknownFeatureError(parttype=part.type)
445 raise error.BundleUnknownFeatureError(parttype=part.type)
446 indebug(op.ui, 'found a handler for part %r' % part.type)
446 indebug(op.ui, 'found a handler for part %r' % part.type)
447 unknownparams = part.mandatorykeys - handler.params
447 unknownparams = part.mandatorykeys - handler.params
448 if unknownparams:
448 if unknownparams:
449 unknownparams = list(unknownparams)
449 unknownparams = list(unknownparams)
450 unknownparams.sort()
450 unknownparams.sort()
451 status = 'unsupported-params (%s)' % unknownparams
451 status = 'unsupported-params (%s)' % unknownparams
452 raise error.BundleUnknownFeatureError(parttype=part.type,
452 raise error.BundleUnknownFeatureError(parttype=part.type,
453 params=unknownparams)
453 params=unknownparams)
454 status = 'supported'
454 status = 'supported'
455 except error.BundleUnknownFeatureError as exc:
455 except error.BundleUnknownFeatureError as exc:
456 if part.mandatory: # mandatory parts
456 if part.mandatory: # mandatory parts
457 raise
457 raise
458 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
458 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
459 return # skip to part processing
459 return # skip to part processing
460 finally:
460 finally:
461 if op.ui.debugflag:
461 if op.ui.debugflag:
462 msg = ['bundle2-input-part: "%s"' % part.type]
462 msg = ['bundle2-input-part: "%s"' % part.type]
463 if not part.mandatory:
463 if not part.mandatory:
464 msg.append(' (advisory)')
464 msg.append(' (advisory)')
465 nbmp = len(part.mandatorykeys)
465 nbmp = len(part.mandatorykeys)
466 nbap = len(part.params) - nbmp
466 nbap = len(part.params) - nbmp
467 if nbmp or nbap:
467 if nbmp or nbap:
468 msg.append(' (params:')
468 msg.append(' (params:')
469 if nbmp:
469 if nbmp:
470 msg.append(' %i mandatory' % nbmp)
470 msg.append(' %i mandatory' % nbmp)
471 if nbap:
471 if nbap:
472 msg.append(' %i advisory' % nbmp)
472 msg.append(' %i advisory' % nbmp)
473 msg.append(')')
473 msg.append(')')
474 msg.append(' %s\n' % status)
474 msg.append(' %s\n' % status)
475 op.ui.debug(''.join(msg))
475 op.ui.debug(''.join(msg))
476
476
477 # handler is called outside the above try block so that we don't
477 # handler is called outside the above try block so that we don't
478 # risk catching KeyErrors from anything other than the
478 # risk catching KeyErrors from anything other than the
479 # parthandlermapping lookup (any KeyError raised by handler()
479 # parthandlermapping lookup (any KeyError raised by handler()
480 # itself represents a defect of a different variety).
480 # itself represents a defect of a different variety).
481 output = None
481 output = None
482 if op.captureoutput and op.reply is not None:
482 if op.captureoutput and op.reply is not None:
483 op.ui.pushbuffer(error=True, subproc=True)
483 op.ui.pushbuffer(error=True, subproc=True)
484 output = ''
484 output = ''
485 try:
485 try:
486 handler(op, part)
486 handler(op, part)
487 finally:
487 finally:
488 if output is not None:
488 if output is not None:
489 output = op.ui.popbuffer()
489 output = op.ui.popbuffer()
490 if output:
490 if output:
491 outpart = op.reply.newpart('output', data=output,
491 outpart = op.reply.newpart('output', data=output,
492 mandatory=False)
492 mandatory=False)
493 outpart.addparam(
493 outpart.addparam(
494 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
494 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
495 # If exiting or interrupted, do not attempt to seek the stream in the
495 # If exiting or interrupted, do not attempt to seek the stream in the
496 # finally block below. This makes abort faster.
496 # finally block below. This makes abort faster.
497 except (SystemExit, KeyboardInterrupt):
497 except (SystemExit, KeyboardInterrupt):
498 hardabort = True
498 hardabort = True
499 raise
499 raise
500 finally:
500 finally:
501 # consume the part content to not corrupt the stream.
501 # consume the part content to not corrupt the stream.
502 if not hardabort:
502 if not hardabort:
503 part.seek(0, 2)
503 part.seek(0, 2)
504
504
505
505
506 def decodecaps(blob):
506 def decodecaps(blob):
507 """decode a bundle2 caps bytes blob into a dictionary
507 """decode a bundle2 caps bytes blob into a dictionary
508
508
509 The blob is a list of capabilities (one per line)
509 The blob is a list of capabilities (one per line)
510 Capabilities may have values using a line of the form::
510 Capabilities may have values using a line of the form::
511
511
512 capability=value1,value2,value3
512 capability=value1,value2,value3
513
513
514 The values are always a list."""
514 The values are always a list."""
515 caps = {}
515 caps = {}
516 for line in blob.splitlines():
516 for line in blob.splitlines():
517 if not line:
517 if not line:
518 continue
518 continue
519 if '=' not in line:
519 if '=' not in line:
520 key, vals = line, ()
520 key, vals = line, ()
521 else:
521 else:
522 key, vals = line.split('=', 1)
522 key, vals = line.split('=', 1)
523 vals = vals.split(',')
523 vals = vals.split(',')
524 key = urlreq.unquote(key)
524 key = urlreq.unquote(key)
525 vals = [urlreq.unquote(v) for v in vals]
525 vals = [urlreq.unquote(v) for v in vals]
526 caps[key] = vals
526 caps[key] = vals
527 return caps
527 return caps
528
528
529 def encodecaps(caps):
529 def encodecaps(caps):
530 """encode a bundle2 caps dictionary into a bytes blob"""
530 """encode a bundle2 caps dictionary into a bytes blob"""
531 chunks = []
531 chunks = []
532 for ca in sorted(caps):
532 for ca in sorted(caps):
533 vals = caps[ca]
533 vals = caps[ca]
534 ca = urlreq.quote(ca)
534 ca = urlreq.quote(ca)
535 vals = [urlreq.quote(v) for v in vals]
535 vals = [urlreq.quote(v) for v in vals]
536 if vals:
536 if vals:
537 ca = "%s=%s" % (ca, ','.join(vals))
537 ca = "%s=%s" % (ca, ','.join(vals))
538 chunks.append(ca)
538 chunks.append(ca)
539 return '\n'.join(chunks)
539 return '\n'.join(chunks)
540
540
541 bundletypes = {
541 bundletypes = {
542 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
542 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
543 # since the unification ssh accepts a header but there
543 # since the unification ssh accepts a header but there
544 # is no capability signaling it.
544 # is no capability signaling it.
545 "HG20": (), # special-cased below
545 "HG20": (), # special-cased below
546 "HG10UN": ("HG10UN", 'UN'),
546 "HG10UN": ("HG10UN", 'UN'),
547 "HG10BZ": ("HG10", 'BZ'),
547 "HG10BZ": ("HG10", 'BZ'),
548 "HG10GZ": ("HG10GZ", 'GZ'),
548 "HG10GZ": ("HG10GZ", 'GZ'),
549 }
549 }
550
550
551 # hgweb uses this list to communicate its preferred type
551 # hgweb uses this list to communicate its preferred type
552 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
552 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
553
553
554 class bundle20(object):
554 class bundle20(object):
555 """represent an outgoing bundle2 container
555 """represent an outgoing bundle2 container
556
556
557 Use the `addparam` method to add stream level parameter. and `newpart` to
557 Use the `addparam` method to add stream level parameter. and `newpart` to
558 populate it. Then call `getchunks` to retrieve all the binary chunks of
558 populate it. Then call `getchunks` to retrieve all the binary chunks of
559 data that compose the bundle2 container."""
559 data that compose the bundle2 container."""
560
560
561 _magicstring = 'HG20'
561 _magicstring = 'HG20'
562
562
563 def __init__(self, ui, capabilities=()):
563 def __init__(self, ui, capabilities=()):
564 self.ui = ui
564 self.ui = ui
565 self._params = []
565 self._params = []
566 self._parts = []
566 self._parts = []
567 self.capabilities = dict(capabilities)
567 self.capabilities = dict(capabilities)
568 self._compengine = util.compengines.forbundletype('UN')
568 self._compengine = util.compengines.forbundletype('UN')
569 self._compopts = None
569 self._compopts = None
570
570
571 def setcompression(self, alg, compopts=None):
571 def setcompression(self, alg, compopts=None):
572 """setup core part compression to <alg>"""
572 """setup core part compression to <alg>"""
573 if alg in (None, 'UN'):
573 if alg in (None, 'UN'):
574 return
574 return
575 assert not any(n.lower() == 'compression' for n, v in self._params)
575 assert not any(n.lower() == 'compression' for n, v in self._params)
576 self.addparam('Compression', alg)
576 self.addparam('Compression', alg)
577 self._compengine = util.compengines.forbundletype(alg)
577 self._compengine = util.compengines.forbundletype(alg)
578 self._compopts = compopts
578 self._compopts = compopts
579
579
580 @property
580 @property
581 def nbparts(self):
581 def nbparts(self):
582 """total number of parts added to the bundler"""
582 """total number of parts added to the bundler"""
583 return len(self._parts)
583 return len(self._parts)
584
584
585 # methods used to defines the bundle2 content
585 # methods used to defines the bundle2 content
586 def addparam(self, name, value=None):
586 def addparam(self, name, value=None):
587 """add a stream level parameter"""
587 """add a stream level parameter"""
588 if not name:
588 if not name:
589 raise ValueError('empty parameter name')
589 raise ValueError('empty parameter name')
590 if name[0] not in pycompat.bytestr(string.ascii_letters):
590 if name[0] not in pycompat.bytestr(string.ascii_letters):
591 raise ValueError('non letter first character: %r' % name)
591 raise ValueError('non letter first character: %r' % name)
592 self._params.append((name, value))
592 self._params.append((name, value))
593
593
594 def addpart(self, part):
594 def addpart(self, part):
595 """add a new part to the bundle2 container
595 """add a new part to the bundle2 container
596
596
597 Parts contains the actual applicative payload."""
597 Parts contains the actual applicative payload."""
598 assert part.id is None
598 assert part.id is None
599 part.id = len(self._parts) # very cheap counter
599 part.id = len(self._parts) # very cheap counter
600 self._parts.append(part)
600 self._parts.append(part)
601
601
602 def newpart(self, typeid, *args, **kwargs):
602 def newpart(self, typeid, *args, **kwargs):
603 """create a new part and add it to the containers
603 """create a new part and add it to the containers
604
604
605 As the part is directly added to the containers. For now, this means
605 As the part is directly added to the containers. For now, this means
606 that any failure to properly initialize the part after calling
606 that any failure to properly initialize the part after calling
607 ``newpart`` should result in a failure of the whole bundling process.
607 ``newpart`` should result in a failure of the whole bundling process.
608
608
609 You can still fall back to manually create and add if you need better
609 You can still fall back to manually create and add if you need better
610 control."""
610 control."""
611 part = bundlepart(typeid, *args, **kwargs)
611 part = bundlepart(typeid, *args, **kwargs)
612 self.addpart(part)
612 self.addpart(part)
613 return part
613 return part
614
614
615 # methods used to generate the bundle2 stream
615 # methods used to generate the bundle2 stream
616 def getchunks(self):
616 def getchunks(self):
617 if self.ui.debugflag:
617 if self.ui.debugflag:
618 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
618 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
619 if self._params:
619 if self._params:
620 msg.append(' (%i params)' % len(self._params))
620 msg.append(' (%i params)' % len(self._params))
621 msg.append(' %i parts total\n' % len(self._parts))
621 msg.append(' %i parts total\n' % len(self._parts))
622 self.ui.debug(''.join(msg))
622 self.ui.debug(''.join(msg))
623 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
623 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
624 yield self._magicstring
624 yield self._magicstring
625 param = self._paramchunk()
625 param = self._paramchunk()
626 outdebug(self.ui, 'bundle parameter: %s' % param)
626 outdebug(self.ui, 'bundle parameter: %s' % param)
627 yield _pack(_fstreamparamsize, len(param))
627 yield _pack(_fstreamparamsize, len(param))
628 if param:
628 if param:
629 yield param
629 yield param
630 for chunk in self._compengine.compressstream(self._getcorechunk(),
630 for chunk in self._compengine.compressstream(self._getcorechunk(),
631 self._compopts):
631 self._compopts):
632 yield chunk
632 yield chunk
633
633
634 def _paramchunk(self):
634 def _paramchunk(self):
635 """return a encoded version of all stream parameters"""
635 """return a encoded version of all stream parameters"""
636 blocks = []
636 blocks = []
637 for par, value in self._params:
637 for par, value in self._params:
638 par = urlreq.quote(par)
638 par = urlreq.quote(par)
639 if value is not None:
639 if value is not None:
640 value = urlreq.quote(value)
640 value = urlreq.quote(value)
641 par = '%s=%s' % (par, value)
641 par = '%s=%s' % (par, value)
642 blocks.append(par)
642 blocks.append(par)
643 return ' '.join(blocks)
643 return ' '.join(blocks)
644
644
645 def _getcorechunk(self):
645 def _getcorechunk(self):
646 """yield chunk for the core part of the bundle
646 """yield chunk for the core part of the bundle
647
647
648 (all but headers and parameters)"""
648 (all but headers and parameters)"""
649 outdebug(self.ui, 'start of parts')
649 outdebug(self.ui, 'start of parts')
650 for part in self._parts:
650 for part in self._parts:
651 outdebug(self.ui, 'bundle part: "%s"' % part.type)
651 outdebug(self.ui, 'bundle part: "%s"' % part.type)
652 for chunk in part.getchunks(ui=self.ui):
652 for chunk in part.getchunks(ui=self.ui):
653 yield chunk
653 yield chunk
654 outdebug(self.ui, 'end of bundle')
654 outdebug(self.ui, 'end of bundle')
655 yield _pack(_fpartheadersize, 0)
655 yield _pack(_fpartheadersize, 0)
656
656
657
657
658 def salvageoutput(self):
658 def salvageoutput(self):
659 """return a list with a copy of all output parts in the bundle
659 """return a list with a copy of all output parts in the bundle
660
660
661 This is meant to be used during error handling to make sure we preserve
661 This is meant to be used during error handling to make sure we preserve
662 server output"""
662 server output"""
663 salvaged = []
663 salvaged = []
664 for part in self._parts:
664 for part in self._parts:
665 if part.type.startswith('output'):
665 if part.type.startswith('output'):
666 salvaged.append(part.copy())
666 salvaged.append(part.copy())
667 return salvaged
667 return salvaged
668
668
669
669
670 class unpackermixin(object):
670 class unpackermixin(object):
671 """A mixin to extract bytes and struct data from a stream"""
671 """A mixin to extract bytes and struct data from a stream"""
672
672
673 def __init__(self, fp):
673 def __init__(self, fp):
674 self._fp = fp
674 self._fp = fp
675
675
676 def _unpack(self, format):
676 def _unpack(self, format):
677 """unpack this struct format from the stream
677 """unpack this struct format from the stream
678
678
679 This method is meant for internal usage by the bundle2 protocol only.
679 This method is meant for internal usage by the bundle2 protocol only.
680 They directly manipulate the low level stream including bundle2 level
680 They directly manipulate the low level stream including bundle2 level
681 instruction.
681 instruction.
682
682
683 Do not use it to implement higher-level logic or methods."""
683 Do not use it to implement higher-level logic or methods."""
684 data = self._readexact(struct.calcsize(format))
684 data = self._readexact(struct.calcsize(format))
685 return _unpack(format, data)
685 return _unpack(format, data)
686
686
687 def _readexact(self, size):
687 def _readexact(self, size):
688 """read exactly <size> bytes from the stream
688 """read exactly <size> bytes from the stream
689
689
690 This method is meant for internal usage by the bundle2 protocol only.
690 This method is meant for internal usage by the bundle2 protocol only.
691 They directly manipulate the low level stream including bundle2 level
691 They directly manipulate the low level stream including bundle2 level
692 instruction.
692 instruction.
693
693
694 Do not use it to implement higher-level logic or methods."""
694 Do not use it to implement higher-level logic or methods."""
695 return changegroup.readexactly(self._fp, size)
695 return changegroup.readexactly(self._fp, size)
696
696
697 def getunbundler(ui, fp, magicstring=None):
697 def getunbundler(ui, fp, magicstring=None):
698 """return a valid unbundler object for a given magicstring"""
698 """return a valid unbundler object for a given magicstring"""
699 if magicstring is None:
699 if magicstring is None:
700 magicstring = changegroup.readexactly(fp, 4)
700 magicstring = changegroup.readexactly(fp, 4)
701 magic, version = magicstring[0:2], magicstring[2:4]
701 magic, version = magicstring[0:2], magicstring[2:4]
702 if magic != 'HG':
702 if magic != 'HG':
703 ui.debug(
703 ui.debug(
704 "error: invalid magic: %r (version %r), should be 'HG'\n"
704 "error: invalid magic: %r (version %r), should be 'HG'\n"
705 % (magic, version))
705 % (magic, version))
706 raise error.Abort(_('not a Mercurial bundle'))
706 raise error.Abort(_('not a Mercurial bundle'))
707 unbundlerclass = formatmap.get(version)
707 unbundlerclass = formatmap.get(version)
708 if unbundlerclass is None:
708 if unbundlerclass is None:
709 raise error.Abort(_('unknown bundle version %s') % version)
709 raise error.Abort(_('unknown bundle version %s') % version)
710 unbundler = unbundlerclass(ui, fp)
710 unbundler = unbundlerclass(ui, fp)
711 indebug(ui, 'start processing of %s stream' % magicstring)
711 indebug(ui, 'start processing of %s stream' % magicstring)
712 return unbundler
712 return unbundler
713
713
714 class unbundle20(unpackermixin):
714 class unbundle20(unpackermixin):
715 """interpret a bundle2 stream
715 """interpret a bundle2 stream
716
716
717 This class is fed with a binary stream and yields parts through its
717 This class is fed with a binary stream and yields parts through its
718 `iterparts` methods."""
718 `iterparts` methods."""
719
719
720 _magicstring = 'HG20'
720 _magicstring = 'HG20'
721
721
722 def __init__(self, ui, fp):
722 def __init__(self, ui, fp):
723 """If header is specified, we do not read it out of the stream."""
723 """If header is specified, we do not read it out of the stream."""
724 self.ui = ui
724 self.ui = ui
725 self._compengine = util.compengines.forbundletype('UN')
725 self._compengine = util.compengines.forbundletype('UN')
726 self._compressed = None
726 self._compressed = None
727 super(unbundle20, self).__init__(fp)
727 super(unbundle20, self).__init__(fp)
728
728
729 @util.propertycache
729 @util.propertycache
730 def params(self):
730 def params(self):
731 """dictionary of stream level parameters"""
731 """dictionary of stream level parameters"""
732 indebug(self.ui, 'reading bundle2 stream parameters')
732 indebug(self.ui, 'reading bundle2 stream parameters')
733 params = {}
733 params = {}
734 paramssize = self._unpack(_fstreamparamsize)[0]
734 paramssize = self._unpack(_fstreamparamsize)[0]
735 if paramssize < 0:
735 if paramssize < 0:
736 raise error.BundleValueError('negative bundle param size: %i'
736 raise error.BundleValueError('negative bundle param size: %i'
737 % paramssize)
737 % paramssize)
738 if paramssize:
738 if paramssize:
739 params = self._readexact(paramssize)
739 params = self._readexact(paramssize)
740 params = self._processallparams(params)
740 params = self._processallparams(params)
741 return params
741 return params
742
742
743 def _processallparams(self, paramsblock):
743 def _processallparams(self, paramsblock):
744 """"""
744 """"""
745 params = util.sortdict()
745 params = util.sortdict()
746 for p in paramsblock.split(' '):
746 for p in paramsblock.split(' '):
747 p = p.split('=', 1)
747 p = p.split('=', 1)
748 p = [urlreq.unquote(i) for i in p]
748 p = [urlreq.unquote(i) for i in p]
749 if len(p) < 2:
749 if len(p) < 2:
750 p.append(None)
750 p.append(None)
751 self._processparam(*p)
751 self._processparam(*p)
752 params[p[0]] = p[1]
752 params[p[0]] = p[1]
753 return params
753 return params
754
754
755
755
756 def _processparam(self, name, value):
756 def _processparam(self, name, value):
757 """process a parameter, applying its effect if needed
757 """process a parameter, applying its effect if needed
758
758
759 Parameter starting with a lower case letter are advisory and will be
759 Parameter starting with a lower case letter are advisory and will be
760 ignored when unknown. Those starting with an upper case letter are
760 ignored when unknown. Those starting with an upper case letter are
761 mandatory and will this function will raise a KeyError when unknown.
761 mandatory and will this function will raise a KeyError when unknown.
762
762
763 Note: no option are currently supported. Any input will be either
763 Note: no option are currently supported. Any input will be either
764 ignored or failing.
764 ignored or failing.
765 """
765 """
766 if not name:
766 if not name:
767 raise ValueError('empty parameter name')
767 raise ValueError('empty parameter name')
768 if name[0] not in pycompat.bytestr(string.ascii_letters):
768 if name[0] not in pycompat.bytestr(string.ascii_letters):
769 raise ValueError('non letter first character: %r' % name)
769 raise ValueError('non letter first character: %r' % name)
770 try:
770 try:
771 handler = b2streamparamsmap[name.lower()]
771 handler = b2streamparamsmap[name.lower()]
772 except KeyError:
772 except KeyError:
773 if name[0].islower():
773 if name[0].islower():
774 indebug(self.ui, "ignoring unknown parameter %r" % name)
774 indebug(self.ui, "ignoring unknown parameter %r" % name)
775 else:
775 else:
776 raise error.BundleUnknownFeatureError(params=(name,))
776 raise error.BundleUnknownFeatureError(params=(name,))
777 else:
777 else:
778 handler(self, name, value)
778 handler(self, name, value)
779
779
780 def _forwardchunks(self):
780 def _forwardchunks(self):
781 """utility to transfer a bundle2 as binary
781 """utility to transfer a bundle2 as binary
782
782
783 This is made necessary by the fact the 'getbundle' command over 'ssh'
783 This is made necessary by the fact the 'getbundle' command over 'ssh'
784 have no way to know then the reply end, relying on the bundle to be
784 have no way to know then the reply end, relying on the bundle to be
785 interpreted to know its end. This is terrible and we are sorry, but we
785 interpreted to know its end. This is terrible and we are sorry, but we
786 needed to move forward to get general delta enabled.
786 needed to move forward to get general delta enabled.
787 """
787 """
788 yield self._magicstring
788 yield self._magicstring
789 assert 'params' not in vars(self)
789 assert 'params' not in vars(self)
790 paramssize = self._unpack(_fstreamparamsize)[0]
790 paramssize = self._unpack(_fstreamparamsize)[0]
791 if paramssize < 0:
791 if paramssize < 0:
792 raise error.BundleValueError('negative bundle param size: %i'
792 raise error.BundleValueError('negative bundle param size: %i'
793 % paramssize)
793 % paramssize)
794 yield _pack(_fstreamparamsize, paramssize)
794 yield _pack(_fstreamparamsize, paramssize)
795 if paramssize:
795 if paramssize:
796 params = self._readexact(paramssize)
796 params = self._readexact(paramssize)
797 self._processallparams(params)
797 self._processallparams(params)
798 yield params
798 yield params
799 assert self._compengine.bundletype == 'UN'
799 assert self._compengine.bundletype == 'UN'
800 # From there, payload might need to be decompressed
800 # From there, payload might need to be decompressed
801 self._fp = self._compengine.decompressorreader(self._fp)
801 self._fp = self._compengine.decompressorreader(self._fp)
802 emptycount = 0
802 emptycount = 0
803 while emptycount < 2:
803 while emptycount < 2:
804 # so we can brainlessly loop
804 # so we can brainlessly loop
805 assert _fpartheadersize == _fpayloadsize
805 assert _fpartheadersize == _fpayloadsize
806 size = self._unpack(_fpartheadersize)[0]
806 size = self._unpack(_fpartheadersize)[0]
807 yield _pack(_fpartheadersize, size)
807 yield _pack(_fpartheadersize, size)
808 if size:
808 if size:
809 emptycount = 0
809 emptycount = 0
810 else:
810 else:
811 emptycount += 1
811 emptycount += 1
812 continue
812 continue
813 if size == flaginterrupt:
813 if size == flaginterrupt:
814 continue
814 continue
815 elif size < 0:
815 elif size < 0:
816 raise error.BundleValueError('negative chunk size: %i')
816 raise error.BundleValueError('negative chunk size: %i')
817 yield self._readexact(size)
817 yield self._readexact(size)
818
818
819
819
820 def iterparts(self):
820 def iterparts(self):
821 """yield all parts contained in the stream"""
821 """yield all parts contained in the stream"""
822 # make sure param have been loaded
822 # make sure param have been loaded
823 self.params
823 self.params
824 # From there, payload need to be decompressed
824 # From there, payload need to be decompressed
825 self._fp = self._compengine.decompressorreader(self._fp)
825 self._fp = self._compengine.decompressorreader(self._fp)
826 indebug(self.ui, 'start extraction of bundle2 parts')
826 indebug(self.ui, 'start extraction of bundle2 parts')
827 headerblock = self._readpartheader()
827 headerblock = self._readpartheader()
828 while headerblock is not None:
828 while headerblock is not None:
829 part = unbundlepart(self.ui, headerblock, self._fp)
829 part = unbundlepart(self.ui, headerblock, self._fp)
830 yield part
830 yield part
831 # Seek to the end of the part to force it's consumption so the next
832 # part can be read. But then seek back to the beginning so the
833 # code consuming this generator has a part that starts at 0.
831 part.seek(0, 2)
834 part.seek(0, 2)
835 part.seek(0)
832 headerblock = self._readpartheader()
836 headerblock = self._readpartheader()
833 indebug(self.ui, 'end of bundle2 stream')
837 indebug(self.ui, 'end of bundle2 stream')
834
838
835 def _readpartheader(self):
839 def _readpartheader(self):
836 """reads a part header size and return the bytes blob
840 """reads a part header size and return the bytes blob
837
841
838 returns None if empty"""
842 returns None if empty"""
839 headersize = self._unpack(_fpartheadersize)[0]
843 headersize = self._unpack(_fpartheadersize)[0]
840 if headersize < 0:
844 if headersize < 0:
841 raise error.BundleValueError('negative part header size: %i'
845 raise error.BundleValueError('negative part header size: %i'
842 % headersize)
846 % headersize)
843 indebug(self.ui, 'part header size: %i' % headersize)
847 indebug(self.ui, 'part header size: %i' % headersize)
844 if headersize:
848 if headersize:
845 return self._readexact(headersize)
849 return self._readexact(headersize)
846 return None
850 return None
847
851
848 def compressed(self):
852 def compressed(self):
849 self.params # load params
853 self.params # load params
850 return self._compressed
854 return self._compressed
851
855
852 def close(self):
856 def close(self):
853 """close underlying file"""
857 """close underlying file"""
854 if util.safehasattr(self._fp, 'close'):
858 if util.safehasattr(self._fp, 'close'):
855 return self._fp.close()
859 return self._fp.close()
856
860
857 formatmap = {'20': unbundle20}
861 formatmap = {'20': unbundle20}
858
862
859 b2streamparamsmap = {}
863 b2streamparamsmap = {}
860
864
861 def b2streamparamhandler(name):
865 def b2streamparamhandler(name):
862 """register a handler for a stream level parameter"""
866 """register a handler for a stream level parameter"""
863 def decorator(func):
867 def decorator(func):
864 assert name not in formatmap
868 assert name not in formatmap
865 b2streamparamsmap[name] = func
869 b2streamparamsmap[name] = func
866 return func
870 return func
867 return decorator
871 return decorator
868
872
869 @b2streamparamhandler('compression')
873 @b2streamparamhandler('compression')
870 def processcompression(unbundler, param, value):
874 def processcompression(unbundler, param, value):
871 """read compression parameter and install payload decompression"""
875 """read compression parameter and install payload decompression"""
872 if value not in util.compengines.supportedbundletypes:
876 if value not in util.compengines.supportedbundletypes:
873 raise error.BundleUnknownFeatureError(params=(param,),
877 raise error.BundleUnknownFeatureError(params=(param,),
874 values=(value,))
878 values=(value,))
875 unbundler._compengine = util.compengines.forbundletype(value)
879 unbundler._compengine = util.compengines.forbundletype(value)
876 if value is not None:
880 if value is not None:
877 unbundler._compressed = True
881 unbundler._compressed = True
878
882
879 class bundlepart(object):
883 class bundlepart(object):
880 """A bundle2 part contains application level payload
884 """A bundle2 part contains application level payload
881
885
882 The part `type` is used to route the part to the application level
886 The part `type` is used to route the part to the application level
883 handler.
887 handler.
884
888
885 The part payload is contained in ``part.data``. It could be raw bytes or a
889 The part payload is contained in ``part.data``. It could be raw bytes or a
886 generator of byte chunks.
890 generator of byte chunks.
887
891
888 You can add parameters to the part using the ``addparam`` method.
892 You can add parameters to the part using the ``addparam`` method.
889 Parameters can be either mandatory (default) or advisory. Remote side
893 Parameters can be either mandatory (default) or advisory. Remote side
890 should be able to safely ignore the advisory ones.
894 should be able to safely ignore the advisory ones.
891
895
892 Both data and parameters cannot be modified after the generation has begun.
896 Both data and parameters cannot be modified after the generation has begun.
893 """
897 """
894
898
895 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
899 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
896 data='', mandatory=True):
900 data='', mandatory=True):
897 validateparttype(parttype)
901 validateparttype(parttype)
898 self.id = None
902 self.id = None
899 self.type = parttype
903 self.type = parttype
900 self._data = data
904 self._data = data
901 self._mandatoryparams = list(mandatoryparams)
905 self._mandatoryparams = list(mandatoryparams)
902 self._advisoryparams = list(advisoryparams)
906 self._advisoryparams = list(advisoryparams)
903 # checking for duplicated entries
907 # checking for duplicated entries
904 self._seenparams = set()
908 self._seenparams = set()
905 for pname, __ in self._mandatoryparams + self._advisoryparams:
909 for pname, __ in self._mandatoryparams + self._advisoryparams:
906 if pname in self._seenparams:
910 if pname in self._seenparams:
907 raise error.ProgrammingError('duplicated params: %s' % pname)
911 raise error.ProgrammingError('duplicated params: %s' % pname)
908 self._seenparams.add(pname)
912 self._seenparams.add(pname)
909 # status of the part's generation:
913 # status of the part's generation:
910 # - None: not started,
914 # - None: not started,
911 # - False: currently generated,
915 # - False: currently generated,
912 # - True: generation done.
916 # - True: generation done.
913 self._generated = None
917 self._generated = None
914 self.mandatory = mandatory
918 self.mandatory = mandatory
915
919
916 def __repr__(self):
920 def __repr__(self):
917 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
921 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
918 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
922 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
919 % (cls, id(self), self.id, self.type, self.mandatory))
923 % (cls, id(self), self.id, self.type, self.mandatory))
920
924
921 def copy(self):
925 def copy(self):
922 """return a copy of the part
926 """return a copy of the part
923
927
924 The new part have the very same content but no partid assigned yet.
928 The new part have the very same content but no partid assigned yet.
925 Parts with generated data cannot be copied."""
929 Parts with generated data cannot be copied."""
926 assert not util.safehasattr(self.data, 'next')
930 assert not util.safehasattr(self.data, 'next')
927 return self.__class__(self.type, self._mandatoryparams,
931 return self.__class__(self.type, self._mandatoryparams,
928 self._advisoryparams, self._data, self.mandatory)
932 self._advisoryparams, self._data, self.mandatory)
929
933
930 # methods used to defines the part content
934 # methods used to defines the part content
931 @property
935 @property
932 def data(self):
936 def data(self):
933 return self._data
937 return self._data
934
938
935 @data.setter
939 @data.setter
936 def data(self, data):
940 def data(self, data):
937 if self._generated is not None:
941 if self._generated is not None:
938 raise error.ReadOnlyPartError('part is being generated')
942 raise error.ReadOnlyPartError('part is being generated')
939 self._data = data
943 self._data = data
940
944
941 @property
945 @property
942 def mandatoryparams(self):
946 def mandatoryparams(self):
943 # make it an immutable tuple to force people through ``addparam``
947 # make it an immutable tuple to force people through ``addparam``
944 return tuple(self._mandatoryparams)
948 return tuple(self._mandatoryparams)
945
949
946 @property
950 @property
947 def advisoryparams(self):
951 def advisoryparams(self):
948 # make it an immutable tuple to force people through ``addparam``
952 # make it an immutable tuple to force people through ``addparam``
949 return tuple(self._advisoryparams)
953 return tuple(self._advisoryparams)
950
954
951 def addparam(self, name, value='', mandatory=True):
955 def addparam(self, name, value='', mandatory=True):
952 """add a parameter to the part
956 """add a parameter to the part
953
957
954 If 'mandatory' is set to True, the remote handler must claim support
958 If 'mandatory' is set to True, the remote handler must claim support
955 for this parameter or the unbundling will be aborted.
959 for this parameter or the unbundling will be aborted.
956
960
957 The 'name' and 'value' cannot exceed 255 bytes each.
961 The 'name' and 'value' cannot exceed 255 bytes each.
958 """
962 """
959 if self._generated is not None:
963 if self._generated is not None:
960 raise error.ReadOnlyPartError('part is being generated')
964 raise error.ReadOnlyPartError('part is being generated')
961 if name in self._seenparams:
965 if name in self._seenparams:
962 raise ValueError('duplicated params: %s' % name)
966 raise ValueError('duplicated params: %s' % name)
963 self._seenparams.add(name)
967 self._seenparams.add(name)
964 params = self._advisoryparams
968 params = self._advisoryparams
965 if mandatory:
969 if mandatory:
966 params = self._mandatoryparams
970 params = self._mandatoryparams
967 params.append((name, value))
971 params.append((name, value))
968
972
969 # methods used to generates the bundle2 stream
973 # methods used to generates the bundle2 stream
970 def getchunks(self, ui):
974 def getchunks(self, ui):
971 if self._generated is not None:
975 if self._generated is not None:
972 raise error.ProgrammingError('part can only be consumed once')
976 raise error.ProgrammingError('part can only be consumed once')
973 self._generated = False
977 self._generated = False
974
978
975 if ui.debugflag:
979 if ui.debugflag:
976 msg = ['bundle2-output-part: "%s"' % self.type]
980 msg = ['bundle2-output-part: "%s"' % self.type]
977 if not self.mandatory:
981 if not self.mandatory:
978 msg.append(' (advisory)')
982 msg.append(' (advisory)')
979 nbmp = len(self.mandatoryparams)
983 nbmp = len(self.mandatoryparams)
980 nbap = len(self.advisoryparams)
984 nbap = len(self.advisoryparams)
981 if nbmp or nbap:
985 if nbmp or nbap:
982 msg.append(' (params:')
986 msg.append(' (params:')
983 if nbmp:
987 if nbmp:
984 msg.append(' %i mandatory' % nbmp)
988 msg.append(' %i mandatory' % nbmp)
985 if nbap:
989 if nbap:
986 msg.append(' %i advisory' % nbmp)
990 msg.append(' %i advisory' % nbmp)
987 msg.append(')')
991 msg.append(')')
988 if not self.data:
992 if not self.data:
989 msg.append(' empty payload')
993 msg.append(' empty payload')
990 elif util.safehasattr(self.data, 'next'):
994 elif util.safehasattr(self.data, 'next'):
991 msg.append(' streamed payload')
995 msg.append(' streamed payload')
992 else:
996 else:
993 msg.append(' %i bytes payload' % len(self.data))
997 msg.append(' %i bytes payload' % len(self.data))
994 msg.append('\n')
998 msg.append('\n')
995 ui.debug(''.join(msg))
999 ui.debug(''.join(msg))
996
1000
997 #### header
1001 #### header
998 if self.mandatory:
1002 if self.mandatory:
999 parttype = self.type.upper()
1003 parttype = self.type.upper()
1000 else:
1004 else:
1001 parttype = self.type.lower()
1005 parttype = self.type.lower()
1002 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1006 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1003 ## parttype
1007 ## parttype
1004 header = [_pack(_fparttypesize, len(parttype)),
1008 header = [_pack(_fparttypesize, len(parttype)),
1005 parttype, _pack(_fpartid, self.id),
1009 parttype, _pack(_fpartid, self.id),
1006 ]
1010 ]
1007 ## parameters
1011 ## parameters
1008 # count
1012 # count
1009 manpar = self.mandatoryparams
1013 manpar = self.mandatoryparams
1010 advpar = self.advisoryparams
1014 advpar = self.advisoryparams
1011 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1015 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1012 # size
1016 # size
1013 parsizes = []
1017 parsizes = []
1014 for key, value in manpar:
1018 for key, value in manpar:
1015 parsizes.append(len(key))
1019 parsizes.append(len(key))
1016 parsizes.append(len(value))
1020 parsizes.append(len(value))
1017 for key, value in advpar:
1021 for key, value in advpar:
1018 parsizes.append(len(key))
1022 parsizes.append(len(key))
1019 parsizes.append(len(value))
1023 parsizes.append(len(value))
1020 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1024 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1021 header.append(paramsizes)
1025 header.append(paramsizes)
1022 # key, value
1026 # key, value
1023 for key, value in manpar:
1027 for key, value in manpar:
1024 header.append(key)
1028 header.append(key)
1025 header.append(value)
1029 header.append(value)
1026 for key, value in advpar:
1030 for key, value in advpar:
1027 header.append(key)
1031 header.append(key)
1028 header.append(value)
1032 header.append(value)
1029 ## finalize header
1033 ## finalize header
1030 headerchunk = ''.join(header)
1034 headerchunk = ''.join(header)
1031 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1035 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1032 yield _pack(_fpartheadersize, len(headerchunk))
1036 yield _pack(_fpartheadersize, len(headerchunk))
1033 yield headerchunk
1037 yield headerchunk
1034 ## payload
1038 ## payload
1035 try:
1039 try:
1036 for chunk in self._payloadchunks():
1040 for chunk in self._payloadchunks():
1037 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1041 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1038 yield _pack(_fpayloadsize, len(chunk))
1042 yield _pack(_fpayloadsize, len(chunk))
1039 yield chunk
1043 yield chunk
1040 except GeneratorExit:
1044 except GeneratorExit:
1041 # GeneratorExit means that nobody is listening for our
1045 # GeneratorExit means that nobody is listening for our
1042 # results anyway, so just bail quickly rather than trying
1046 # results anyway, so just bail quickly rather than trying
1043 # to produce an error part.
1047 # to produce an error part.
1044 ui.debug('bundle2-generatorexit\n')
1048 ui.debug('bundle2-generatorexit\n')
1045 raise
1049 raise
1046 except BaseException as exc:
1050 except BaseException as exc:
1047 bexc = util.forcebytestr(exc)
1051 bexc = util.forcebytestr(exc)
1048 # backup exception data for later
1052 # backup exception data for later
1049 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1053 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1050 % bexc)
1054 % bexc)
1051 tb = sys.exc_info()[2]
1055 tb = sys.exc_info()[2]
1052 msg = 'unexpected error: %s' % bexc
1056 msg = 'unexpected error: %s' % bexc
1053 interpart = bundlepart('error:abort', [('message', msg)],
1057 interpart = bundlepart('error:abort', [('message', msg)],
1054 mandatory=False)
1058 mandatory=False)
1055 interpart.id = 0
1059 interpart.id = 0
1056 yield _pack(_fpayloadsize, -1)
1060 yield _pack(_fpayloadsize, -1)
1057 for chunk in interpart.getchunks(ui=ui):
1061 for chunk in interpart.getchunks(ui=ui):
1058 yield chunk
1062 yield chunk
1059 outdebug(ui, 'closing payload chunk')
1063 outdebug(ui, 'closing payload chunk')
1060 # abort current part payload
1064 # abort current part payload
1061 yield _pack(_fpayloadsize, 0)
1065 yield _pack(_fpayloadsize, 0)
1062 pycompat.raisewithtb(exc, tb)
1066 pycompat.raisewithtb(exc, tb)
1063 # end of payload
1067 # end of payload
1064 outdebug(ui, 'closing payload chunk')
1068 outdebug(ui, 'closing payload chunk')
1065 yield _pack(_fpayloadsize, 0)
1069 yield _pack(_fpayloadsize, 0)
1066 self._generated = True
1070 self._generated = True
1067
1071
1068 def _payloadchunks(self):
1072 def _payloadchunks(self):
1069 """yield chunks of a the part payload
1073 """yield chunks of a the part payload
1070
1074
1071 Exists to handle the different methods to provide data to a part."""
1075 Exists to handle the different methods to provide data to a part."""
1072 # we only support fixed size data now.
1076 # we only support fixed size data now.
1073 # This will be improved in the future.
1077 # This will be improved in the future.
1074 if (util.safehasattr(self.data, 'next')
1078 if (util.safehasattr(self.data, 'next')
1075 or util.safehasattr(self.data, '__next__')):
1079 or util.safehasattr(self.data, '__next__')):
1076 buff = util.chunkbuffer(self.data)
1080 buff = util.chunkbuffer(self.data)
1077 chunk = buff.read(preferedchunksize)
1081 chunk = buff.read(preferedchunksize)
1078 while chunk:
1082 while chunk:
1079 yield chunk
1083 yield chunk
1080 chunk = buff.read(preferedchunksize)
1084 chunk = buff.read(preferedchunksize)
1081 elif len(self.data):
1085 elif len(self.data):
1082 yield self.data
1086 yield self.data
1083
1087
1084
1088
1085 flaginterrupt = -1
1089 flaginterrupt = -1
1086
1090
1087 class interrupthandler(unpackermixin):
1091 class interrupthandler(unpackermixin):
1088 """read one part and process it with restricted capability
1092 """read one part and process it with restricted capability
1089
1093
1090 This allows to transmit exception raised on the producer size during part
1094 This allows to transmit exception raised on the producer size during part
1091 iteration while the consumer is reading a part.
1095 iteration while the consumer is reading a part.
1092
1096
1093 Part processed in this manner only have access to a ui object,"""
1097 Part processed in this manner only have access to a ui object,"""
1094
1098
1095 def __init__(self, ui, fp):
1099 def __init__(self, ui, fp):
1096 super(interrupthandler, self).__init__(fp)
1100 super(interrupthandler, self).__init__(fp)
1097 self.ui = ui
1101 self.ui = ui
1098
1102
1099 def _readpartheader(self):
1103 def _readpartheader(self):
1100 """reads a part header size and return the bytes blob
1104 """reads a part header size and return the bytes blob
1101
1105
1102 returns None if empty"""
1106 returns None if empty"""
1103 headersize = self._unpack(_fpartheadersize)[0]
1107 headersize = self._unpack(_fpartheadersize)[0]
1104 if headersize < 0:
1108 if headersize < 0:
1105 raise error.BundleValueError('negative part header size: %i'
1109 raise error.BundleValueError('negative part header size: %i'
1106 % headersize)
1110 % headersize)
1107 indebug(self.ui, 'part header size: %i\n' % headersize)
1111 indebug(self.ui, 'part header size: %i\n' % headersize)
1108 if headersize:
1112 if headersize:
1109 return self._readexact(headersize)
1113 return self._readexact(headersize)
1110 return None
1114 return None
1111
1115
1112 def __call__(self):
1116 def __call__(self):
1113
1117
1114 self.ui.debug('bundle2-input-stream-interrupt:'
1118 self.ui.debug('bundle2-input-stream-interrupt:'
1115 ' opening out of band context\n')
1119 ' opening out of band context\n')
1116 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1120 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1117 headerblock = self._readpartheader()
1121 headerblock = self._readpartheader()
1118 if headerblock is None:
1122 if headerblock is None:
1119 indebug(self.ui, 'no part found during interruption.')
1123 indebug(self.ui, 'no part found during interruption.')
1120 return
1124 return
1121 part = unbundlepart(self.ui, headerblock, self._fp)
1125 part = unbundlepart(self.ui, headerblock, self._fp)
1122 op = interruptoperation(self.ui)
1126 op = interruptoperation(self.ui)
1123 _processpart(op, part)
1127 _processpart(op, part)
1124 self.ui.debug('bundle2-input-stream-interrupt:'
1128 self.ui.debug('bundle2-input-stream-interrupt:'
1125 ' closing out of band context\n')
1129 ' closing out of band context\n')
1126
1130
1127 class interruptoperation(object):
1131 class interruptoperation(object):
1128 """A limited operation to be use by part handler during interruption
1132 """A limited operation to be use by part handler during interruption
1129
1133
1130 It only have access to an ui object.
1134 It only have access to an ui object.
1131 """
1135 """
1132
1136
1133 def __init__(self, ui):
1137 def __init__(self, ui):
1134 self.ui = ui
1138 self.ui = ui
1135 self.reply = None
1139 self.reply = None
1136 self.captureoutput = False
1140 self.captureoutput = False
1137
1141
1138 @property
1142 @property
1139 def repo(self):
1143 def repo(self):
1140 raise error.ProgrammingError('no repo access from stream interruption')
1144 raise error.ProgrammingError('no repo access from stream interruption')
1141
1145
1142 def gettransaction(self):
1146 def gettransaction(self):
1143 raise TransactionUnavailable('no repo access from stream interruption')
1147 raise TransactionUnavailable('no repo access from stream interruption')
1144
1148
1145 class unbundlepart(unpackermixin):
1149 class unbundlepart(unpackermixin):
1146 """a bundle part read from a bundle"""
1150 """a bundle part read from a bundle"""
1147
1151
1148 def __init__(self, ui, header, fp):
1152 def __init__(self, ui, header, fp):
1149 super(unbundlepart, self).__init__(fp)
1153 super(unbundlepart, self).__init__(fp)
1150 self._seekable = (util.safehasattr(fp, 'seek') and
1154 self._seekable = (util.safehasattr(fp, 'seek') and
1151 util.safehasattr(fp, 'tell'))
1155 util.safehasattr(fp, 'tell'))
1152 self.ui = ui
1156 self.ui = ui
1153 # unbundle state attr
1157 # unbundle state attr
1154 self._headerdata = header
1158 self._headerdata = header
1155 self._headeroffset = 0
1159 self._headeroffset = 0
1156 self._initialized = False
1160 self._initialized = False
1157 self.consumed = False
1161 self.consumed = False
1158 # part data
1162 # part data
1159 self.id = None
1163 self.id = None
1160 self.type = None
1164 self.type = None
1161 self.mandatoryparams = None
1165 self.mandatoryparams = None
1162 self.advisoryparams = None
1166 self.advisoryparams = None
1163 self.params = None
1167 self.params = None
1164 self.mandatorykeys = ()
1168 self.mandatorykeys = ()
1165 self._payloadstream = None
1169 self._payloadstream = None
1166 self._readheader()
1170 self._readheader()
1167 self._mandatory = None
1171 self._mandatory = None
1168 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1172 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1169 self._pos = 0
1173 self._pos = 0
1170
1174
1171 def _fromheader(self, size):
1175 def _fromheader(self, size):
1172 """return the next <size> byte from the header"""
1176 """return the next <size> byte from the header"""
1173 offset = self._headeroffset
1177 offset = self._headeroffset
1174 data = self._headerdata[offset:(offset + size)]
1178 data = self._headerdata[offset:(offset + size)]
1175 self._headeroffset = offset + size
1179 self._headeroffset = offset + size
1176 return data
1180 return data
1177
1181
1178 def _unpackheader(self, format):
1182 def _unpackheader(self, format):
1179 """read given format from header
1183 """read given format from header
1180
1184
1181 This automatically compute the size of the format to read."""
1185 This automatically compute the size of the format to read."""
1182 data = self._fromheader(struct.calcsize(format))
1186 data = self._fromheader(struct.calcsize(format))
1183 return _unpack(format, data)
1187 return _unpack(format, data)
1184
1188
1185 def _initparams(self, mandatoryparams, advisoryparams):
1189 def _initparams(self, mandatoryparams, advisoryparams):
1186 """internal function to setup all logic related parameters"""
1190 """internal function to setup all logic related parameters"""
1187 # make it read only to prevent people touching it by mistake.
1191 # make it read only to prevent people touching it by mistake.
1188 self.mandatoryparams = tuple(mandatoryparams)
1192 self.mandatoryparams = tuple(mandatoryparams)
1189 self.advisoryparams = tuple(advisoryparams)
1193 self.advisoryparams = tuple(advisoryparams)
1190 # user friendly UI
1194 # user friendly UI
1191 self.params = util.sortdict(self.mandatoryparams)
1195 self.params = util.sortdict(self.mandatoryparams)
1192 self.params.update(self.advisoryparams)
1196 self.params.update(self.advisoryparams)
1193 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1197 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1194
1198
1195 def _payloadchunks(self, chunknum=0):
1199 def _payloadchunks(self, chunknum=0):
1196 '''seek to specified chunk and start yielding data'''
1200 '''seek to specified chunk and start yielding data'''
1197 if len(self._chunkindex) == 0:
1201 if len(self._chunkindex) == 0:
1198 assert chunknum == 0, 'Must start with chunk 0'
1202 assert chunknum == 0, 'Must start with chunk 0'
1199 self._chunkindex.append((0, self._tellfp()))
1203 self._chunkindex.append((0, self._tellfp()))
1200 else:
1204 else:
1201 assert chunknum < len(self._chunkindex), \
1205 assert chunknum < len(self._chunkindex), \
1202 'Unknown chunk %d' % chunknum
1206 'Unknown chunk %d' % chunknum
1203 self._seekfp(self._chunkindex[chunknum][1])
1207 self._seekfp(self._chunkindex[chunknum][1])
1204
1208
1205 pos = self._chunkindex[chunknum][0]
1209 pos = self._chunkindex[chunknum][0]
1206 payloadsize = self._unpack(_fpayloadsize)[0]
1210 payloadsize = self._unpack(_fpayloadsize)[0]
1207 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1211 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1208 while payloadsize:
1212 while payloadsize:
1209 if payloadsize == flaginterrupt:
1213 if payloadsize == flaginterrupt:
1210 # interruption detection, the handler will now read a
1214 # interruption detection, the handler will now read a
1211 # single part and process it.
1215 # single part and process it.
1212 interrupthandler(self.ui, self._fp)()
1216 interrupthandler(self.ui, self._fp)()
1213 elif payloadsize < 0:
1217 elif payloadsize < 0:
1214 msg = 'negative payload chunk size: %i' % payloadsize
1218 msg = 'negative payload chunk size: %i' % payloadsize
1215 raise error.BundleValueError(msg)
1219 raise error.BundleValueError(msg)
1216 else:
1220 else:
1217 result = self._readexact(payloadsize)
1221 result = self._readexact(payloadsize)
1218 chunknum += 1
1222 chunknum += 1
1219 pos += payloadsize
1223 pos += payloadsize
1220 if chunknum == len(self._chunkindex):
1224 if chunknum == len(self._chunkindex):
1221 self._chunkindex.append((pos, self._tellfp()))
1225 self._chunkindex.append((pos, self._tellfp()))
1222 yield result
1226 yield result
1223 payloadsize = self._unpack(_fpayloadsize)[0]
1227 payloadsize = self._unpack(_fpayloadsize)[0]
1224 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1228 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1225
1229
1226 def _findchunk(self, pos):
1230 def _findchunk(self, pos):
1227 '''for a given payload position, return a chunk number and offset'''
1231 '''for a given payload position, return a chunk number and offset'''
1228 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1232 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1229 if ppos == pos:
1233 if ppos == pos:
1230 return chunk, 0
1234 return chunk, 0
1231 elif ppos > pos:
1235 elif ppos > pos:
1232 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1236 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1233 raise ValueError('Unknown chunk')
1237 raise ValueError('Unknown chunk')
1234
1238
1235 def _readheader(self):
1239 def _readheader(self):
1236 """read the header and setup the object"""
1240 """read the header and setup the object"""
1237 typesize = self._unpackheader(_fparttypesize)[0]
1241 typesize = self._unpackheader(_fparttypesize)[0]
1238 self.type = self._fromheader(typesize)
1242 self.type = self._fromheader(typesize)
1239 indebug(self.ui, 'part type: "%s"' % self.type)
1243 indebug(self.ui, 'part type: "%s"' % self.type)
1240 self.id = self._unpackheader(_fpartid)[0]
1244 self.id = self._unpackheader(_fpartid)[0]
1241 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1245 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1242 # extract mandatory bit from type
1246 # extract mandatory bit from type
1243 self.mandatory = (self.type != self.type.lower())
1247 self.mandatory = (self.type != self.type.lower())
1244 self.type = self.type.lower()
1248 self.type = self.type.lower()
1245 ## reading parameters
1249 ## reading parameters
1246 # param count
1250 # param count
1247 mancount, advcount = self._unpackheader(_fpartparamcount)
1251 mancount, advcount = self._unpackheader(_fpartparamcount)
1248 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1252 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1249 # param size
1253 # param size
1250 fparamsizes = _makefpartparamsizes(mancount + advcount)
1254 fparamsizes = _makefpartparamsizes(mancount + advcount)
1251 paramsizes = self._unpackheader(fparamsizes)
1255 paramsizes = self._unpackheader(fparamsizes)
1252 # make it a list of couple again
1256 # make it a list of couple again
1253 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1257 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1254 # split mandatory from advisory
1258 # split mandatory from advisory
1255 mansizes = paramsizes[:mancount]
1259 mansizes = paramsizes[:mancount]
1256 advsizes = paramsizes[mancount:]
1260 advsizes = paramsizes[mancount:]
1257 # retrieve param value
1261 # retrieve param value
1258 manparams = []
1262 manparams = []
1259 for key, value in mansizes:
1263 for key, value in mansizes:
1260 manparams.append((self._fromheader(key), self._fromheader(value)))
1264 manparams.append((self._fromheader(key), self._fromheader(value)))
1261 advparams = []
1265 advparams = []
1262 for key, value in advsizes:
1266 for key, value in advsizes:
1263 advparams.append((self._fromheader(key), self._fromheader(value)))
1267 advparams.append((self._fromheader(key), self._fromheader(value)))
1264 self._initparams(manparams, advparams)
1268 self._initparams(manparams, advparams)
1265 ## part payload
1269 ## part payload
1266 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1270 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1267 # we read the data, tell it
1271 # we read the data, tell it
1268 self._initialized = True
1272 self._initialized = True
1269
1273
1270 def read(self, size=None):
1274 def read(self, size=None):
1271 """read payload data"""
1275 """read payload data"""
1272 if not self._initialized:
1276 if not self._initialized:
1273 self._readheader()
1277 self._readheader()
1274 if size is None:
1278 if size is None:
1275 data = self._payloadstream.read()
1279 data = self._payloadstream.read()
1276 else:
1280 else:
1277 data = self._payloadstream.read(size)
1281 data = self._payloadstream.read(size)
1278 self._pos += len(data)
1282 self._pos += len(data)
1279 if size is None or len(data) < size:
1283 if size is None or len(data) < size:
1280 if not self.consumed and self._pos:
1284 if not self.consumed and self._pos:
1281 self.ui.debug('bundle2-input-part: total payload size %i\n'
1285 self.ui.debug('bundle2-input-part: total payload size %i\n'
1282 % self._pos)
1286 % self._pos)
1283 self.consumed = True
1287 self.consumed = True
1284 return data
1288 return data
1285
1289
1286 def tell(self):
1290 def tell(self):
1287 return self._pos
1291 return self._pos
1288
1292
1289 def seek(self, offset, whence=0):
1293 def seek(self, offset, whence=0):
1290 if whence == 0:
1294 if whence == 0:
1291 newpos = offset
1295 newpos = offset
1292 elif whence == 1:
1296 elif whence == 1:
1293 newpos = self._pos + offset
1297 newpos = self._pos + offset
1294 elif whence == 2:
1298 elif whence == 2:
1295 if not self.consumed:
1299 if not self.consumed:
1296 self.read()
1300 self.read()
1297 newpos = self._chunkindex[-1][0] - offset
1301 newpos = self._chunkindex[-1][0] - offset
1298 else:
1302 else:
1299 raise ValueError('Unknown whence value: %r' % (whence,))
1303 raise ValueError('Unknown whence value: %r' % (whence,))
1300
1304
1301 if newpos > self._chunkindex[-1][0] and not self.consumed:
1305 if newpos > self._chunkindex[-1][0] and not self.consumed:
1302 self.read()
1306 self.read()
1303 if not 0 <= newpos <= self._chunkindex[-1][0]:
1307 if not 0 <= newpos <= self._chunkindex[-1][0]:
1304 raise ValueError('Offset out of range')
1308 raise ValueError('Offset out of range')
1305
1309
1306 if self._pos != newpos:
1310 if self._pos != newpos:
1307 chunk, internaloffset = self._findchunk(newpos)
1311 chunk, internaloffset = self._findchunk(newpos)
1308 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1312 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1309 adjust = self.read(internaloffset)
1313 adjust = self.read(internaloffset)
1310 if len(adjust) != internaloffset:
1314 if len(adjust) != internaloffset:
1311 raise error.Abort(_('Seek failed\n'))
1315 raise error.Abort(_('Seek failed\n'))
1312 self._pos = newpos
1316 self._pos = newpos
1313
1317
1314 def _seekfp(self, offset, whence=0):
1318 def _seekfp(self, offset, whence=0):
1315 """move the underlying file pointer
1319 """move the underlying file pointer
1316
1320
1317 This method is meant for internal usage by the bundle2 protocol only.
1321 This method is meant for internal usage by the bundle2 protocol only.
1318 They directly manipulate the low level stream including bundle2 level
1322 They directly manipulate the low level stream including bundle2 level
1319 instruction.
1323 instruction.
1320
1324
1321 Do not use it to implement higher-level logic or methods."""
1325 Do not use it to implement higher-level logic or methods."""
1322 if self._seekable:
1326 if self._seekable:
1323 return self._fp.seek(offset, whence)
1327 return self._fp.seek(offset, whence)
1324 else:
1328 else:
1325 raise NotImplementedError(_('File pointer is not seekable'))
1329 raise NotImplementedError(_('File pointer is not seekable'))
1326
1330
1327 def _tellfp(self):
1331 def _tellfp(self):
1328 """return the file offset, or None if file is not seekable
1332 """return the file offset, or None if file is not seekable
1329
1333
1330 This method is meant for internal usage by the bundle2 protocol only.
1334 This method is meant for internal usage by the bundle2 protocol only.
1331 They directly manipulate the low level stream including bundle2 level
1335 They directly manipulate the low level stream including bundle2 level
1332 instruction.
1336 instruction.
1333
1337
1334 Do not use it to implement higher-level logic or methods."""
1338 Do not use it to implement higher-level logic or methods."""
1335 if self._seekable:
1339 if self._seekable:
1336 try:
1340 try:
1337 return self._fp.tell()
1341 return self._fp.tell()
1338 except IOError as e:
1342 except IOError as e:
1339 if e.errno == errno.ESPIPE:
1343 if e.errno == errno.ESPIPE:
1340 self._seekable = False
1344 self._seekable = False
1341 else:
1345 else:
1342 raise
1346 raise
1343 return None
1347 return None
1344
1348
1345 # These are only the static capabilities.
1349 # These are only the static capabilities.
1346 # Check the 'getrepocaps' function for the rest.
1350 # Check the 'getrepocaps' function for the rest.
1347 capabilities = {'HG20': (),
1351 capabilities = {'HG20': (),
1348 'error': ('abort', 'unsupportedcontent', 'pushraced',
1352 'error': ('abort', 'unsupportedcontent', 'pushraced',
1349 'pushkey'),
1353 'pushkey'),
1350 'listkeys': (),
1354 'listkeys': (),
1351 'pushkey': (),
1355 'pushkey': (),
1352 'digests': tuple(sorted(util.DIGESTS.keys())),
1356 'digests': tuple(sorted(util.DIGESTS.keys())),
1353 'remote-changegroup': ('http', 'https'),
1357 'remote-changegroup': ('http', 'https'),
1354 'hgtagsfnodes': (),
1358 'hgtagsfnodes': (),
1355 }
1359 }
1356
1360
1357 def getrepocaps(repo, allowpushback=False):
1361 def getrepocaps(repo, allowpushback=False):
1358 """return the bundle2 capabilities for a given repo
1362 """return the bundle2 capabilities for a given repo
1359
1363
1360 Exists to allow extensions (like evolution) to mutate the capabilities.
1364 Exists to allow extensions (like evolution) to mutate the capabilities.
1361 """
1365 """
1362 caps = capabilities.copy()
1366 caps = capabilities.copy()
1363 caps['changegroup'] = tuple(sorted(
1367 caps['changegroup'] = tuple(sorted(
1364 changegroup.supportedincomingversions(repo)))
1368 changegroup.supportedincomingversions(repo)))
1365 if obsolete.isenabled(repo, obsolete.exchangeopt):
1369 if obsolete.isenabled(repo, obsolete.exchangeopt):
1366 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1370 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1367 caps['obsmarkers'] = supportedformat
1371 caps['obsmarkers'] = supportedformat
1368 if allowpushback:
1372 if allowpushback:
1369 caps['pushback'] = ()
1373 caps['pushback'] = ()
1370 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1374 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1371 if cpmode == 'check-related':
1375 if cpmode == 'check-related':
1372 caps['checkheads'] = ('related',)
1376 caps['checkheads'] = ('related',)
1373 return caps
1377 return caps
1374
1378
1375 def bundle2caps(remote):
1379 def bundle2caps(remote):
1376 """return the bundle capabilities of a peer as dict"""
1380 """return the bundle capabilities of a peer as dict"""
1377 raw = remote.capable('bundle2')
1381 raw = remote.capable('bundle2')
1378 if not raw and raw != '':
1382 if not raw and raw != '':
1379 return {}
1383 return {}
1380 capsblob = urlreq.unquote(remote.capable('bundle2'))
1384 capsblob = urlreq.unquote(remote.capable('bundle2'))
1381 return decodecaps(capsblob)
1385 return decodecaps(capsblob)
1382
1386
1383 def obsmarkersversion(caps):
1387 def obsmarkersversion(caps):
1384 """extract the list of supported obsmarkers versions from a bundle2caps dict
1388 """extract the list of supported obsmarkers versions from a bundle2caps dict
1385 """
1389 """
1386 obscaps = caps.get('obsmarkers', ())
1390 obscaps = caps.get('obsmarkers', ())
1387 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1391 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1388
1392
1389 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1393 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1390 vfs=None, compression=None, compopts=None):
1394 vfs=None, compression=None, compopts=None):
1391 if bundletype.startswith('HG10'):
1395 if bundletype.startswith('HG10'):
1392 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1396 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1393 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1397 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1394 compression=compression, compopts=compopts)
1398 compression=compression, compopts=compopts)
1395 elif not bundletype.startswith('HG20'):
1399 elif not bundletype.startswith('HG20'):
1396 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1400 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1397
1401
1398 caps = {}
1402 caps = {}
1399 if 'obsolescence' in opts:
1403 if 'obsolescence' in opts:
1400 caps['obsmarkers'] = ('V1',)
1404 caps['obsmarkers'] = ('V1',)
1401 bundle = bundle20(ui, caps)
1405 bundle = bundle20(ui, caps)
1402 bundle.setcompression(compression, compopts)
1406 bundle.setcompression(compression, compopts)
1403 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1407 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1404 chunkiter = bundle.getchunks()
1408 chunkiter = bundle.getchunks()
1405
1409
1406 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1410 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1407
1411
1408 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1412 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1409 # We should eventually reconcile this logic with the one behind
1413 # We should eventually reconcile this logic with the one behind
1410 # 'exchange.getbundle2partsgenerator'.
1414 # 'exchange.getbundle2partsgenerator'.
1411 #
1415 #
1412 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1416 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1413 # different right now. So we keep them separated for now for the sake of
1417 # different right now. So we keep them separated for now for the sake of
1414 # simplicity.
1418 # simplicity.
1415
1419
1416 # we always want a changegroup in such bundle
1420 # we always want a changegroup in such bundle
1417 cgversion = opts.get('cg.version')
1421 cgversion = opts.get('cg.version')
1418 if cgversion is None:
1422 if cgversion is None:
1419 cgversion = changegroup.safeversion(repo)
1423 cgversion = changegroup.safeversion(repo)
1420 cg = changegroup.getchangegroup(repo, source, outgoing,
1424 cg = changegroup.getchangegroup(repo, source, outgoing,
1421 version=cgversion)
1425 version=cgversion)
1422 part = bundler.newpart('changegroup', data=cg.getchunks())
1426 part = bundler.newpart('changegroup', data=cg.getchunks())
1423 part.addparam('version', cg.version)
1427 part.addparam('version', cg.version)
1424 if 'clcount' in cg.extras:
1428 if 'clcount' in cg.extras:
1425 part.addparam('nbchanges', str(cg.extras['clcount']),
1429 part.addparam('nbchanges', str(cg.extras['clcount']),
1426 mandatory=False)
1430 mandatory=False)
1427 if opts.get('phases') and repo.revs('%ln and secret()',
1431 if opts.get('phases') and repo.revs('%ln and secret()',
1428 outgoing.missingheads):
1432 outgoing.missingheads):
1429 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1433 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1430
1434
1431 addparttagsfnodescache(repo, bundler, outgoing)
1435 addparttagsfnodescache(repo, bundler, outgoing)
1432
1436
1433 if opts.get('obsolescence', False):
1437 if opts.get('obsolescence', False):
1434 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1438 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1435 buildobsmarkerspart(bundler, obsmarkers)
1439 buildobsmarkerspart(bundler, obsmarkers)
1436
1440
1437 if opts.get('phases', False):
1441 if opts.get('phases', False):
1438 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1442 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1439 phasedata = []
1443 phasedata = []
1440 for phase in phases.allphases:
1444 for phase in phases.allphases:
1441 for head in headsbyphase[phase]:
1445 for head in headsbyphase[phase]:
1442 phasedata.append(_pack(_fphasesentry, phase, head))
1446 phasedata.append(_pack(_fphasesentry, phase, head))
1443 bundler.newpart('phase-heads', data=''.join(phasedata))
1447 bundler.newpart('phase-heads', data=''.join(phasedata))
1444
1448
1445 def addparttagsfnodescache(repo, bundler, outgoing):
1449 def addparttagsfnodescache(repo, bundler, outgoing):
1446 # we include the tags fnode cache for the bundle changeset
1450 # we include the tags fnode cache for the bundle changeset
1447 # (as an optional parts)
1451 # (as an optional parts)
1448 cache = tags.hgtagsfnodescache(repo.unfiltered())
1452 cache = tags.hgtagsfnodescache(repo.unfiltered())
1449 chunks = []
1453 chunks = []
1450
1454
1451 # .hgtags fnodes are only relevant for head changesets. While we could
1455 # .hgtags fnodes are only relevant for head changesets. While we could
1452 # transfer values for all known nodes, there will likely be little to
1456 # transfer values for all known nodes, there will likely be little to
1453 # no benefit.
1457 # no benefit.
1454 #
1458 #
1455 # We don't bother using a generator to produce output data because
1459 # We don't bother using a generator to produce output data because
1456 # a) we only have 40 bytes per head and even esoteric numbers of heads
1460 # a) we only have 40 bytes per head and even esoteric numbers of heads
1457 # consume little memory (1M heads is 40MB) b) we don't want to send the
1461 # consume little memory (1M heads is 40MB) b) we don't want to send the
1458 # part if we don't have entries and knowing if we have entries requires
1462 # part if we don't have entries and knowing if we have entries requires
1459 # cache lookups.
1463 # cache lookups.
1460 for node in outgoing.missingheads:
1464 for node in outgoing.missingheads:
1461 # Don't compute missing, as this may slow down serving.
1465 # Don't compute missing, as this may slow down serving.
1462 fnode = cache.getfnode(node, computemissing=False)
1466 fnode = cache.getfnode(node, computemissing=False)
1463 if fnode is not None:
1467 if fnode is not None:
1464 chunks.extend([node, fnode])
1468 chunks.extend([node, fnode])
1465
1469
1466 if chunks:
1470 if chunks:
1467 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1471 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1468
1472
1469 def buildobsmarkerspart(bundler, markers):
1473 def buildobsmarkerspart(bundler, markers):
1470 """add an obsmarker part to the bundler with <markers>
1474 """add an obsmarker part to the bundler with <markers>
1471
1475
1472 No part is created if markers is empty.
1476 No part is created if markers is empty.
1473 Raises ValueError if the bundler doesn't support any known obsmarker format.
1477 Raises ValueError if the bundler doesn't support any known obsmarker format.
1474 """
1478 """
1475 if not markers:
1479 if not markers:
1476 return None
1480 return None
1477
1481
1478 remoteversions = obsmarkersversion(bundler.capabilities)
1482 remoteversions = obsmarkersversion(bundler.capabilities)
1479 version = obsolete.commonversion(remoteversions)
1483 version = obsolete.commonversion(remoteversions)
1480 if version is None:
1484 if version is None:
1481 raise ValueError('bundler does not support common obsmarker format')
1485 raise ValueError('bundler does not support common obsmarker format')
1482 stream = obsolete.encodemarkers(markers, True, version=version)
1486 stream = obsolete.encodemarkers(markers, True, version=version)
1483 return bundler.newpart('obsmarkers', data=stream)
1487 return bundler.newpart('obsmarkers', data=stream)
1484
1488
1485 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1489 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1486 compopts=None):
1490 compopts=None):
1487 """Write a bundle file and return its filename.
1491 """Write a bundle file and return its filename.
1488
1492
1489 Existing files will not be overwritten.
1493 Existing files will not be overwritten.
1490 If no filename is specified, a temporary file is created.
1494 If no filename is specified, a temporary file is created.
1491 bz2 compression can be turned off.
1495 bz2 compression can be turned off.
1492 The bundle file will be deleted in case of errors.
1496 The bundle file will be deleted in case of errors.
1493 """
1497 """
1494
1498
1495 if bundletype == "HG20":
1499 if bundletype == "HG20":
1496 bundle = bundle20(ui)
1500 bundle = bundle20(ui)
1497 bundle.setcompression(compression, compopts)
1501 bundle.setcompression(compression, compopts)
1498 part = bundle.newpart('changegroup', data=cg.getchunks())
1502 part = bundle.newpart('changegroup', data=cg.getchunks())
1499 part.addparam('version', cg.version)
1503 part.addparam('version', cg.version)
1500 if 'clcount' in cg.extras:
1504 if 'clcount' in cg.extras:
1501 part.addparam('nbchanges', str(cg.extras['clcount']),
1505 part.addparam('nbchanges', str(cg.extras['clcount']),
1502 mandatory=False)
1506 mandatory=False)
1503 chunkiter = bundle.getchunks()
1507 chunkiter = bundle.getchunks()
1504 else:
1508 else:
1505 # compression argument is only for the bundle2 case
1509 # compression argument is only for the bundle2 case
1506 assert compression is None
1510 assert compression is None
1507 if cg.version != '01':
1511 if cg.version != '01':
1508 raise error.Abort(_('old bundle types only supports v1 '
1512 raise error.Abort(_('old bundle types only supports v1 '
1509 'changegroups'))
1513 'changegroups'))
1510 header, comp = bundletypes[bundletype]
1514 header, comp = bundletypes[bundletype]
1511 if comp not in util.compengines.supportedbundletypes:
1515 if comp not in util.compengines.supportedbundletypes:
1512 raise error.Abort(_('unknown stream compression type: %s')
1516 raise error.Abort(_('unknown stream compression type: %s')
1513 % comp)
1517 % comp)
1514 compengine = util.compengines.forbundletype(comp)
1518 compengine = util.compengines.forbundletype(comp)
1515 def chunkiter():
1519 def chunkiter():
1516 yield header
1520 yield header
1517 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1521 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1518 yield chunk
1522 yield chunk
1519 chunkiter = chunkiter()
1523 chunkiter = chunkiter()
1520
1524
1521 # parse the changegroup data, otherwise we will block
1525 # parse the changegroup data, otherwise we will block
1522 # in case of sshrepo because we don't know the end of the stream
1526 # in case of sshrepo because we don't know the end of the stream
1523 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1527 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1524
1528
1525 def combinechangegroupresults(op):
1529 def combinechangegroupresults(op):
1526 """logic to combine 0 or more addchangegroup results into one"""
1530 """logic to combine 0 or more addchangegroup results into one"""
1527 results = [r.get('return', 0)
1531 results = [r.get('return', 0)
1528 for r in op.records['changegroup']]
1532 for r in op.records['changegroup']]
1529 changedheads = 0
1533 changedheads = 0
1530 result = 1
1534 result = 1
1531 for ret in results:
1535 for ret in results:
1532 # If any changegroup result is 0, return 0
1536 # If any changegroup result is 0, return 0
1533 if ret == 0:
1537 if ret == 0:
1534 result = 0
1538 result = 0
1535 break
1539 break
1536 if ret < -1:
1540 if ret < -1:
1537 changedheads += ret + 1
1541 changedheads += ret + 1
1538 elif ret > 1:
1542 elif ret > 1:
1539 changedheads += ret - 1
1543 changedheads += ret - 1
1540 if changedheads > 0:
1544 if changedheads > 0:
1541 result = 1 + changedheads
1545 result = 1 + changedheads
1542 elif changedheads < 0:
1546 elif changedheads < 0:
1543 result = -1 + changedheads
1547 result = -1 + changedheads
1544 return result
1548 return result
1545
1549
1546 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1550 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1547 'targetphase'))
1551 'targetphase'))
1548 def handlechangegroup(op, inpart):
1552 def handlechangegroup(op, inpart):
1549 """apply a changegroup part on the repo
1553 """apply a changegroup part on the repo
1550
1554
1551 This is a very early implementation that will massive rework before being
1555 This is a very early implementation that will massive rework before being
1552 inflicted to any end-user.
1556 inflicted to any end-user.
1553 """
1557 """
1554 tr = op.gettransaction()
1558 tr = op.gettransaction()
1555 unpackerversion = inpart.params.get('version', '01')
1559 unpackerversion = inpart.params.get('version', '01')
1556 # We should raise an appropriate exception here
1560 # We should raise an appropriate exception here
1557 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1561 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1558 # the source and url passed here are overwritten by the one contained in
1562 # the source and url passed here are overwritten by the one contained in
1559 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1563 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1560 nbchangesets = None
1564 nbchangesets = None
1561 if 'nbchanges' in inpart.params:
1565 if 'nbchanges' in inpart.params:
1562 nbchangesets = int(inpart.params.get('nbchanges'))
1566 nbchangesets = int(inpart.params.get('nbchanges'))
1563 if ('treemanifest' in inpart.params and
1567 if ('treemanifest' in inpart.params and
1564 'treemanifest' not in op.repo.requirements):
1568 'treemanifest' not in op.repo.requirements):
1565 if len(op.repo.changelog) != 0:
1569 if len(op.repo.changelog) != 0:
1566 raise error.Abort(_(
1570 raise error.Abort(_(
1567 "bundle contains tree manifests, but local repo is "
1571 "bundle contains tree manifests, but local repo is "
1568 "non-empty and does not use tree manifests"))
1572 "non-empty and does not use tree manifests"))
1569 op.repo.requirements.add('treemanifest')
1573 op.repo.requirements.add('treemanifest')
1570 op.repo._applyopenerreqs()
1574 op.repo._applyopenerreqs()
1571 op.repo._writerequirements()
1575 op.repo._writerequirements()
1572 extrakwargs = {}
1576 extrakwargs = {}
1573 targetphase = inpart.params.get('targetphase')
1577 targetphase = inpart.params.get('targetphase')
1574 if targetphase is not None:
1578 if targetphase is not None:
1575 extrakwargs['targetphase'] = int(targetphase)
1579 extrakwargs['targetphase'] = int(targetphase)
1576 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1580 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1577 expectedtotal=nbchangesets, **extrakwargs)
1581 expectedtotal=nbchangesets, **extrakwargs)
1578 if op.reply is not None:
1582 if op.reply is not None:
1579 # This is definitely not the final form of this
1583 # This is definitely not the final form of this
1580 # return. But one need to start somewhere.
1584 # return. But one need to start somewhere.
1581 part = op.reply.newpart('reply:changegroup', mandatory=False)
1585 part = op.reply.newpart('reply:changegroup', mandatory=False)
1582 part.addparam(
1586 part.addparam(
1583 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1587 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1584 part.addparam('return', '%i' % ret, mandatory=False)
1588 part.addparam('return', '%i' % ret, mandatory=False)
1585 assert not inpart.read()
1589 assert not inpart.read()
1586
1590
1587 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1591 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1588 ['digest:%s' % k for k in util.DIGESTS.keys()])
1592 ['digest:%s' % k for k in util.DIGESTS.keys()])
1589 @parthandler('remote-changegroup', _remotechangegroupparams)
1593 @parthandler('remote-changegroup', _remotechangegroupparams)
1590 def handleremotechangegroup(op, inpart):
1594 def handleremotechangegroup(op, inpart):
1591 """apply a bundle10 on the repo, given an url and validation information
1595 """apply a bundle10 on the repo, given an url and validation information
1592
1596
1593 All the information about the remote bundle to import are given as
1597 All the information about the remote bundle to import are given as
1594 parameters. The parameters include:
1598 parameters. The parameters include:
1595 - url: the url to the bundle10.
1599 - url: the url to the bundle10.
1596 - size: the bundle10 file size. It is used to validate what was
1600 - size: the bundle10 file size. It is used to validate what was
1597 retrieved by the client matches the server knowledge about the bundle.
1601 retrieved by the client matches the server knowledge about the bundle.
1598 - digests: a space separated list of the digest types provided as
1602 - digests: a space separated list of the digest types provided as
1599 parameters.
1603 parameters.
1600 - digest:<digest-type>: the hexadecimal representation of the digest with
1604 - digest:<digest-type>: the hexadecimal representation of the digest with
1601 that name. Like the size, it is used to validate what was retrieved by
1605 that name. Like the size, it is used to validate what was retrieved by
1602 the client matches what the server knows about the bundle.
1606 the client matches what the server knows about the bundle.
1603
1607
1604 When multiple digest types are given, all of them are checked.
1608 When multiple digest types are given, all of them are checked.
1605 """
1609 """
1606 try:
1610 try:
1607 raw_url = inpart.params['url']
1611 raw_url = inpart.params['url']
1608 except KeyError:
1612 except KeyError:
1609 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1613 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1610 parsed_url = util.url(raw_url)
1614 parsed_url = util.url(raw_url)
1611 if parsed_url.scheme not in capabilities['remote-changegroup']:
1615 if parsed_url.scheme not in capabilities['remote-changegroup']:
1612 raise error.Abort(_('remote-changegroup does not support %s urls') %
1616 raise error.Abort(_('remote-changegroup does not support %s urls') %
1613 parsed_url.scheme)
1617 parsed_url.scheme)
1614
1618
1615 try:
1619 try:
1616 size = int(inpart.params['size'])
1620 size = int(inpart.params['size'])
1617 except ValueError:
1621 except ValueError:
1618 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1622 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1619 % 'size')
1623 % 'size')
1620 except KeyError:
1624 except KeyError:
1621 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1625 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1622
1626
1623 digests = {}
1627 digests = {}
1624 for typ in inpart.params.get('digests', '').split():
1628 for typ in inpart.params.get('digests', '').split():
1625 param = 'digest:%s' % typ
1629 param = 'digest:%s' % typ
1626 try:
1630 try:
1627 value = inpart.params[param]
1631 value = inpart.params[param]
1628 except KeyError:
1632 except KeyError:
1629 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1633 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1630 param)
1634 param)
1631 digests[typ] = value
1635 digests[typ] = value
1632
1636
1633 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1637 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1634
1638
1635 tr = op.gettransaction()
1639 tr = op.gettransaction()
1636 from . import exchange
1640 from . import exchange
1637 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1641 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1638 if not isinstance(cg, changegroup.cg1unpacker):
1642 if not isinstance(cg, changegroup.cg1unpacker):
1639 raise error.Abort(_('%s: not a bundle version 1.0') %
1643 raise error.Abort(_('%s: not a bundle version 1.0') %
1640 util.hidepassword(raw_url))
1644 util.hidepassword(raw_url))
1641 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1645 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1642 if op.reply is not None:
1646 if op.reply is not None:
1643 # This is definitely not the final form of this
1647 # This is definitely not the final form of this
1644 # return. But one need to start somewhere.
1648 # return. But one need to start somewhere.
1645 part = op.reply.newpart('reply:changegroup')
1649 part = op.reply.newpart('reply:changegroup')
1646 part.addparam(
1650 part.addparam(
1647 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1651 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1648 part.addparam('return', '%i' % ret, mandatory=False)
1652 part.addparam('return', '%i' % ret, mandatory=False)
1649 try:
1653 try:
1650 real_part.validate()
1654 real_part.validate()
1651 except error.Abort as e:
1655 except error.Abort as e:
1652 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1656 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1653 (util.hidepassword(raw_url), str(e)))
1657 (util.hidepassword(raw_url), str(e)))
1654 assert not inpart.read()
1658 assert not inpart.read()
1655
1659
1656 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1660 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1657 def handlereplychangegroup(op, inpart):
1661 def handlereplychangegroup(op, inpart):
1658 ret = int(inpart.params['return'])
1662 ret = int(inpart.params['return'])
1659 replyto = int(inpart.params['in-reply-to'])
1663 replyto = int(inpart.params['in-reply-to'])
1660 op.records.add('changegroup', {'return': ret}, replyto)
1664 op.records.add('changegroup', {'return': ret}, replyto)
1661
1665
1662 @parthandler('check:heads')
1666 @parthandler('check:heads')
1663 def handlecheckheads(op, inpart):
1667 def handlecheckheads(op, inpart):
1664 """check that head of the repo did not change
1668 """check that head of the repo did not change
1665
1669
1666 This is used to detect a push race when using unbundle.
1670 This is used to detect a push race when using unbundle.
1667 This replaces the "heads" argument of unbundle."""
1671 This replaces the "heads" argument of unbundle."""
1668 h = inpart.read(20)
1672 h = inpart.read(20)
1669 heads = []
1673 heads = []
1670 while len(h) == 20:
1674 while len(h) == 20:
1671 heads.append(h)
1675 heads.append(h)
1672 h = inpart.read(20)
1676 h = inpart.read(20)
1673 assert not h
1677 assert not h
1674 # Trigger a transaction so that we are guaranteed to have the lock now.
1678 # Trigger a transaction so that we are guaranteed to have the lock now.
1675 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1679 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1676 op.gettransaction()
1680 op.gettransaction()
1677 if sorted(heads) != sorted(op.repo.heads()):
1681 if sorted(heads) != sorted(op.repo.heads()):
1678 raise error.PushRaced('repository changed while pushing - '
1682 raise error.PushRaced('repository changed while pushing - '
1679 'please try again')
1683 'please try again')
1680
1684
1681 @parthandler('check:updated-heads')
1685 @parthandler('check:updated-heads')
1682 def handlecheckupdatedheads(op, inpart):
1686 def handlecheckupdatedheads(op, inpart):
1683 """check for race on the heads touched by a push
1687 """check for race on the heads touched by a push
1684
1688
1685 This is similar to 'check:heads' but focus on the heads actually updated
1689 This is similar to 'check:heads' but focus on the heads actually updated
1686 during the push. If other activities happen on unrelated heads, it is
1690 during the push. If other activities happen on unrelated heads, it is
1687 ignored.
1691 ignored.
1688
1692
1689 This allow server with high traffic to avoid push contention as long as
1693 This allow server with high traffic to avoid push contention as long as
1690 unrelated parts of the graph are involved."""
1694 unrelated parts of the graph are involved."""
1691 h = inpart.read(20)
1695 h = inpart.read(20)
1692 heads = []
1696 heads = []
1693 while len(h) == 20:
1697 while len(h) == 20:
1694 heads.append(h)
1698 heads.append(h)
1695 h = inpart.read(20)
1699 h = inpart.read(20)
1696 assert not h
1700 assert not h
1697 # trigger a transaction so that we are guaranteed to have the lock now.
1701 # trigger a transaction so that we are guaranteed to have the lock now.
1698 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1702 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1699 op.gettransaction()
1703 op.gettransaction()
1700
1704
1701 currentheads = set()
1705 currentheads = set()
1702 for ls in op.repo.branchmap().itervalues():
1706 for ls in op.repo.branchmap().itervalues():
1703 currentheads.update(ls)
1707 currentheads.update(ls)
1704
1708
1705 for h in heads:
1709 for h in heads:
1706 if h not in currentheads:
1710 if h not in currentheads:
1707 raise error.PushRaced('repository changed while pushing - '
1711 raise error.PushRaced('repository changed while pushing - '
1708 'please try again')
1712 'please try again')
1709
1713
1710 @parthandler('output')
1714 @parthandler('output')
1711 def handleoutput(op, inpart):
1715 def handleoutput(op, inpart):
1712 """forward output captured on the server to the client"""
1716 """forward output captured on the server to the client"""
1713 for line in inpart.read().splitlines():
1717 for line in inpart.read().splitlines():
1714 op.ui.status(_('remote: %s\n') % line)
1718 op.ui.status(_('remote: %s\n') % line)
1715
1719
1716 @parthandler('replycaps')
1720 @parthandler('replycaps')
1717 def handlereplycaps(op, inpart):
1721 def handlereplycaps(op, inpart):
1718 """Notify that a reply bundle should be created
1722 """Notify that a reply bundle should be created
1719
1723
1720 The payload contains the capabilities information for the reply"""
1724 The payload contains the capabilities information for the reply"""
1721 caps = decodecaps(inpart.read())
1725 caps = decodecaps(inpart.read())
1722 if op.reply is None:
1726 if op.reply is None:
1723 op.reply = bundle20(op.ui, caps)
1727 op.reply = bundle20(op.ui, caps)
1724
1728
1725 class AbortFromPart(error.Abort):
1729 class AbortFromPart(error.Abort):
1726 """Sub-class of Abort that denotes an error from a bundle2 part."""
1730 """Sub-class of Abort that denotes an error from a bundle2 part."""
1727
1731
1728 @parthandler('error:abort', ('message', 'hint'))
1732 @parthandler('error:abort', ('message', 'hint'))
1729 def handleerrorabort(op, inpart):
1733 def handleerrorabort(op, inpart):
1730 """Used to transmit abort error over the wire"""
1734 """Used to transmit abort error over the wire"""
1731 raise AbortFromPart(inpart.params['message'],
1735 raise AbortFromPart(inpart.params['message'],
1732 hint=inpart.params.get('hint'))
1736 hint=inpart.params.get('hint'))
1733
1737
1734 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1738 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1735 'in-reply-to'))
1739 'in-reply-to'))
1736 def handleerrorpushkey(op, inpart):
1740 def handleerrorpushkey(op, inpart):
1737 """Used to transmit failure of a mandatory pushkey over the wire"""
1741 """Used to transmit failure of a mandatory pushkey over the wire"""
1738 kwargs = {}
1742 kwargs = {}
1739 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1743 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1740 value = inpart.params.get(name)
1744 value = inpart.params.get(name)
1741 if value is not None:
1745 if value is not None:
1742 kwargs[name] = value
1746 kwargs[name] = value
1743 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1747 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1744
1748
1745 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1749 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1746 def handleerrorunsupportedcontent(op, inpart):
1750 def handleerrorunsupportedcontent(op, inpart):
1747 """Used to transmit unknown content error over the wire"""
1751 """Used to transmit unknown content error over the wire"""
1748 kwargs = {}
1752 kwargs = {}
1749 parttype = inpart.params.get('parttype')
1753 parttype = inpart.params.get('parttype')
1750 if parttype is not None:
1754 if parttype is not None:
1751 kwargs['parttype'] = parttype
1755 kwargs['parttype'] = parttype
1752 params = inpart.params.get('params')
1756 params = inpart.params.get('params')
1753 if params is not None:
1757 if params is not None:
1754 kwargs['params'] = params.split('\0')
1758 kwargs['params'] = params.split('\0')
1755
1759
1756 raise error.BundleUnknownFeatureError(**kwargs)
1760 raise error.BundleUnknownFeatureError(**kwargs)
1757
1761
1758 @parthandler('error:pushraced', ('message',))
1762 @parthandler('error:pushraced', ('message',))
1759 def handleerrorpushraced(op, inpart):
1763 def handleerrorpushraced(op, inpart):
1760 """Used to transmit push race error over the wire"""
1764 """Used to transmit push race error over the wire"""
1761 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1765 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1762
1766
1763 @parthandler('listkeys', ('namespace',))
1767 @parthandler('listkeys', ('namespace',))
1764 def handlelistkeys(op, inpart):
1768 def handlelistkeys(op, inpart):
1765 """retrieve pushkey namespace content stored in a bundle2"""
1769 """retrieve pushkey namespace content stored in a bundle2"""
1766 namespace = inpart.params['namespace']
1770 namespace = inpart.params['namespace']
1767 r = pushkey.decodekeys(inpart.read())
1771 r = pushkey.decodekeys(inpart.read())
1768 op.records.add('listkeys', (namespace, r))
1772 op.records.add('listkeys', (namespace, r))
1769
1773
1770 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1774 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1771 def handlepushkey(op, inpart):
1775 def handlepushkey(op, inpart):
1772 """process a pushkey request"""
1776 """process a pushkey request"""
1773 dec = pushkey.decode
1777 dec = pushkey.decode
1774 namespace = dec(inpart.params['namespace'])
1778 namespace = dec(inpart.params['namespace'])
1775 key = dec(inpart.params['key'])
1779 key = dec(inpart.params['key'])
1776 old = dec(inpart.params['old'])
1780 old = dec(inpart.params['old'])
1777 new = dec(inpart.params['new'])
1781 new = dec(inpart.params['new'])
1778 # Grab the transaction to ensure that we have the lock before performing the
1782 # Grab the transaction to ensure that we have the lock before performing the
1779 # pushkey.
1783 # pushkey.
1780 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1784 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1781 op.gettransaction()
1785 op.gettransaction()
1782 ret = op.repo.pushkey(namespace, key, old, new)
1786 ret = op.repo.pushkey(namespace, key, old, new)
1783 record = {'namespace': namespace,
1787 record = {'namespace': namespace,
1784 'key': key,
1788 'key': key,
1785 'old': old,
1789 'old': old,
1786 'new': new}
1790 'new': new}
1787 op.records.add('pushkey', record)
1791 op.records.add('pushkey', record)
1788 if op.reply is not None:
1792 if op.reply is not None:
1789 rpart = op.reply.newpart('reply:pushkey')
1793 rpart = op.reply.newpart('reply:pushkey')
1790 rpart.addparam(
1794 rpart.addparam(
1791 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1795 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1792 rpart.addparam('return', '%i' % ret, mandatory=False)
1796 rpart.addparam('return', '%i' % ret, mandatory=False)
1793 if inpart.mandatory and not ret:
1797 if inpart.mandatory and not ret:
1794 kwargs = {}
1798 kwargs = {}
1795 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1799 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1796 if key in inpart.params:
1800 if key in inpart.params:
1797 kwargs[key] = inpart.params[key]
1801 kwargs[key] = inpart.params[key]
1798 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1802 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1799
1803
1800 def _readphaseheads(inpart):
1804 def _readphaseheads(inpart):
1801 headsbyphase = [[] for i in phases.allphases]
1805 headsbyphase = [[] for i in phases.allphases]
1802 entrysize = struct.calcsize(_fphasesentry)
1806 entrysize = struct.calcsize(_fphasesentry)
1803 while True:
1807 while True:
1804 entry = inpart.read(entrysize)
1808 entry = inpart.read(entrysize)
1805 if len(entry) < entrysize:
1809 if len(entry) < entrysize:
1806 if entry:
1810 if entry:
1807 raise error.Abort(_('bad phase-heads bundle part'))
1811 raise error.Abort(_('bad phase-heads bundle part'))
1808 break
1812 break
1809 phase, node = struct.unpack(_fphasesentry, entry)
1813 phase, node = struct.unpack(_fphasesentry, entry)
1810 headsbyphase[phase].append(node)
1814 headsbyphase[phase].append(node)
1811 return headsbyphase
1815 return headsbyphase
1812
1816
1813 @parthandler('phase-heads')
1817 @parthandler('phase-heads')
1814 def handlephases(op, inpart):
1818 def handlephases(op, inpart):
1815 """apply phases from bundle part to repo"""
1819 """apply phases from bundle part to repo"""
1816 headsbyphase = _readphaseheads(inpart)
1820 headsbyphase = _readphaseheads(inpart)
1817 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1821 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1818 op.records.add('phase-heads', {})
1822 op.records.add('phase-heads', {})
1819
1823
1820 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1824 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1821 def handlepushkeyreply(op, inpart):
1825 def handlepushkeyreply(op, inpart):
1822 """retrieve the result of a pushkey request"""
1826 """retrieve the result of a pushkey request"""
1823 ret = int(inpart.params['return'])
1827 ret = int(inpart.params['return'])
1824 partid = int(inpart.params['in-reply-to'])
1828 partid = int(inpart.params['in-reply-to'])
1825 op.records.add('pushkey', {'return': ret}, partid)
1829 op.records.add('pushkey', {'return': ret}, partid)
1826
1830
1827 @parthandler('obsmarkers')
1831 @parthandler('obsmarkers')
1828 def handleobsmarker(op, inpart):
1832 def handleobsmarker(op, inpart):
1829 """add a stream of obsmarkers to the repo"""
1833 """add a stream of obsmarkers to the repo"""
1830 tr = op.gettransaction()
1834 tr = op.gettransaction()
1831 markerdata = inpart.read()
1835 markerdata = inpart.read()
1832 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1836 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1833 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1837 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1834 % len(markerdata))
1838 % len(markerdata))
1835 # The mergemarkers call will crash if marker creation is not enabled.
1839 # The mergemarkers call will crash if marker creation is not enabled.
1836 # we want to avoid this if the part is advisory.
1840 # we want to avoid this if the part is advisory.
1837 if not inpart.mandatory and op.repo.obsstore.readonly:
1841 if not inpart.mandatory and op.repo.obsstore.readonly:
1838 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1842 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1839 return
1843 return
1840 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1844 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1841 op.repo.invalidatevolatilesets()
1845 op.repo.invalidatevolatilesets()
1842 if new:
1846 if new:
1843 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1847 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1844 op.records.add('obsmarkers', {'new': new})
1848 op.records.add('obsmarkers', {'new': new})
1845 if op.reply is not None:
1849 if op.reply is not None:
1846 rpart = op.reply.newpart('reply:obsmarkers')
1850 rpart = op.reply.newpart('reply:obsmarkers')
1847 rpart.addparam(
1851 rpart.addparam(
1848 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1852 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1849 rpart.addparam('new', '%i' % new, mandatory=False)
1853 rpart.addparam('new', '%i' % new, mandatory=False)
1850
1854
1851
1855
1852 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1856 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1853 def handleobsmarkerreply(op, inpart):
1857 def handleobsmarkerreply(op, inpart):
1854 """retrieve the result of a pushkey request"""
1858 """retrieve the result of a pushkey request"""
1855 ret = int(inpart.params['new'])
1859 ret = int(inpart.params['new'])
1856 partid = int(inpart.params['in-reply-to'])
1860 partid = int(inpart.params['in-reply-to'])
1857 op.records.add('obsmarkers', {'new': ret}, partid)
1861 op.records.add('obsmarkers', {'new': ret}, partid)
1858
1862
1859 @parthandler('hgtagsfnodes')
1863 @parthandler('hgtagsfnodes')
1860 def handlehgtagsfnodes(op, inpart):
1864 def handlehgtagsfnodes(op, inpart):
1861 """Applies .hgtags fnodes cache entries to the local repo.
1865 """Applies .hgtags fnodes cache entries to the local repo.
1862
1866
1863 Payload is pairs of 20 byte changeset nodes and filenodes.
1867 Payload is pairs of 20 byte changeset nodes and filenodes.
1864 """
1868 """
1865 # Grab the transaction so we ensure that we have the lock at this point.
1869 # Grab the transaction so we ensure that we have the lock at this point.
1866 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1870 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1867 op.gettransaction()
1871 op.gettransaction()
1868 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1872 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1869
1873
1870 count = 0
1874 count = 0
1871 while True:
1875 while True:
1872 node = inpart.read(20)
1876 node = inpart.read(20)
1873 fnode = inpart.read(20)
1877 fnode = inpart.read(20)
1874 if len(node) < 20 or len(fnode) < 20:
1878 if len(node) < 20 or len(fnode) < 20:
1875 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1879 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1876 break
1880 break
1877 cache.setfnode(node, fnode)
1881 cache.setfnode(node, fnode)
1878 count += 1
1882 count += 1
1879
1883
1880 cache.write()
1884 cache.write()
1881 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1885 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1882
1886
1883 @parthandler('pushvars')
1887 @parthandler('pushvars')
1884 def bundle2getvars(op, part):
1888 def bundle2getvars(op, part):
1885 '''unbundle a bundle2 containing shellvars on the server'''
1889 '''unbundle a bundle2 containing shellvars on the server'''
1886 # An option to disable unbundling on server-side for security reasons
1890 # An option to disable unbundling on server-side for security reasons
1887 if op.ui.configbool('push', 'pushvars.server'):
1891 if op.ui.configbool('push', 'pushvars.server'):
1888 hookargs = {}
1892 hookargs = {}
1889 for key, value in part.advisoryparams:
1893 for key, value in part.advisoryparams:
1890 key = key.upper()
1894 key = key.upper()
1891 # We want pushed variables to have USERVAR_ prepended so we know
1895 # We want pushed variables to have USERVAR_ prepended so we know
1892 # they came from the --pushvar flag.
1896 # they came from the --pushvar flag.
1893 key = "USERVAR_" + key
1897 key = "USERVAR_" + key
1894 hookargs[key] = value
1898 hookargs[key] = value
1895 op.addhookargs(hookargs)
1899 op.addhookargs(hookargs)
@@ -1,558 +1,557
1 # bundlerepo.py - repository class for viewing uncompressed bundles
1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 #
2 #
3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 """Repository class for viewing uncompressed bundles.
8 """Repository class for viewing uncompressed bundles.
9
9
10 This provides a read-only repository interface to bundles as if they
10 This provides a read-only repository interface to bundles as if they
11 were part of the actual repository.
11 were part of the actual repository.
12 """
12 """
13
13
14 from __future__ import absolute_import
14 from __future__ import absolute_import
15
15
16 import os
16 import os
17 import shutil
17 import shutil
18 import tempfile
18 import tempfile
19
19
20 from .i18n import _
20 from .i18n import _
21 from .node import nullid
21 from .node import nullid
22
22
23 from . import (
23 from . import (
24 bundle2,
24 bundle2,
25 changegroup,
25 changegroup,
26 changelog,
26 changelog,
27 cmdutil,
27 cmdutil,
28 discovery,
28 discovery,
29 error,
29 error,
30 exchange,
30 exchange,
31 filelog,
31 filelog,
32 localrepo,
32 localrepo,
33 manifest,
33 manifest,
34 mdiff,
34 mdiff,
35 node as nodemod,
35 node as nodemod,
36 pathutil,
36 pathutil,
37 phases,
37 phases,
38 pycompat,
38 pycompat,
39 revlog,
39 revlog,
40 util,
40 util,
41 vfs as vfsmod,
41 vfs as vfsmod,
42 )
42 )
43
43
44 class bundlerevlog(revlog.revlog):
44 class bundlerevlog(revlog.revlog):
45 def __init__(self, opener, indexfile, bundle, linkmapper):
45 def __init__(self, opener, indexfile, bundle, linkmapper):
46 # How it works:
46 # How it works:
47 # To retrieve a revision, we need to know the offset of the revision in
47 # To retrieve a revision, we need to know the offset of the revision in
48 # the bundle (an unbundle object). We store this offset in the index
48 # the bundle (an unbundle object). We store this offset in the index
49 # (start). The base of the delta is stored in the base field.
49 # (start). The base of the delta is stored in the base field.
50 #
50 #
51 # To differentiate a rev in the bundle from a rev in the revlog, we
51 # To differentiate a rev in the bundle from a rev in the revlog, we
52 # check revision against repotiprev.
52 # check revision against repotiprev.
53 opener = vfsmod.readonlyvfs(opener)
53 opener = vfsmod.readonlyvfs(opener)
54 revlog.revlog.__init__(self, opener, indexfile)
54 revlog.revlog.__init__(self, opener, indexfile)
55 self.bundle = bundle
55 self.bundle = bundle
56 n = len(self)
56 n = len(self)
57 self.repotiprev = n - 1
57 self.repotiprev = n - 1
58 chain = None
58 chain = None
59 self.bundlerevs = set() # used by 'bundle()' revset expression
59 self.bundlerevs = set() # used by 'bundle()' revset expression
60 getchunk = lambda: bundle.deltachunk(chain)
60 getchunk = lambda: bundle.deltachunk(chain)
61 for chunkdata in iter(getchunk, {}):
61 for chunkdata in iter(getchunk, {}):
62 node = chunkdata['node']
62 node = chunkdata['node']
63 p1 = chunkdata['p1']
63 p1 = chunkdata['p1']
64 p2 = chunkdata['p2']
64 p2 = chunkdata['p2']
65 cs = chunkdata['cs']
65 cs = chunkdata['cs']
66 deltabase = chunkdata['deltabase']
66 deltabase = chunkdata['deltabase']
67 delta = chunkdata['delta']
67 delta = chunkdata['delta']
68 flags = chunkdata['flags']
68 flags = chunkdata['flags']
69
69
70 size = len(delta)
70 size = len(delta)
71 start = bundle.tell() - size
71 start = bundle.tell() - size
72
72
73 link = linkmapper(cs)
73 link = linkmapper(cs)
74 if node in self.nodemap:
74 if node in self.nodemap:
75 # this can happen if two branches make the same change
75 # this can happen if two branches make the same change
76 chain = node
76 chain = node
77 self.bundlerevs.add(self.nodemap[node])
77 self.bundlerevs.add(self.nodemap[node])
78 continue
78 continue
79
79
80 for p in (p1, p2):
80 for p in (p1, p2):
81 if p not in self.nodemap:
81 if p not in self.nodemap:
82 raise error.LookupError(p, self.indexfile,
82 raise error.LookupError(p, self.indexfile,
83 _("unknown parent"))
83 _("unknown parent"))
84
84
85 if deltabase not in self.nodemap:
85 if deltabase not in self.nodemap:
86 raise LookupError(deltabase, self.indexfile,
86 raise LookupError(deltabase, self.indexfile,
87 _('unknown delta base'))
87 _('unknown delta base'))
88
88
89 baserev = self.rev(deltabase)
89 baserev = self.rev(deltabase)
90 # start, size, full unc. size, base (unused), link, p1, p2, node
90 # start, size, full unc. size, base (unused), link, p1, p2, node
91 e = (revlog.offset_type(start, flags), size, -1, baserev, link,
91 e = (revlog.offset_type(start, flags), size, -1, baserev, link,
92 self.rev(p1), self.rev(p2), node)
92 self.rev(p1), self.rev(p2), node)
93 self.index.insert(-1, e)
93 self.index.insert(-1, e)
94 self.nodemap[node] = n
94 self.nodemap[node] = n
95 self.bundlerevs.add(n)
95 self.bundlerevs.add(n)
96 chain = node
96 chain = node
97 n += 1
97 n += 1
98
98
99 def _chunk(self, rev):
99 def _chunk(self, rev):
100 # Warning: in case of bundle, the diff is against what we stored as
100 # Warning: in case of bundle, the diff is against what we stored as
101 # delta base, not against rev - 1
101 # delta base, not against rev - 1
102 # XXX: could use some caching
102 # XXX: could use some caching
103 if rev <= self.repotiprev:
103 if rev <= self.repotiprev:
104 return revlog.revlog._chunk(self, rev)
104 return revlog.revlog._chunk(self, rev)
105 self.bundle.seek(self.start(rev))
105 self.bundle.seek(self.start(rev))
106 return self.bundle.read(self.length(rev))
106 return self.bundle.read(self.length(rev))
107
107
108 def revdiff(self, rev1, rev2):
108 def revdiff(self, rev1, rev2):
109 """return or calculate a delta between two revisions"""
109 """return or calculate a delta between two revisions"""
110 if rev1 > self.repotiprev and rev2 > self.repotiprev:
110 if rev1 > self.repotiprev and rev2 > self.repotiprev:
111 # hot path for bundle
111 # hot path for bundle
112 revb = self.index[rev2][3]
112 revb = self.index[rev2][3]
113 if revb == rev1:
113 if revb == rev1:
114 return self._chunk(rev2)
114 return self._chunk(rev2)
115 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
115 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
116 return revlog.revlog.revdiff(self, rev1, rev2)
116 return revlog.revlog.revdiff(self, rev1, rev2)
117
117
118 return mdiff.textdiff(self.revision(rev1, raw=True),
118 return mdiff.textdiff(self.revision(rev1, raw=True),
119 self.revision(rev2, raw=True))
119 self.revision(rev2, raw=True))
120
120
121 def revision(self, nodeorrev, raw=False):
121 def revision(self, nodeorrev, raw=False):
122 """return an uncompressed revision of a given node or revision
122 """return an uncompressed revision of a given node or revision
123 number.
123 number.
124 """
124 """
125 if isinstance(nodeorrev, int):
125 if isinstance(nodeorrev, int):
126 rev = nodeorrev
126 rev = nodeorrev
127 node = self.node(rev)
127 node = self.node(rev)
128 else:
128 else:
129 node = nodeorrev
129 node = nodeorrev
130 rev = self.rev(node)
130 rev = self.rev(node)
131
131
132 if node == nullid:
132 if node == nullid:
133 return ""
133 return ""
134
134
135 rawtext = None
135 rawtext = None
136 chain = []
136 chain = []
137 iterrev = rev
137 iterrev = rev
138 # reconstruct the revision if it is from a changegroup
138 # reconstruct the revision if it is from a changegroup
139 while iterrev > self.repotiprev:
139 while iterrev > self.repotiprev:
140 if self._cache and self._cache[1] == iterrev:
140 if self._cache and self._cache[1] == iterrev:
141 rawtext = self._cache[2]
141 rawtext = self._cache[2]
142 break
142 break
143 chain.append(iterrev)
143 chain.append(iterrev)
144 iterrev = self.index[iterrev][3]
144 iterrev = self.index[iterrev][3]
145 if rawtext is None:
145 if rawtext is None:
146 rawtext = self.baserevision(iterrev)
146 rawtext = self.baserevision(iterrev)
147
147
148 while chain:
148 while chain:
149 delta = self._chunk(chain.pop())
149 delta = self._chunk(chain.pop())
150 rawtext = mdiff.patches(rawtext, [delta])
150 rawtext = mdiff.patches(rawtext, [delta])
151
151
152 text, validatehash = self._processflags(rawtext, self.flags(rev),
152 text, validatehash = self._processflags(rawtext, self.flags(rev),
153 'read', raw=raw)
153 'read', raw=raw)
154 if validatehash:
154 if validatehash:
155 self.checkhash(text, node, rev=rev)
155 self.checkhash(text, node, rev=rev)
156 self._cache = (node, rev, rawtext)
156 self._cache = (node, rev, rawtext)
157 return text
157 return text
158
158
159 def baserevision(self, nodeorrev):
159 def baserevision(self, nodeorrev):
160 # Revlog subclasses may override 'revision' method to modify format of
160 # Revlog subclasses may override 'revision' method to modify format of
161 # content retrieved from revlog. To use bundlerevlog with such class one
161 # content retrieved from revlog. To use bundlerevlog with such class one
162 # needs to override 'baserevision' and make more specific call here.
162 # needs to override 'baserevision' and make more specific call here.
163 return revlog.revlog.revision(self, nodeorrev, raw=True)
163 return revlog.revlog.revision(self, nodeorrev, raw=True)
164
164
165 def addrevision(self, text, transaction, link, p1=None, p2=None, d=None):
165 def addrevision(self, text, transaction, link, p1=None, p2=None, d=None):
166 raise NotImplementedError
166 raise NotImplementedError
167 def addgroup(self, revs, linkmapper, transaction):
167 def addgroup(self, revs, linkmapper, transaction):
168 raise NotImplementedError
168 raise NotImplementedError
169 def strip(self, rev, minlink):
169 def strip(self, rev, minlink):
170 raise NotImplementedError
170 raise NotImplementedError
171 def checksize(self):
171 def checksize(self):
172 raise NotImplementedError
172 raise NotImplementedError
173
173
174 class bundlechangelog(bundlerevlog, changelog.changelog):
174 class bundlechangelog(bundlerevlog, changelog.changelog):
175 def __init__(self, opener, bundle):
175 def __init__(self, opener, bundle):
176 changelog.changelog.__init__(self, opener)
176 changelog.changelog.__init__(self, opener)
177 linkmapper = lambda x: x
177 linkmapper = lambda x: x
178 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
178 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
179 linkmapper)
179 linkmapper)
180
180
181 def baserevision(self, nodeorrev):
181 def baserevision(self, nodeorrev):
182 # Although changelog doesn't override 'revision' method, some extensions
182 # Although changelog doesn't override 'revision' method, some extensions
183 # may replace this class with another that does. Same story with
183 # may replace this class with another that does. Same story with
184 # manifest and filelog classes.
184 # manifest and filelog classes.
185
185
186 # This bypasses filtering on changelog.node() and rev() because we need
186 # This bypasses filtering on changelog.node() and rev() because we need
187 # revision text of the bundle base even if it is hidden.
187 # revision text of the bundle base even if it is hidden.
188 oldfilter = self.filteredrevs
188 oldfilter = self.filteredrevs
189 try:
189 try:
190 self.filteredrevs = ()
190 self.filteredrevs = ()
191 return changelog.changelog.revision(self, nodeorrev, raw=True)
191 return changelog.changelog.revision(self, nodeorrev, raw=True)
192 finally:
192 finally:
193 self.filteredrevs = oldfilter
193 self.filteredrevs = oldfilter
194
194
195 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
195 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
196 def __init__(self, opener, bundle, linkmapper, dirlogstarts=None, dir=''):
196 def __init__(self, opener, bundle, linkmapper, dirlogstarts=None, dir=''):
197 manifest.manifestrevlog.__init__(self, opener, dir=dir)
197 manifest.manifestrevlog.__init__(self, opener, dir=dir)
198 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
198 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
199 linkmapper)
199 linkmapper)
200 if dirlogstarts is None:
200 if dirlogstarts is None:
201 dirlogstarts = {}
201 dirlogstarts = {}
202 if self.bundle.version == "03":
202 if self.bundle.version == "03":
203 dirlogstarts = _getfilestarts(self.bundle)
203 dirlogstarts = _getfilestarts(self.bundle)
204 self._dirlogstarts = dirlogstarts
204 self._dirlogstarts = dirlogstarts
205 self._linkmapper = linkmapper
205 self._linkmapper = linkmapper
206
206
207 def baserevision(self, nodeorrev):
207 def baserevision(self, nodeorrev):
208 node = nodeorrev
208 node = nodeorrev
209 if isinstance(node, int):
209 if isinstance(node, int):
210 node = self.node(node)
210 node = self.node(node)
211
211
212 if node in self.fulltextcache:
212 if node in self.fulltextcache:
213 result = '%s' % self.fulltextcache[node]
213 result = '%s' % self.fulltextcache[node]
214 else:
214 else:
215 result = manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
215 result = manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
216 return result
216 return result
217
217
218 def dirlog(self, d):
218 def dirlog(self, d):
219 if d in self._dirlogstarts:
219 if d in self._dirlogstarts:
220 self.bundle.seek(self._dirlogstarts[d])
220 self.bundle.seek(self._dirlogstarts[d])
221 return bundlemanifest(
221 return bundlemanifest(
222 self.opener, self.bundle, self._linkmapper,
222 self.opener, self.bundle, self._linkmapper,
223 self._dirlogstarts, dir=d)
223 self._dirlogstarts, dir=d)
224 return super(bundlemanifest, self).dirlog(d)
224 return super(bundlemanifest, self).dirlog(d)
225
225
226 class bundlefilelog(bundlerevlog, filelog.filelog):
226 class bundlefilelog(bundlerevlog, filelog.filelog):
227 def __init__(self, opener, path, bundle, linkmapper):
227 def __init__(self, opener, path, bundle, linkmapper):
228 filelog.filelog.__init__(self, opener, path)
228 filelog.filelog.__init__(self, opener, path)
229 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
229 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
230 linkmapper)
230 linkmapper)
231
231
232 def baserevision(self, nodeorrev):
232 def baserevision(self, nodeorrev):
233 return filelog.filelog.revision(self, nodeorrev, raw=True)
233 return filelog.filelog.revision(self, nodeorrev, raw=True)
234
234
235 class bundlepeer(localrepo.localpeer):
235 class bundlepeer(localrepo.localpeer):
236 def canpush(self):
236 def canpush(self):
237 return False
237 return False
238
238
239 class bundlephasecache(phases.phasecache):
239 class bundlephasecache(phases.phasecache):
240 def __init__(self, *args, **kwargs):
240 def __init__(self, *args, **kwargs):
241 super(bundlephasecache, self).__init__(*args, **kwargs)
241 super(bundlephasecache, self).__init__(*args, **kwargs)
242 if util.safehasattr(self, 'opener'):
242 if util.safehasattr(self, 'opener'):
243 self.opener = vfsmod.readonlyvfs(self.opener)
243 self.opener = vfsmod.readonlyvfs(self.opener)
244
244
245 def write(self):
245 def write(self):
246 raise NotImplementedError
246 raise NotImplementedError
247
247
248 def _write(self, fp):
248 def _write(self, fp):
249 raise NotImplementedError
249 raise NotImplementedError
250
250
251 def _updateroots(self, phase, newroots, tr):
251 def _updateroots(self, phase, newroots, tr):
252 self.phaseroots[phase] = newroots
252 self.phaseroots[phase] = newroots
253 self.invalidate()
253 self.invalidate()
254 self.dirty = True
254 self.dirty = True
255
255
256 def _getfilestarts(bundle):
256 def _getfilestarts(bundle):
257 bundlefilespos = {}
257 bundlefilespos = {}
258 for chunkdata in iter(bundle.filelogheader, {}):
258 for chunkdata in iter(bundle.filelogheader, {}):
259 fname = chunkdata['filename']
259 fname = chunkdata['filename']
260 bundlefilespos[fname] = bundle.tell()
260 bundlefilespos[fname] = bundle.tell()
261 for chunk in iter(lambda: bundle.deltachunk(None), {}):
261 for chunk in iter(lambda: bundle.deltachunk(None), {}):
262 pass
262 pass
263 return bundlefilespos
263 return bundlefilespos
264
264
265 class bundlerepository(localrepo.localrepository):
265 class bundlerepository(localrepo.localrepository):
266 def __init__(self, ui, path, bundlename):
266 def __init__(self, ui, path, bundlename):
267 self._tempparent = None
267 self._tempparent = None
268 try:
268 try:
269 localrepo.localrepository.__init__(self, ui, path)
269 localrepo.localrepository.__init__(self, ui, path)
270 except error.RepoError:
270 except error.RepoError:
271 self._tempparent = tempfile.mkdtemp()
271 self._tempparent = tempfile.mkdtemp()
272 localrepo.instance(ui, self._tempparent, 1)
272 localrepo.instance(ui, self._tempparent, 1)
273 localrepo.localrepository.__init__(self, ui, self._tempparent)
273 localrepo.localrepository.__init__(self, ui, self._tempparent)
274 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
274 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
275
275
276 if path:
276 if path:
277 self._url = 'bundle:' + util.expandpath(path) + '+' + bundlename
277 self._url = 'bundle:' + util.expandpath(path) + '+' + bundlename
278 else:
278 else:
279 self._url = 'bundle:' + bundlename
279 self._url = 'bundle:' + bundlename
280
280
281 self.tempfile = None
281 self.tempfile = None
282 f = util.posixfile(bundlename, "rb")
282 f = util.posixfile(bundlename, "rb")
283 self.bundlefile = self.bundle = exchange.readbundle(ui, f, bundlename)
283 self.bundlefile = self.bundle = exchange.readbundle(ui, f, bundlename)
284
284
285 if isinstance(self.bundle, bundle2.unbundle20):
285 if isinstance(self.bundle, bundle2.unbundle20):
286 cgstream = None
286 cgstream = None
287 for part in self.bundle.iterparts():
287 for part in self.bundle.iterparts():
288 if part.type == 'changegroup':
288 if part.type == 'changegroup':
289 if cgstream is not None:
289 if cgstream is not None:
290 raise NotImplementedError("can't process "
290 raise NotImplementedError("can't process "
291 "multiple changegroups")
291 "multiple changegroups")
292 cgstream = part
292 cgstream = part
293 version = part.params.get('version', '01')
293 version = part.params.get('version', '01')
294 legalcgvers = changegroup.supportedincomingversions(self)
294 legalcgvers = changegroup.supportedincomingversions(self)
295 if version not in legalcgvers:
295 if version not in legalcgvers:
296 msg = _('Unsupported changegroup version: %s')
296 msg = _('Unsupported changegroup version: %s')
297 raise error.Abort(msg % version)
297 raise error.Abort(msg % version)
298 if self.bundle.compressed():
298 if self.bundle.compressed():
299 cgstream = self._writetempbundle(part.read,
299 cgstream = self._writetempbundle(part.read,
300 ".cg%sun" % version)
300 ".cg%sun" % version)
301
301
302 if cgstream is None:
302 if cgstream is None:
303 raise error.Abort(_('No changegroups found'))
303 raise error.Abort(_('No changegroups found'))
304 cgstream.seek(0)
305
304
306 self.bundle = changegroup.getunbundler(version, cgstream, 'UN')
305 self.bundle = changegroup.getunbundler(version, cgstream, 'UN')
307
306
308 elif self.bundle.compressed():
307 elif self.bundle.compressed():
309 f = self._writetempbundle(self.bundle.read, '.hg10un',
308 f = self._writetempbundle(self.bundle.read, '.hg10un',
310 header='HG10UN')
309 header='HG10UN')
311 self.bundlefile = self.bundle = exchange.readbundle(ui, f,
310 self.bundlefile = self.bundle = exchange.readbundle(ui, f,
312 bundlename,
311 bundlename,
313 self.vfs)
312 self.vfs)
314
313
315 # dict with the mapping 'filename' -> position in the bundle
314 # dict with the mapping 'filename' -> position in the bundle
316 self.bundlefilespos = {}
315 self.bundlefilespos = {}
317
316
318 self.firstnewrev = self.changelog.repotiprev + 1
317 self.firstnewrev = self.changelog.repotiprev + 1
319 phases.retractboundary(self, None, phases.draft,
318 phases.retractboundary(self, None, phases.draft,
320 [ctx.node() for ctx in self[self.firstnewrev:]])
319 [ctx.node() for ctx in self[self.firstnewrev:]])
321
320
322 def _writetempbundle(self, readfn, suffix, header=''):
321 def _writetempbundle(self, readfn, suffix, header=''):
323 """Write a temporary file to disk
322 """Write a temporary file to disk
324 """
323 """
325 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
324 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
326 suffix=".hg10un")
325 suffix=".hg10un")
327 self.tempfile = temp
326 self.tempfile = temp
328
327
329 with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
328 with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
330 fptemp.write(header)
329 fptemp.write(header)
331 while True:
330 while True:
332 chunk = readfn(2**18)
331 chunk = readfn(2**18)
333 if not chunk:
332 if not chunk:
334 break
333 break
335 fptemp.write(chunk)
334 fptemp.write(chunk)
336
335
337 return self.vfs.open(self.tempfile, mode="rb")
336 return self.vfs.open(self.tempfile, mode="rb")
338
337
339 @localrepo.unfilteredpropertycache
338 @localrepo.unfilteredpropertycache
340 def _phasecache(self):
339 def _phasecache(self):
341 return bundlephasecache(self, self._phasedefaults)
340 return bundlephasecache(self, self._phasedefaults)
342
341
343 @localrepo.unfilteredpropertycache
342 @localrepo.unfilteredpropertycache
344 def changelog(self):
343 def changelog(self):
345 # consume the header if it exists
344 # consume the header if it exists
346 self.bundle.changelogheader()
345 self.bundle.changelogheader()
347 c = bundlechangelog(self.svfs, self.bundle)
346 c = bundlechangelog(self.svfs, self.bundle)
348 self.manstart = self.bundle.tell()
347 self.manstart = self.bundle.tell()
349 return c
348 return c
350
349
351 def _constructmanifest(self):
350 def _constructmanifest(self):
352 self.bundle.seek(self.manstart)
351 self.bundle.seek(self.manstart)
353 # consume the header if it exists
352 # consume the header if it exists
354 self.bundle.manifestheader()
353 self.bundle.manifestheader()
355 linkmapper = self.unfiltered().changelog.rev
354 linkmapper = self.unfiltered().changelog.rev
356 m = bundlemanifest(self.svfs, self.bundle, linkmapper)
355 m = bundlemanifest(self.svfs, self.bundle, linkmapper)
357 self.filestart = self.bundle.tell()
356 self.filestart = self.bundle.tell()
358 return m
357 return m
359
358
360 @localrepo.unfilteredpropertycache
359 @localrepo.unfilteredpropertycache
361 def manstart(self):
360 def manstart(self):
362 self.changelog
361 self.changelog
363 return self.manstart
362 return self.manstart
364
363
365 @localrepo.unfilteredpropertycache
364 @localrepo.unfilteredpropertycache
366 def filestart(self):
365 def filestart(self):
367 self.manifestlog
366 self.manifestlog
368 return self.filestart
367 return self.filestart
369
368
370 def url(self):
369 def url(self):
371 return self._url
370 return self._url
372
371
373 def file(self, f):
372 def file(self, f):
374 if not self.bundlefilespos:
373 if not self.bundlefilespos:
375 self.bundle.seek(self.filestart)
374 self.bundle.seek(self.filestart)
376 self.bundlefilespos = _getfilestarts(self.bundle)
375 self.bundlefilespos = _getfilestarts(self.bundle)
377
376
378 if f in self.bundlefilespos:
377 if f in self.bundlefilespos:
379 self.bundle.seek(self.bundlefilespos[f])
378 self.bundle.seek(self.bundlefilespos[f])
380 linkmapper = self.unfiltered().changelog.rev
379 linkmapper = self.unfiltered().changelog.rev
381 return bundlefilelog(self.svfs, f, self.bundle, linkmapper)
380 return bundlefilelog(self.svfs, f, self.bundle, linkmapper)
382 else:
381 else:
383 return filelog.filelog(self.svfs, f)
382 return filelog.filelog(self.svfs, f)
384
383
385 def close(self):
384 def close(self):
386 """Close assigned bundle file immediately."""
385 """Close assigned bundle file immediately."""
387 self.bundlefile.close()
386 self.bundlefile.close()
388 if self.tempfile is not None:
387 if self.tempfile is not None:
389 self.vfs.unlink(self.tempfile)
388 self.vfs.unlink(self.tempfile)
390 if self._tempparent:
389 if self._tempparent:
391 shutil.rmtree(self._tempparent, True)
390 shutil.rmtree(self._tempparent, True)
392
391
393 def cancopy(self):
392 def cancopy(self):
394 return False
393 return False
395
394
396 def peer(self):
395 def peer(self):
397 return bundlepeer(self)
396 return bundlepeer(self)
398
397
399 def getcwd(self):
398 def getcwd(self):
400 return pycompat.getcwd() # always outside the repo
399 return pycompat.getcwd() # always outside the repo
401
400
402 # Check if parents exist in localrepo before setting
401 # Check if parents exist in localrepo before setting
403 def setparents(self, p1, p2=nullid):
402 def setparents(self, p1, p2=nullid):
404 p1rev = self.changelog.rev(p1)
403 p1rev = self.changelog.rev(p1)
405 p2rev = self.changelog.rev(p2)
404 p2rev = self.changelog.rev(p2)
406 msg = _("setting parent to node %s that only exists in the bundle\n")
405 msg = _("setting parent to node %s that only exists in the bundle\n")
407 if self.changelog.repotiprev < p1rev:
406 if self.changelog.repotiprev < p1rev:
408 self.ui.warn(msg % nodemod.hex(p1))
407 self.ui.warn(msg % nodemod.hex(p1))
409 if self.changelog.repotiprev < p2rev:
408 if self.changelog.repotiprev < p2rev:
410 self.ui.warn(msg % nodemod.hex(p2))
409 self.ui.warn(msg % nodemod.hex(p2))
411 return super(bundlerepository, self).setparents(p1, p2)
410 return super(bundlerepository, self).setparents(p1, p2)
412
411
413 def instance(ui, path, create):
412 def instance(ui, path, create):
414 if create:
413 if create:
415 raise error.Abort(_('cannot create new bundle repository'))
414 raise error.Abort(_('cannot create new bundle repository'))
416 # internal config: bundle.mainreporoot
415 # internal config: bundle.mainreporoot
417 parentpath = ui.config("bundle", "mainreporoot")
416 parentpath = ui.config("bundle", "mainreporoot")
418 if not parentpath:
417 if not parentpath:
419 # try to find the correct path to the working directory repo
418 # try to find the correct path to the working directory repo
420 parentpath = cmdutil.findrepo(pycompat.getcwd())
419 parentpath = cmdutil.findrepo(pycompat.getcwd())
421 if parentpath is None:
420 if parentpath is None:
422 parentpath = ''
421 parentpath = ''
423 if parentpath:
422 if parentpath:
424 # Try to make the full path relative so we get a nice, short URL.
423 # Try to make the full path relative so we get a nice, short URL.
425 # In particular, we don't want temp dir names in test outputs.
424 # In particular, we don't want temp dir names in test outputs.
426 cwd = pycompat.getcwd()
425 cwd = pycompat.getcwd()
427 if parentpath == cwd:
426 if parentpath == cwd:
428 parentpath = ''
427 parentpath = ''
429 else:
428 else:
430 cwd = pathutil.normasprefix(cwd)
429 cwd = pathutil.normasprefix(cwd)
431 if parentpath.startswith(cwd):
430 if parentpath.startswith(cwd):
432 parentpath = parentpath[len(cwd):]
431 parentpath = parentpath[len(cwd):]
433 u = util.url(path)
432 u = util.url(path)
434 path = u.localpath()
433 path = u.localpath()
435 if u.scheme == 'bundle':
434 if u.scheme == 'bundle':
436 s = path.split("+", 1)
435 s = path.split("+", 1)
437 if len(s) == 1:
436 if len(s) == 1:
438 repopath, bundlename = parentpath, s[0]
437 repopath, bundlename = parentpath, s[0]
439 else:
438 else:
440 repopath, bundlename = s
439 repopath, bundlename = s
441 else:
440 else:
442 repopath, bundlename = parentpath, path
441 repopath, bundlename = parentpath, path
443 return bundlerepository(ui, repopath, bundlename)
442 return bundlerepository(ui, repopath, bundlename)
444
443
445 class bundletransactionmanager(object):
444 class bundletransactionmanager(object):
446 def transaction(self):
445 def transaction(self):
447 return None
446 return None
448
447
449 def close(self):
448 def close(self):
450 raise NotImplementedError
449 raise NotImplementedError
451
450
452 def release(self):
451 def release(self):
453 raise NotImplementedError
452 raise NotImplementedError
454
453
455 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
454 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
456 force=False):
455 force=False):
457 '''obtains a bundle of changes incoming from other
456 '''obtains a bundle of changes incoming from other
458
457
459 "onlyheads" restricts the returned changes to those reachable from the
458 "onlyheads" restricts the returned changes to those reachable from the
460 specified heads.
459 specified heads.
461 "bundlename", if given, stores the bundle to this file path permanently;
460 "bundlename", if given, stores the bundle to this file path permanently;
462 otherwise it's stored to a temp file and gets deleted again when you call
461 otherwise it's stored to a temp file and gets deleted again when you call
463 the returned "cleanupfn".
462 the returned "cleanupfn".
464 "force" indicates whether to proceed on unrelated repos.
463 "force" indicates whether to proceed on unrelated repos.
465
464
466 Returns a tuple (local, csets, cleanupfn):
465 Returns a tuple (local, csets, cleanupfn):
467
466
468 "local" is a local repo from which to obtain the actual incoming
467 "local" is a local repo from which to obtain the actual incoming
469 changesets; it is a bundlerepo for the obtained bundle when the
468 changesets; it is a bundlerepo for the obtained bundle when the
470 original "other" is remote.
469 original "other" is remote.
471 "csets" lists the incoming changeset node ids.
470 "csets" lists the incoming changeset node ids.
472 "cleanupfn" must be called without arguments when you're done processing
471 "cleanupfn" must be called without arguments when you're done processing
473 the changes; it closes both the original "other" and the one returned
472 the changes; it closes both the original "other" and the one returned
474 here.
473 here.
475 '''
474 '''
476 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
475 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
477 force=force)
476 force=force)
478 common, incoming, rheads = tmp
477 common, incoming, rheads = tmp
479 if not incoming:
478 if not incoming:
480 try:
479 try:
481 if bundlename:
480 if bundlename:
482 os.unlink(bundlename)
481 os.unlink(bundlename)
483 except OSError:
482 except OSError:
484 pass
483 pass
485 return repo, [], other.close
484 return repo, [], other.close
486
485
487 commonset = set(common)
486 commonset = set(common)
488 rheads = [x for x in rheads if x not in commonset]
487 rheads = [x for x in rheads if x not in commonset]
489
488
490 bundle = None
489 bundle = None
491 bundlerepo = None
490 bundlerepo = None
492 localrepo = other.local()
491 localrepo = other.local()
493 if bundlename or not localrepo:
492 if bundlename or not localrepo:
494 # create a bundle (uncompressed if other repo is not local)
493 # create a bundle (uncompressed if other repo is not local)
495
494
496 # developer config: devel.legacy.exchange
495 # developer config: devel.legacy.exchange
497 legexc = ui.configlist('devel', 'legacy.exchange')
496 legexc = ui.configlist('devel', 'legacy.exchange')
498 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
497 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
499 canbundle2 = (not forcebundle1
498 canbundle2 = (not forcebundle1
500 and other.capable('getbundle')
499 and other.capable('getbundle')
501 and other.capable('bundle2'))
500 and other.capable('bundle2'))
502 if canbundle2:
501 if canbundle2:
503 kwargs = {}
502 kwargs = {}
504 kwargs['common'] = common
503 kwargs['common'] = common
505 kwargs['heads'] = rheads
504 kwargs['heads'] = rheads
506 kwargs['bundlecaps'] = exchange.caps20to10(repo)
505 kwargs['bundlecaps'] = exchange.caps20to10(repo)
507 kwargs['cg'] = True
506 kwargs['cg'] = True
508 b2 = other.getbundle('incoming', **kwargs)
507 b2 = other.getbundle('incoming', **kwargs)
509 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
508 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
510 bundlename)
509 bundlename)
511 else:
510 else:
512 if other.capable('getbundle'):
511 if other.capable('getbundle'):
513 cg = other.getbundle('incoming', common=common, heads=rheads)
512 cg = other.getbundle('incoming', common=common, heads=rheads)
514 elif onlyheads is None and not other.capable('changegroupsubset'):
513 elif onlyheads is None and not other.capable('changegroupsubset'):
515 # compat with older servers when pulling all remote heads
514 # compat with older servers when pulling all remote heads
516 cg = other.changegroup(incoming, "incoming")
515 cg = other.changegroup(incoming, "incoming")
517 rheads = None
516 rheads = None
518 else:
517 else:
519 cg = other.changegroupsubset(incoming, rheads, 'incoming')
518 cg = other.changegroupsubset(incoming, rheads, 'incoming')
520 if localrepo:
519 if localrepo:
521 bundletype = "HG10BZ"
520 bundletype = "HG10BZ"
522 else:
521 else:
523 bundletype = "HG10UN"
522 bundletype = "HG10UN"
524 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
523 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
525 bundletype)
524 bundletype)
526 # keep written bundle?
525 # keep written bundle?
527 if bundlename:
526 if bundlename:
528 bundle = None
527 bundle = None
529 if not localrepo:
528 if not localrepo:
530 # use the created uncompressed bundlerepo
529 # use the created uncompressed bundlerepo
531 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
530 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
532 fname)
531 fname)
533 # this repo contains local and other now, so filter out local again
532 # this repo contains local and other now, so filter out local again
534 common = repo.heads()
533 common = repo.heads()
535 if localrepo:
534 if localrepo:
536 # Part of common may be remotely filtered
535 # Part of common may be remotely filtered
537 # So use an unfiltered version
536 # So use an unfiltered version
538 # The discovery process probably need cleanup to avoid that
537 # The discovery process probably need cleanup to avoid that
539 localrepo = localrepo.unfiltered()
538 localrepo = localrepo.unfiltered()
540
539
541 csets = localrepo.changelog.findmissing(common, rheads)
540 csets = localrepo.changelog.findmissing(common, rheads)
542
541
543 if bundlerepo:
542 if bundlerepo:
544 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
543 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
545 remotephases = other.listkeys('phases')
544 remotephases = other.listkeys('phases')
546
545
547 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
546 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
548 pullop.trmanager = bundletransactionmanager()
547 pullop.trmanager = bundletransactionmanager()
549 exchange._pullapplyphases(pullop, remotephases)
548 exchange._pullapplyphases(pullop, remotephases)
550
549
551 def cleanup():
550 def cleanup():
552 if bundlerepo:
551 if bundlerepo:
553 bundlerepo.close()
552 bundlerepo.close()
554 if bundle:
553 if bundle:
555 os.unlink(bundle)
554 os.unlink(bundle)
556 other.close()
555 other.close()
557
556
558 return (localrepo, csets, cleanup)
557 return (localrepo, csets, cleanup)
General Comments 0
You need to be logged in to leave comments. Login now