bundle2: raise ProgrammingError for invalid call of addhookargs()...
Yuya Nishihara -
r33808:1bf5c550 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are obviously forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application-level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32bits integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    Part's parameters may have arbitrary content, the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

      N couples of bytes, where N is the total number of parameters. Each
      couple contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

      A blob of bytes from which each parameter key and value can be
      retrieved using the list of size couples stored in the previous
      field.

      Mandatory parameters come first, then the advisory ones.

      Each parameter's key MUST be unique within the part.

:payload:

  payload is a series of `<chunksize><chunkdata>`.

  `chunksize` is an int32, `chunkdata` are plain bytes (as many as
  `chunksize` says). The payload part is concluded by a zero size chunk.

  The current implementation always produces either zero or one chunk.
  This is an implementation limitation that will ultimately be lifted.

  `chunksize` can be negative to trigger special case processing. No such
  processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
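
The stream-level parameter layout described above (an int32 big-endian size
followed by a space-separated blob of urlquoted `<name>=<value>` pairs) can be
sketched as a standalone encoder/decoder. This is an illustrative sketch, not
bundle2.py code; the function names `encodestreamparams`/`decodestreamparams`
are hypothetical:

```python
import struct
import urllib.parse

def encodestreamparams(params):
    # space-separated, urlquoted <name>=<value> pairs, preceded by the
    # int32 big-endian size of the blob ('>i', matching _fstreamparamsize)
    blob = ' '.join('%s=%s' % (urllib.parse.quote(k), urllib.parse.quote(v))
                    for k, v in sorted(params.items()))
    return struct.pack('>i', len(blob)) + blob.encode('ascii')

def decodestreamparams(data):
    (size,) = struct.unpack('>i', data[:4])
    blob = data[4:4 + size].decode('ascii')
    params = {}
    for item in blob.split(' ') if blob else []:
        name, _, value = item.partition('=')
        params[urllib.parse.unquote(name)] = urllib.parse.unquote(value)
    return params
```

A capitalized name such as `Compression` would mark the parameter as
mandatory per the rules above; the round trip preserves it unchanged.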

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
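
The `<param-sizes>`/`<param-data>` layout from the module docstring can be
illustrated with the dynamic format that `_makefpartparamsizes` builds. This
is a hedged sketch, assuming each parameter contributes its key then its
value to the data blob; the sample parameters are made up:

```python
import struct

# two hypothetical part parameters: (key, value) pairs
params = [(b'comp', b'BZ'), (b'version', b'02')]

# what _makefpartparamsizes(2) returns: one (key-size, value-size)
# couple of unsigned bytes per parameter, big-endian
fmt = '>' + ('BB' * len(params))
sizes = struct.pack(fmt, *[s for k, v in params for s in (len(k), len(v))])
blob = b''.join(k + v for k, v in params)

# decoding side: slice the blob back apart using the size couples
unpacked = struct.unpack(fmt, sizes)
decoded, pos = [], 0
for i in range(len(params)):
    ks, vs = unpacked[2 * i], unpacked[2 * i + 1]
    decoded.append((blob[pos:pos + ks], blob[pos + ks:pos + ks + vs]))
    pos += ks + vs
```

The round trip recovers the original pairs, which is all the size-couple
encoding needs to guarantee.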

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep a record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
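
The two views the class docstring promises (per-category lookup and
chronological iteration) can be shown with a minimal standalone mock of the
same semantics; this sketch does not use `unbundlerecords` itself and the
sample entries are invented:

```python
# per-category buckets plus one chronological sequence, mirroring
# unbundlerecords._categories and unbundlerecords._sequences
categories = {}
sequence = []

def add(category, entry):
    categories.setdefault(category, []).append(entry)
    sequence.append((category, entry))

add('changegroup', {'return': 1})
add('pushkey', {'namespace': 'phases'})
add('changegroup', {'return': 0})
```

Lookup by category collapses interleaved entries into one list, while
iteration preserves arrival order, exactly as described above.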

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs is not None:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part, then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the type
        # of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                     params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)
505
504
506
505
507 def decodecaps(blob):
506 def decodecaps(blob):
508 """decode a bundle2 caps bytes blob into a dictionary
507 """decode a bundle2 caps bytes blob into a dictionary
509
508
510 The blob is a list of capabilities (one per line)
509 The blob is a list of capabilities (one per line)
511 Capabilities may have values using a line of the form::
510 Capabilities may have values using a line of the form::
512
511
513 capability=value1,value2,value3
512 capability=value1,value2,value3
514
513
515 The values are always a list."""
514 The values are always a list."""
516 caps = {}
515 caps = {}
517 for line in blob.splitlines():
516 for line in blob.splitlines():
518 if not line:
517 if not line:
519 continue
518 continue
520 if '=' not in line:
519 if '=' not in line:
521 key, vals = line, ()
520 key, vals = line, ()
522 else:
521 else:
523 key, vals = line.split('=', 1)
522 key, vals = line.split('=', 1)
524 vals = vals.split(',')
523 vals = vals.split(',')
525 key = urlreq.unquote(key)
524 key = urlreq.unquote(key)
526 vals = [urlreq.unquote(v) for v in vals]
525 vals = [urlreq.unquote(v) for v in vals]
527 caps[key] = vals
526 caps[key] = vals
528 return caps
527 return caps
529
528
530 def encodecaps(caps):
529 def encodecaps(caps):
531 """encode a bundle2 caps dictionary into a bytes blob"""
530 """encode a bundle2 caps dictionary into a bytes blob"""
532 chunks = []
531 chunks = []
533 for ca in sorted(caps):
532 for ca in sorted(caps):
534 vals = caps[ca]
533 vals = caps[ca]
535 ca = urlreq.quote(ca)
534 ca = urlreq.quote(ca)
536 vals = [urlreq.quote(v) for v in vals]
535 vals = [urlreq.quote(v) for v in vals]
537 if vals:
536 if vals:
538 ca = "%s=%s" % (ca, ','.join(vals))
537 ca = "%s=%s" % (ca, ','.join(vals))
539 chunks.append(ca)
538 chunks.append(ca)
540 return '\n'.join(chunks)
539 return '\n'.join(chunks)
541
540
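# The caps blob handled by encodecaps/decodecaps above can be sketched as a
# standalone round trip. This is an illustration only: stdlib urllib.parse
# (Python 3) stands in for Mercurial's urlreq wrapper, and the demo_* names
# are hypothetical.

```python
from urllib.parse import quote, unquote

def demo_encodecaps(caps):
    # one "capability=value1,value2" line per capability, sorted by name
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def demo_decodecaps(blob):
    # values always decode back into a list, even when empty
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

# A capability with no '=' decodes to an empty list, so the round trip is
# lossless for both valued and bare capabilities.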
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters, and `newpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks
    of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

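    # The stream-parameter encoding built by _paramchunk above can be shown
    # standalone: space-separated, url-quoted "name" or "name=value" blocks.
    # Illustration only; urllib.parse stands in for urlreq and demo_paramchunk
    # is a hypothetical name.

```python
from urllib.parse import quote

def demo_paramchunk(params):
    # params is a list of (name, value) pairs; value may be None
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)
```

    # A valueless parameter encodes to its bare name, which is why the
    # decoder pads missing values with None.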
    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

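    # The outer framing emitted by getchunks and _getcorechunk above can be
    # sketched end to end: magic string, 32-bit big-endian parameter block
    # size, the part chunks, then a zero part-header size as the end-of-stream
    # marker. '>i' mirrors the signed formats implied by the negative-size
    # checks; the demo_stream name is hypothetical.

```python
import struct

def demo_stream(paramblock, partchunks):
    yield b'HG20'                            # magic string
    yield struct.pack('>i', len(paramblock)) # stream param block size
    if paramblock:
        yield paramblock
    for chunk in partchunks:                 # pre-framed part chunks
        yield chunk
    yield struct.pack('>i', 0)               # end-of-stream marker
```

    # Compression (when enabled) applies to everything after the parameter
    # block, which is why the magic and parameters stay uncompressed.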
    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

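# The magic-string dispatch in getunbundler above splits the four leading
# bytes into a two-byte format family and a two-byte version used for the
# formatmap lookup. A minimal standalone sketch (demo_splitmagic is a
# hypothetical name):

```python
def demo_splitmagic(magicstring):
    # first two bytes select the family, next two the version
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle')
    return magic, version
```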
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a raw stream parameters block, returning a sorted dict"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

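    # The parsing done by _processallparams above, shown standalone: split on
    # spaces, url-unquote the name and optional value, and keep insertion
    # order. Illustration only; a plain dict (ordered on modern Python)
    # stands in for util.sortdict, and demo_parseparams is a hypothetical
    # name.

```python
from urllib.parse import unquote

def demo_parseparams(paramsblock):
    params = {}
    for p in paramsblock.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)  # valueless parameter
        params[parts[0]] = parts[1]
    return params
```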
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will
        be ignored when unknown. Those starting with an upper case letter
        are mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no option is currently supported. Any input will be either
        ignored or rejected.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are
        sorry, but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i'
                                             % size)
            yield self._readexact(size)

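    # The framing loop in _forwardchunks above can be sketched as a reader:
    # each header or payload chunk is preceded by a 32-bit big-endian size,
    # and two consecutive zero sizes (the last payload terminator followed by
    # the empty part header) end the stream. Illustration only: the interrupt
    # and negative-size flags handled by the real code are omitted, and
    # demo_readframes is a hypothetical name.

```python
import struct

def demo_readframes(read):
    # 'read' is any callable returning exactly n bytes
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack('>i', read(4))[0]
        if size:
            emptycount = 0
            yield size, read(size)
        else:
            emptycount += 1
            yield size, b''
```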
    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

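# b2streamparamhandler above is a plain decorator-based registry. A generic
# standalone version of the same pattern (all demo_* names are hypothetical):

```python
demo_registry = {}

def demo_handler(name):
    """register func under name, rejecting duplicate registrations"""
    def decorator(func):
        assert name not in demo_registry
        demo_registry[name] = func
        return func
    return decorator

@demo_handler('compression')
def demo_compression(state, param, value):
    # a handler receives the consumer state plus the parameter pair
    state['compression'] = value
```

# Dispatch is then a dict lookup on the (lowercased) parameter name, which is
# what _processparam does with b2streamparamsmap.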
870 @b2streamparamhandler('compression')
869 @b2streamparamhandler('compression')
871 def processcompression(unbundler, param, value):
870 def processcompression(unbundler, param, value):
872 """read compression parameter and install payload decompression"""
871 """read compression parameter and install payload decompression"""
873 if value not in util.compengines.supportedbundletypes:
872 if value not in util.compengines.supportedbundletypes:
874 raise error.BundleUnknownFeatureError(params=(param,),
873 raise error.BundleUnknownFeatureError(params=(param,),
875 values=(value,))
874 values=(value,))
876 unbundler._compengine = util.compengines.forbundletype(value)
875 unbundler._compengine = util.compengines.forbundletype(value)
877 if value is not None:
876 if value is not None:
878 unbundler._compressed = True
877 unbundler._compressed = True
879
878
880 class bundlepart(object):
879 class bundlepart(object):
881 """A bundle2 part contains application level payload
880 """A bundle2 part contains application level payload
882
881
883 The part `type` is used to route the part to the application level
882 The part `type` is used to route the part to the application level
884 handler.
883 handler.
885
884
886 The part payload is contained in ``part.data``. It could be raw bytes or a
885 The part payload is contained in ``part.data``. It could be raw bytes or a
887 generator of byte chunks.
886 generator of byte chunks.
888
887
889 You can add parameters to the part using the ``addparam`` method.
888 You can add parameters to the part using the ``addparam`` method.
890 Parameters can be either mandatory (default) or advisory. Remote side
889 Parameters can be either mandatory (default) or advisory. Remote side
891 should be able to safely ignore the advisory ones.
890 should be able to safely ignore the advisory ones.
892
891
893 Both data and parameters cannot be modified after the generation has begun.
892 Both data and parameters cannot be modified after the generation has begun.
894 """
893 """
895
894
896 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
895 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
897 data='', mandatory=True):
896 data='', mandatory=True):
898 validateparttype(parttype)
897 validateparttype(parttype)
899 self.id = None
898 self.id = None
900 self.type = parttype
899 self.type = parttype
901 self._data = data
900 self._data = data
902 self._mandatoryparams = list(mandatoryparams)
901 self._mandatoryparams = list(mandatoryparams)
903 self._advisoryparams = list(advisoryparams)
902 self._advisoryparams = list(advisoryparams)
904 # checking for duplicated entries
903 # checking for duplicated entries
905 self._seenparams = set()
904 self._seenparams = set()
906 for pname, __ in self._mandatoryparams + self._advisoryparams:
905 for pname, __ in self._mandatoryparams + self._advisoryparams:
907 if pname in self._seenparams:
906 if pname in self._seenparams:
908 raise error.ProgrammingError('duplicated params: %s' % pname)
907 raise error.ProgrammingError('duplicated params: %s' % pname)
909 self._seenparams.add(pname)
908 self._seenparams.add(pname)
910 # status of the part's generation:
909 # status of the part's generation:
911 # - None: not started,
910 # - None: not started,
912 # - False: currently generated,
911 # - False: currently generated,
913 # - True: generation done.
912 # - True: generation done.
914 self._generated = None
913 self._generated = None
915 self.mandatory = mandatory
914 self.mandatory = mandatory
916
915
917 def __repr__(self):
916 def __repr__(self):
918 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
917 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
919 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
918 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
920 % (cls, id(self), self.id, self.type, self.mandatory))
919 % (cls, id(self), self.id, self.type, self.mandatory))
921
920
922 def copy(self):
921 def copy(self):
923 """return a copy of the part
922 """return a copy of the part
924
923
925 The new part have the very same content but no partid assigned yet.
924 The new part have the very same content but no partid assigned yet.
926 Parts with generated data cannot be copied."""
925 Parts with generated data cannot be copied."""
927 assert not util.safehasattr(self.data, 'next')
926 assert not util.safehasattr(self.data, 'next')
928 return self.__class__(self.type, self._mandatoryparams,
927 return self.__class__(self.type, self._mandatoryparams,
929 self._advisoryparams, self._data, self.mandatory)
928 self._advisoryparams, self._data, self.mandatory)
930
929
931 # methods used to defines the part content
930 # methods used to defines the part content
932 @property
931 @property
933 def data(self):
932 def data(self):
934 return self._data
933 return self._data
935
934
936 @data.setter
935 @data.setter
937 def data(self, data):
936 def data(self, data):
938 if self._generated is not None:
937 if self._generated is not None:
939 raise error.ReadOnlyPartError('part is being generated')
938 raise error.ReadOnlyPartError('part is being generated')
940 self._data = data
939 self._data = data
941
940
942 @property
941 @property
943 def mandatoryparams(self):
942 def mandatoryparams(self):
944 # make it an immutable tuple to force people through ``addparam``
943 # make it an immutable tuple to force people through ``addparam``
945 return tuple(self._mandatoryparams)
944 return tuple(self._mandatoryparams)
946
945
947 @property
946 @property
948 def advisoryparams(self):
947 def advisoryparams(self):
949 # make it an immutable tuple to force people through ``addparam``
948 # make it an immutable tuple to force people through ``addparam``
950 return tuple(self._advisoryparams)
949 return tuple(self._advisoryparams)
951
950
952 def addparam(self, name, value='', mandatory=True):
951 def addparam(self, name, value='', mandatory=True):
953 """add a parameter to the part
952 """add a parameter to the part
954
953
955 If 'mandatory' is set to True, the remote handler must claim support
954 If 'mandatory' is set to True, the remote handler must claim support
956 for this parameter or the unbundling will be aborted.
955 for this parameter or the unbundling will be aborted.
957
956
958 The 'name' and 'value' cannot exceed 255 bytes each.
957 The 'name' and 'value' cannot exceed 255 bytes each.
959 """
958 """
960 if self._generated is not None:
959 if self._generated is not None:
961 raise error.ReadOnlyPartError('part is being generated')
960 raise error.ReadOnlyPartError('part is being generated')
962 if name in self._seenparams:
961 if name in self._seenparams:
963 raise ValueError('duplicated params: %s' % name)
962 raise ValueError('duplicated params: %s' % name)
964 self._seenparams.add(name)
963 self._seenparams.add(name)
965 params = self._advisoryparams
964 params = self._advisoryparams
966 if mandatory:
965 if mandatory:
967 params = self._mandatoryparams
966 params = self._mandatoryparams
968 params.append((name, value))
967 params.append((name, value))
969
968
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

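A minimal, self-contained sketch (not Mercurial's actual API; the helper names here are illustrative assumptions) of how the 'hgtagsfnodes' payload built above is laid out: a flat concatenation of 20-byte changeset nodes, each immediately followed by its 20-byte .hgtags filenode.

```python
def encodefnodespairs(pairs):
    """Concatenate (node, fnode) pairs of 20-byte strings into one payload.

    Illustrative sketch only; mirrors the chunks.extend([node, fnode])
    pattern used by addparttagsfnodescache above.
    """
    chunks = []
    for node, fnode in pairs:
        assert len(node) == 20 and len(fnode) == 20
        chunks.extend([node, fnode])
    return b''.join(chunks)

def decodefnodespairs(data):
    """Split an 'hgtagsfnodes'-style payload back into (node, fnode) pairs."""
    assert len(data) % 40 == 0  # whole 40-byte records only
    return [(data[i:i + 20], data[i + 20:i + 40])
            for i in range(0, len(data), 40)]
```

This also illustrates why no part is sent when `chunks` is empty: an empty payload decodes to no pairs at all.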
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

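A hedged sketch of the version negotiation that `obsolete.commonversion` performs for the code above: pick the newest obsmarker format supported by both sides, or return None when there is no overlap (which `buildobsmarkerspart` turns into a ValueError). The function name and version numbers here are illustrative assumptions, not Mercurial's implementation.

```python
def commonversionsketch(localversions, remoteversions):
    """Return the newest version present in both lists, else None.

    Sketch of common-version negotiation; not the real
    obsolete.commonversion.
    """
    common = set(localversions) & set(remoteversions)
    if not common:
        return None
    # prefer the most capable format both peers understand
    return max(common)
```

Picking the maximum of the intersection means a newer client/server pair automatically upgrades to the richer format while still interoperating with older peers.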
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

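An illustrative sketch of the legacy (non-HG20) framing used in the else-branch above: a short magic header followed by the compressed changegroup stream. The `HG10GZ` header and the choice of zlib mirror one of the classic bundletypes, but these helpers are assumptions for demonstration, not Mercurial's implementation.

```python
import zlib

def writelegacybundle(chunks, header=b'HG10GZ'):
    """Frame an iterable of payload chunks as header + zlib-compressed body.

    Sketch only: the real code streams through a pluggable compression
    engine instead of buffering everything in memory.
    """
    comp = zlib.compressobj()
    out = [header]  # the magic header is written uncompressed
    for chunk in chunks:
        out.append(comp.compress(chunk))
    out.append(comp.flush())
    return b''.join(out)

def readlegacybundle(data, header=b'HG10GZ'):
    """Strip the header and decompress the body back into raw bytes."""
    assert data.startswith(header)
    return zlib.decompress(data[len(header):])
```

The header stays uncompressed so a reader can dispatch on the bundle type before deciding how to decode the rest of the stream.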
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

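A hedged sketch of what `util.digestchecker` does for the handler above: wrap a byte source, hash and count everything read through it, and fail validation when the total size or any announced digest does not match. The class name and interface here are assumptions for illustration only.

```python
import hashlib

class digestcheckersketch(object):
    """Illustrative stand-in for util.digestchecker (not the real class)."""

    def __init__(self, data, size, digests):
        self._data = data            # bytes standing in for a remote stream
        self._pos = 0
        self._size = size            # expected total size
        self._hashers = {typ: hashlib.new(typ) for typ in digests}
        self._expected = digests     # {digest-type: expected hexdigest}

    def read(self, n):
        chunk = self._data[self._pos:self._pos + n]
        self._pos += len(chunk)
        for h in self._hashers.values():
            h.update(chunk)          # hash bytes as they flow past
        return chunk

    def validate(self):
        # called after the consumer has drained the stream
        if self._pos != self._size:
            raise ValueError('size mismatch')
        for typ, h in self._hashers.items():
            if h.hexdigest() != self._expected[typ]:
                raise ValueError('%s digest mismatch' % typ)
```

This shape explains why `real_part.validate()` is called only after `_processchangegroup` has consumed the stream: the digests are only complete once every byte has been read.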
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

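The payload format consumed above is just a run of fixed-size 20-byte node records. This self-contained sketch (helper names are illustrative assumptions) shows the same read loop over an in-memory stream, plus the order-independent comparison that `handlecheckheads` does with `sorted()`:

```python
import io

def readnodes(stream, recordsize=20):
    """Drain a stream of fixed-size binary node records into a list."""
    nodes = []
    h = stream.read(recordsize)
    while len(h) == recordsize:
        nodes.append(h)
        h = stream.read(recordsize)
    # a trailing partial record would mean a malformed part
    assert not h
    return nodes

def headsmatch(advertised, current):
    """Compare two head sets ignoring order, as the handler above does."""
    return sorted(advertised) == sorted(current)
```

Reading until a short (or empty) record is returned is what lets the part carry any number of heads without a length prefix.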
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activity happens on unrelated heads, it is
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

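A roundtrip sketch of the phase-heads wire entries parsed above. The `'>i20s'` format used here (a big-endian phase number followed by a 20-byte node) is assumed to match the `_fphasesentry` constant defined elsewhere in this module; the helper names are illustrative only.

```python
import struct

# assumed to mirror _fphasesentry: big-endian int phase + 20-byte node
_entryfmt = '>i20s'

def packphaseentries(entries):
    """Pack (phase, node) pairs into a phase-heads style payload."""
    return b''.join(struct.pack(_entryfmt, phase, node)
                    for phase, node in entries)

def unpackphaseentries(data):
    """Parse a phase-heads style payload back into (phase, node) pairs."""
    entrysize = struct.calcsize(_entryfmt)
    if len(data) % entrysize:
        # mirrors the "bad phase-heads bundle part" abort above
        raise ValueError('bad phase-heads payload')
    return [struct.unpack(_entryfmt, data[i:i + entrysize])
            for i in range(0, len(data), entrysize)]
```

Each entry is a fixed 24 bytes, which is why `_readphaseheads` can read record-by-record and treat any short trailing read as corruption.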
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


1852 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1851 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1853 def handleobsmarkerreply(op, inpart):
1852 def handleobsmarkerreply(op, inpart):
1854 """retrieve the result of a pushkey request"""
1853 """retrieve the result of a pushkey request"""
1855 ret = int(inpart.params['new'])
1854 ret = int(inpart.params['new'])
1856 partid = int(inpart.params['in-reply-to'])
1855 partid = int(inpart.params['in-reply-to'])
1857 op.records.add('obsmarkers', {'new': ret}, partid)
1856 op.records.add('obsmarkers', {'new': ret}, partid)
1858
1857
1859 @parthandler('hgtagsfnodes')
1858 @parthandler('hgtagsfnodes')
1860 def handlehgtagsfnodes(op, inpart):
1859 def handlehgtagsfnodes(op, inpart):
1861 """Applies .hgtags fnodes cache entries to the local repo.
1860 """Applies .hgtags fnodes cache entries to the local repo.
1862
1861
1863 Payload is pairs of 20 byte changeset nodes and filenodes.
1862 Payload is pairs of 20 byte changeset nodes and filenodes.
1864 """
1863 """
1865 # Grab the transaction so we ensure that we have the lock at this point.
1864 # Grab the transaction so we ensure that we have the lock at this point.
1866 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1865 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1867 op.gettransaction()
1866 op.gettransaction()
1868 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1867 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1869
1868
1870 count = 0
1869 count = 0
1871 while True:
1870 while True:
1872 node = inpart.read(20)
1871 node = inpart.read(20)
1873 fnode = inpart.read(20)
1872 fnode = inpart.read(20)
1874 if len(node) < 20 or len(fnode) < 20:
1873 if len(node) < 20 or len(fnode) < 20:
1875 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1874 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1876 break
1875 break
1877 cache.setfnode(node, fnode)
1876 cache.setfnode(node, fnode)
1878 count += 1
1877 count += 1
1879
1878
1880 cache.write()
1879 cache.write()
1881 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1880 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1882
1881
1883 @parthandler('pushvars')
1882 @parthandler('pushvars')
1884 def bundle2getvars(op, part):
1883 def bundle2getvars(op, part):
1885 '''unbundle a bundle2 containing shellvars on the server'''
1884 '''unbundle a bundle2 containing shellvars on the server'''
1886 # An option to disable unbundling on server-side for security reasons
1885 # An option to disable unbundling on server-side for security reasons
1887 if op.ui.configbool('push', 'pushvars.server', False):
1886 if op.ui.configbool('push', 'pushvars.server', False):
1888 hookargs = {}
1887 hookargs = {}
1889 for key, value in part.advisoryparams:
1888 for key, value in part.advisoryparams:
1890 key = key.upper()
1889 key = key.upper()
1891 # We want pushed variables to have USERVAR_ prepended so we know
1890 # We want pushed variables to have USERVAR_ prepended so we know
1892 # they came from the --pushvar flag.
1891 # they came from the --pushvar flag.
1893 key = "USERVAR_" + key
1892 key = "USERVAR_" + key
1894 hookargs[key] = value
1893 hookargs[key] = value
1895 op.addhookargs(hookargs)
1894 op.addhookargs(hookargs)