bundle2: convert ints to strings using pycompat.bytestring()...
Augie Fackler
r33677:373ca510 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If the first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    troubles.

  Any application-level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route the part to an application-level handler
  that can interpret the payload.

  Part parameters are passed to the application-level handler. They are
  meant to convey information that will help the application-level object to
  interpret the part payload.

  The binary format of the header is as follows

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    A part's parameters may have arbitrary content; the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

        N pairs of bytes, where N is the total number of parameters. Each
        pair contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

        A blob of bytes from which each parameter key and value can be
        retrieved using the list of size pairs stored in the previous
        field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type determines whether a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""
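The stream-level framing described above (an int32 size followed by a space-separated parameter blob) can be sketched with `struct`. This is a standalone illustration, not part of this module's API; the helper names `frameparams` and `readparams` are invented for the example, and urlquoting is omitted for brevity.

```python
import struct

def frameparams(params):
    # params: list of (name, value) byte-string pairs; a parameter with an
    # empty value is serialized as the bare name, per the format above
    blob = b' '.join(b'%s=%s' % (k, v) if v else k for k, v in params)
    return struct.pack('>i', len(blob)) + blob

def readparams(data):
    # reverse direction: read the int32 size, then slice out the blob
    (size,) = struct.unpack('>i', data[:4])
    return data[4:4 + size]

framed = frameparams([(b'Compression', b'GZ'), (b'advisory', b'')])
assert readparams(framed) == b'Compression=GZ advisory'
```

Note how the capitalized `Compression` parameter would be mandatory for a reader, while the lowercase `advisory` one could be safely ignored.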

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

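As a quick illustration of the dynamically built format: each parameter contributes one (key-size, value-size) pair of unsigned bytes. The `makefmt` helper below mirrors `_makefpartparamsizes` for a standalone sketch.

```python
import struct

def makefmt(nbparams):
    # same shape as _makefpartparamsizes above: one 'BB' pair per parameter
    return '>' + ('BB' * nbparams)

fmt = makefmt(2)
packed = struct.pack(fmt, 3, 5, 8, 0)  # two params: sizes (3, 5) and (8, 0)
assert struct.calcsize(fmt) == 4       # two bytes per parameter
assert struct.unpack(fmt, packed) == (3, 5, 8, 0)
```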
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
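The registration scheme can be sketched standalone: handlers are stored under the lower-cased part type, so lookup is case-insensitive, while the original casing of the part type on the wire is what encodes mandatory (any uppercase) versus advisory. The `handlers` dict and `register` decorator below are hypothetical stand-ins for `parthandlermapping` and `parthandler`.

```python
handlers = {}

def register(parttype):
    def _decorator(func):
        handlers[parttype.lower()] = func  # lookup key is always lower case
        return func
    return _decorator

@register('myPartType')
def handle(payload):
    return 'handled:%s' % payload

# lookup is case-insensitive; the original casing encodes mandatory-ness
assert handlers['myparttype']('x') == 'handled:x'
assert any(c.isupper() for c in 'myPartType')  # uppercase => mandatory
```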

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
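The record-keeping semantics are easy to see in a stripped-down standalone sketch (the `minirecords` class below is illustrative and omits the reply tracking): per-category buckets answer `records['cat']`, while a separate chronological list backs iteration.

```python
class minirecords(object):
    """Stripped-down sketch of unbundlerecords (no reply tracking)."""

    def __init__(self):
        self._categories = {}  # category -> list of entries
        self._sequences = []   # chronological (category, entry) pairs

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

recs = minirecords()
recs.add('changegroup', {'return': 1})
recs.add('pushkey', {'ok': True})
assert recs['changegroup'] == ({'return': 1},)
assert list(recs) == [('changegroup', {'return': 1}),
                      ('pushkey', {'ok': True})]
```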

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs is not None:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.Abort(
                _('attempted to add hooks to operation after transaction '
                  'started'))
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part, then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                     params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
505
505
506
506
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

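# Since decodecaps and encodecaps above define a tiny text format (one
# capability per line, URL-quoted, with optional comma-separated values), a
# round trip is easy to sketch outside Mercurial. This standalone version uses
# urllib.parse instead of Mercurial's urlreq wrapper; the *_sketch names are
# hypothetical and exist only for illustration.

```python
# Standalone sketch of the bundle2 caps blob format described above:
# one capability per line, optional URL-quoted "key=v1,v2" values.
from urllib.parse import quote, unquote

def encodecaps_sketch(caps):
    """encode a caps dict into a text blob (one capability per line)"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps_sketch(blob):
    """decode a caps blob back into a dict mapping keys to value lists"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

# A capability with no values round-trips to an empty list, matching the
# "values are always a list" contract in the docstring above.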
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


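# For illustration, the stream-level framing that getchunks() produces (magic
# string, big-endian parameter-block size, then the space-separated parameter
# block) can be sketched with plain struct. The '>i' format mirrors
# _fstreamparamsize as used elsewhere in this file, and URL-quoting of
# parameters is omitted; treat the helper name and these simplifications as
# assumptions of this sketch, not Mercurial API.

```python
# Minimal sketch of the bundle2 stream header framing: magic string,
# 4-byte big-endian size of the parameter block, then the block itself.
import struct

def frame_stream_header(params):
    """yield header chunks for a list of (name, value) byte-string params"""
    yield b'HG20'                        # magic string
    blocks = []
    for par, value in params:
        blocks.append(par if value is None else b'%s=%s' % (par, value))
    paramblock = b' '.join(blocks)
    yield struct.pack('>i', len(paramblock))  # assumed '>i', as in this file
    if paramblock:
        yield paramblock
```

# With no parameters, only the magic string and a zero size are emitted,
# matching the "if param:" guard in getchunks above.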
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process the whole parameter block, returning a sorted dict"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

880 class bundlepart(object):
880 class bundlepart(object):
881 """A bundle2 part contains application level payload
881 """A bundle2 part contains application level payload
882
882
883 The part `type` is used to route the part to the application level
883 The part `type` is used to route the part to the application level
884 handler.
884 handler.
885
885
886 The part payload is contained in ``part.data``. It could be raw bytes or a
886 The part payload is contained in ``part.data``. It could be raw bytes or a
887 generator of byte chunks.
887 generator of byte chunks.
888
888
889 You can add parameters to the part using the ``addparam`` method.
889 You can add parameters to the part using the ``addparam`` method.
890 Parameters can be either mandatory (default) or advisory. Remote side
890 Parameters can be either mandatory (default) or advisory. Remote side
891 should be able to safely ignore the advisory ones.
891 should be able to safely ignore the advisory ones.
892
892
893 Both data and parameters cannot be modified after the generation has begun.
893 Both data and parameters cannot be modified after the generation has begun.
894 """
894 """
895
895
896 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
896 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
897 data='', mandatory=True):
897 data='', mandatory=True):
898 validateparttype(parttype)
898 validateparttype(parttype)
899 self.id = None
899 self.id = None
900 self.type = parttype
900 self.type = parttype
901 self._data = data
901 self._data = data
902 self._mandatoryparams = list(mandatoryparams)
902 self._mandatoryparams = list(mandatoryparams)
903 self._advisoryparams = list(advisoryparams)
903 self._advisoryparams = list(advisoryparams)
904 # checking for duplicated entries
904 # checking for duplicated entries
905 self._seenparams = set()
905 self._seenparams = set()
906 for pname, __ in self._mandatoryparams + self._advisoryparams:
906 for pname, __ in self._mandatoryparams + self._advisoryparams:
907 if pname in self._seenparams:
907 if pname in self._seenparams:
908 raise error.ProgrammingError('duplicated params: %s' % pname)
908 raise error.ProgrammingError('duplicated params: %s' % pname)
909 self._seenparams.add(pname)
909 self._seenparams.add(pname)
910 # status of the part's generation:
910 # status of the part's generation:
911 # - None: not started,
911 # - None: not started,
912 # - False: currently generated,
912 # - False: currently generated,
913 # - True: generation done.
913 # - True: generation done.
914 self._generated = None
914 self._generated = None
915 self.mandatory = mandatory
915 self.mandatory = mandatory
916
916
917 def __repr__(self):
917 def __repr__(self):
918 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
918 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
919 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
919 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
920 % (cls, id(self), self.id, self.type, self.mandatory))
920 % (cls, id(self), self.id, self.type, self.mandatory))
921
921
922 def copy(self):
922 def copy(self):
923 """return a copy of the part
923 """return a copy of the part
924
924
925 The new part have the very same content but no partid assigned yet.
925 The new part have the very same content but no partid assigned yet.
926 Parts with generated data cannot be copied."""
926 Parts with generated data cannot be copied."""
927 assert not util.safehasattr(self.data, 'next')
927 assert not util.safehasattr(self.data, 'next')
928 return self.__class__(self.type, self._mandatoryparams,
928 return self.__class__(self.type, self._mandatoryparams,
929 self._advisoryparams, self._data, self.mandatory)
929 self._advisoryparams, self._data, self.mandatory)
930
930
931 # methods used to defines the part content
931 # methods used to defines the part content
932 @property
932 @property
933 def data(self):
933 def data(self):
934 return self._data
934 return self._data
935
935
936 @data.setter
936 @data.setter
937 def data(self, data):
937 def data(self, data):
938 if self._generated is not None:
938 if self._generated is not None:
939 raise error.ReadOnlyPartError('part is being generated')
939 raise error.ReadOnlyPartError('part is being generated')
940 self._data = data
940 self._data = data
941
941
942 @property
942 @property
943 def mandatoryparams(self):
943 def mandatoryparams(self):
944 # make it an immutable tuple to force people through ``addparam``
944 # make it an immutable tuple to force people through ``addparam``
945 return tuple(self._mandatoryparams)
945 return tuple(self._mandatoryparams)
946
946
947 @property
947 @property
948 def advisoryparams(self):
948 def advisoryparams(self):
949 # make it an immutable tuple to force people through ``addparam``
949 # make it an immutable tuple to force people through ``addparam``
950 return tuple(self._advisoryparams)
950 return tuple(self._advisoryparams)
951
951
952 def addparam(self, name, value='', mandatory=True):
952 def addparam(self, name, value='', mandatory=True):
953 """add a parameter to the part
953 """add a parameter to the part
954
954
955 If 'mandatory' is set to True, the remote handler must claim support
955 If 'mandatory' is set to True, the remote handler must claim support
956 for this parameter or the unbundling will be aborted.
956 for this parameter or the unbundling will be aborted.
957
957
958 The 'name' and 'value' cannot exceed 255 bytes each.
958 The 'name' and 'value' cannot exceed 255 bytes each.
959 """
959 """
960 if self._generated is not None:
960 if self._generated is not None:
961 raise error.ReadOnlyPartError('part is being generated')
961 raise error.ReadOnlyPartError('part is being generated')
962 if name in self._seenparams:
962 if name in self._seenparams:
963 raise ValueError('duplicated params: %s' % name)
963 raise ValueError('duplicated params: %s' % name)
964 self._seenparams.add(name)
964 self._seenparams.add(name)
965 params = self._advisoryparams
965 params = self._advisoryparams
966 if mandatory:
966 if mandatory:
967 params = self._mandatoryparams
967 params = self._mandatoryparams
968 params.append((name, value))
968 params.append((name, value))
969
969
970 # methods used to generates the bundle2 stream
970 # methods used to generates the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows an exception raised on the producer side during part
    iteration to be transmitted while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read the given format from the header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if the file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
1447 # we include the tags fnode cache for the bundle changeset
1447 # we include the tags fnode cache for the bundle changeset
1448 # (as an optional parts)
1448 # (as an optional parts)
1449 cache = tags.hgtagsfnodescache(repo.unfiltered())
1449 cache = tags.hgtagsfnodescache(repo.unfiltered())
1450 chunks = []
1450 chunks = []
1451
1451
1452 # .hgtags fnodes are only relevant for head changesets. While we could
1452 # .hgtags fnodes are only relevant for head changesets. While we could
1453 # transfer values for all known nodes, there will likely be little to
1453 # transfer values for all known nodes, there will likely be little to
1454 # no benefit.
1454 # no benefit.
1455 #
1455 #
1456 # We don't bother using a generator to produce output data because
1456 # We don't bother using a generator to produce output data because
1457 # a) we only have 40 bytes per head and even esoteric numbers of heads
1457 # a) we only have 40 bytes per head and even esoteric numbers of heads
1458 # consume little memory (1M heads is 40MB) b) we don't want to send the
1458 # consume little memory (1M heads is 40MB) b) we don't want to send the
1459 # part if we don't have entries and knowing if we have entries requires
1459 # part if we don't have entries and knowing if we have entries requires
1460 # cache lookups.
1460 # cache lookups.
1461 for node in outgoing.missingheads:
1461 for node in outgoing.missingheads:
1462 # Don't compute missing, as this may slow down serving.
1462 # Don't compute missing, as this may slow down serving.
1463 fnode = cache.getfnode(node, computemissing=False)
1463 fnode = cache.getfnode(node, computemissing=False)
1464 if fnode is not None:
1464 if fnode is not None:
1465 chunks.extend([node, fnode])
1465 chunks.extend([node, fnode])
1466
1466
1467 if chunks:
1467 if chunks:
1468 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1468 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1469
1469
1470 def buildobsmarkerspart(bundler, markers):
1470 def buildobsmarkerspart(bundler, markers):
1471 """add an obsmarker part to the bundler with <markers>
1471 """add an obsmarker part to the bundler with <markers>
1472
1472
1473 No part is created if markers is empty.
1473 No part is created if markers is empty.
1474 Raises ValueError if the bundler doesn't support any known obsmarker format.
1474 Raises ValueError if the bundler doesn't support any known obsmarker format.
1475 """
1475 """
1476 if not markers:
1476 if not markers:
1477 return None
1477 return None
1478
1478
1479 remoteversions = obsmarkersversion(bundler.capabilities)
1479 remoteversions = obsmarkersversion(bundler.capabilities)
1480 version = obsolete.commonversion(remoteversions)
1480 version = obsolete.commonversion(remoteversions)
1481 if version is None:
1481 if version is None:
1482 raise ValueError('bundler does not support common obsmarker format')
1482 raise ValueError('bundler does not support common obsmarker format')
1483 stream = obsolete.encodemarkers(markers, True, version=version)
1483 stream = obsolete.encodemarkers(markers, True, version=version)
1484 return bundler.newpart('obsmarkers', data=stream)
1484 return bundler.newpart('obsmarkers', data=stream)
1485
1485
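The version negotiation above relies on `obsolete.commonversion`, which picks the highest obsmarker format version known to both sides. A minimal standalone sketch of that selection logic (the real implementation lives in `obsolete.py`; the `localversions` default here is an assumption for illustration):

```python
def commonversion(remoteversions, localversions=(0, 1)):
    """Return the highest obsmarker format version supported by both
    sides, or None when there is no overlap.

    A sketch of obsolete.commonversion; 'localversions' is a
    hypothetical default, not the module's actual capability set."""
    common = set(remoteversions) & set(localversions)
    if not common:
        return None
    return max(common)

# both sides know formats 0 and 1 -> the newer one wins
assert commonversion({0, 1}) == 1
# no overlap -> None, and buildobsmarkerspart would raise ValueError
assert commonversion({99}) is None
```

When `None` comes back, `buildobsmarkerspart` raises `ValueError` rather than silently dropping the markers, since a mandatory obsmarkers part the remote cannot read would abort the whole exchange anyway.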
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # the compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

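The return-code convention combined above follows the `addchangegroup` values: 0 means failure, 1 means success with no head-count change, `1 + n` means n new heads, and `-1 - n` means n heads removed. A small standalone sketch of the same folding logic, operating on a plain list of ints instead of `op.records`:

```python
def combine(results):
    """Fold addchangegroup-style return codes into one, mirroring
    combinechangegroupresults:
      0      -> any failure makes the combined result 0
      1      -> success, head count unchanged
      1 + n  -> success, n new heads
      -1 - n -> success, n heads removed
    """
    changedheads = 0
    for ret in results:
        if ret == 0:           # a single failure wins
            return 0
        if ret < -1:           # ret == -1 - n: n heads removed
            changedheads += ret + 1
        elif ret > 1:          # ret == 1 + n: n heads added
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1

# two changegroups adding 2 and 1 heads -> combined: 3 new heads
assert combine([3, 2]) == 4
# a failure anywhere collapses the result to 0
assert combine([3, 0, 2]) == 0
```

Note that added and removed heads cancel against each other before the final encoding, so `[3, -3]` (two added, two removed) combines back to plain success.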
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained
    # in the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what was
      retrieved by the client matches the server's knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate that what was
      retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

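Both `check:heads` and `check:updated-heads` parse their payload the same way: a flat run of back-to-back 20-byte binary sha1 nodes, with no count prefix, terminated by end-of-part. A self-contained sketch of that read loop against an in-memory stream:

```python
import io

NODELEN = 20  # binary sha1 node length

def readnodes(stream):
    """Read a payload made of back-to-back 20-byte binary nodes,
    as the check:heads part payloads are laid out (a sketch of the
    loop in handlecheckheads, using any file-like object)."""
    nodes = []
    h = stream.read(NODELEN)
    while len(h) == NODELEN:
        nodes.append(h)
        h = stream.read(NODELEN)
    # a short trailing read means the payload was truncated
    assert not h, 'payload is not a whole number of nodes'
    return nodes

payload = b'\x11' * 20 + b'\x22' * 20
assert readnodes(io.BytesIO(payload)) == [b'\x11' * 20, b'\x22' * 20]
```

Reading fixed-size records until a short read is cheap here because each head costs only 20 bytes, so even repositories with many heads produce small parts.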
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activities happen on unrelated heads,
    they are ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

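The phase-heads payload parsed above is a sequence of fixed-size entries packed with `_fphasesentry`, each pairing a phase number with a 20-byte head node. A standalone sketch of that round trip, assuming the entry layout is a big-endian integer followed by the node (the actual `_fphasesentry` format string is defined earlier in this module, so the constant below is illustrative):

```python
import struct

# assumed layout of one phase-heads entry: big-endian int phase + 20-byte
# binary node (a stand-in for this module's _fphasesentry format string)
FPHASESENTRY = '>i20s'

def packentry(phase, node):
    """Pack one (phase, node) pair, as the phase-heads part writer does."""
    return struct.pack(FPHASESENTRY, phase, node)

def unpackentry(data):
    """Unpack one fixed-size entry back into (phase, node),
    mirroring the loop in _readphaseheads."""
    return struct.unpack(FPHASESENTRY, data)

node = b'\xab' * 20
entry = packentry(2, node)  # phase 2 is 'secret' in Mercurial's numbering
assert len(entry) == struct.calcsize(FPHASESENTRY) == 24
assert unpackentry(entry) == (2, node)
```

Because every entry has the same size, the reader can simply `read(entrysize)` in a loop and treat any short, non-empty trailing read as a corrupt part, exactly as `_readphaseheads` does.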
1814 @parthandler('phase-heads')
1814 @parthandler('phase-heads')
1815 def handlephases(op, inpart):
1815 def handlephases(op, inpart):
1816 """apply phases from bundle part to repo"""
1816 """apply phases from bundle part to repo"""
1817 headsbyphase = _readphaseheads(inpart)
1817 headsbyphase = _readphaseheads(inpart)
1818 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1818 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1819
1819
1820 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1820 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1821 def handlepushkeyreply(op, inpart):
1821 def handlepushkeyreply(op, inpart):
1822 """retrieve the result of a pushkey request"""
1822 """retrieve the result of a pushkey request"""
1823 ret = int(inpart.params['return'])
1823 ret = int(inpart.params['return'])
1824 partid = int(inpart.params['in-reply-to'])
1824 partid = int(inpart.params['in-reply-to'])
1825 op.records.add('pushkey', {'return': ret}, partid)
1825 op.records.add('pushkey', {'return': ret}, partid)
1826
1826
1827 @parthandler('obsmarkers')
1827 @parthandler('obsmarkers')
1828 def handleobsmarker(op, inpart):
1828 def handleobsmarker(op, inpart):
1829 """add a stream of obsmarkers to the repo"""
1829 """add a stream of obsmarkers to the repo"""
1830 tr = op.gettransaction()
1830 tr = op.gettransaction()
1831 markerdata = inpart.read()
1831 markerdata = inpart.read()
1832 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1832 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1833 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1833 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1834 % len(markerdata))
1834 % len(markerdata))
1835 # The mergemarkers call will crash if marker creation is not enabled.
1835 # The mergemarkers call will crash if marker creation is not enabled.
1836 # we want to avoid this if the part is advisory.
1836 # we want to avoid this if the part is advisory.
1837 if not inpart.mandatory and op.repo.obsstore.readonly:
1837 if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server', False):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)