bundle2: move exception handling into part iterator
Durham Goode
r34151:21c2df59 default
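The commit below moves the drain-the-stream exception handling out of `processbundle` and into `partiterator.__exit__`, so the cleanup runs whenever the `with` block is left with an error. A minimal standalone sketch of that pattern (the class, part values, and `handler` here are illustrative stand-ins, not the Mercurial API):

```python
class PartIterator:
    """Context manager that yields parts and, on error, drains the
    remaining parts so the underlying channel stays usable -- a sketch
    of the pattern bundle2.partiterator adopts in this commit."""

    def __init__(self, parts):
        self._parts = iter(parts)
        self.count = 0
        self.drained = []

    def __enter__(self):
        def gen():
            for count, part in enumerate(self._parts):
                self.count = count
                yield part
        self._gen = gen()
        return self._gen

    def __exit__(self, exctype, exc, tb):
        if exc is not None:
            # Consume whatever is left so we are not stopped mid-part;
            # swallow errors raised while draining so the original
            # exception is the one that propagates.
            try:
                for part in self._gen:
                    self.drained.append(part)
            except Exception:
                pass
            # Annotate the exception, analogous to duringunbundle2 below.
            exc.duringunbundle2 = True
        return False  # never suppress the original exception

def process(parts, handler):
    with PartIterator(parts) as it:
        for part in it:
            handler(part)
```

A caller that hits a mid-bundle failure still sees the original exception, now carrying the annotation, while the remaining parts were consumed by `__exit__`.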
@@ -1,1919 +1,1917 @@
 # bundle2.py - generic container format to transmit arbitrary data.
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 """Handling of the new bundle2 format
 
 The goal of bundle2 is to act as an atomic packet to transmit a set of
 payloads in an application agnostic way. It consists of a sequence of "parts"
 that will be handed to and processed by the application layer.
 
 
 General format architecture
 ===========================
 
 The format is architected as follows:
 
 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.
 
 the Binary format
 ============================
 
 All numbers are unsigned and big-endian.
 
 stream level parameters
 ------------------------
 
 Binary format is as follows:
 
 :params size: int32
 
   The total number of Bytes used by the parameters
 
 :params value: arbitrary number of Bytes
 
   A blob of `params size` containing the serialized version of all stream
   level parameters.
 
   The blob contains a space separated list of parameters. Parameters with a
   value are stored in the form `<name>=<value>`. Both name and value are
   urlquoted.
 
   Empty names are obviously forbidden.
 
   Names MUST start with a letter. If this first letter is lower case, the
   parameter is advisory and can be safely ignored. However when the first
   letter is capital, the parameter is mandatory and the bundling process MUST
   stop if it is not able to process it.
 
   Stream parameters use a simple textual format for two main reasons:
 
   - Stream level parameters should remain simple and we want to discourage any
     crazy usage.
   - Textual data allows easy human inspection of a bundle2 header in case of
     trouble.
 
   Any application level options MUST go into a bundle2 part instead.
 
 Payload part
 ------------------------
 
 Binary format is as follows:
 
 :header size: int32
 
   The total number of Bytes used by the part header. When the header is empty
   (size = 0) this is interpreted as the end of stream marker.
 
 :header:
 
   The header defines how to interpret the part. It contains two pieces of
   data: the part type, and the part parameters.
 
   The part type is used to route to an application level handler that can
   interpret the payload.
 
   Part parameters are passed to the application level handler. They are
   meant to convey information that will help the application level object to
   interpret the part payload.
 
   The binary format of the header is as follows:
 
   :typesize: (one byte)
 
   :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
 
   :partid: A 32bit integer (unique in the bundle) that can be used to refer
            to this part.
 
   :parameters:
 
     Part's parameters may have arbitrary content, the binary structure is::
 
         <mandatory-count><advisory-count><param-sizes><param-data>
 
     :mandatory-count: 1 byte, number of mandatory parameters
 
     :advisory-count: 1 byte, number of advisory parameters
 
     :param-sizes:
 
         N couples of bytes, where N is the total number of parameters. Each
         couple contains (<size-of-key>, <size-of-value>) for one parameter.
 
     :param-data:
 
         A blob of bytes from which each parameter key and value can be
         retrieved using the list of size couples stored in the previous
         field.
 
         Mandatory parameters come first, then the advisory ones.
 
         Each parameter's key MUST be unique within the part.
 
 :payload:
 
     payload is a series of `<chunksize><chunkdata>`.
 
     `chunksize` is an int32, `chunkdata` are plain bytes (as many as
     `chunksize` says). The payload part is concluded by a zero size chunk.
 
     The current implementation always produces either zero or one chunk.
     This is an implementation limitation that will ultimately be lifted.
 
     `chunksize` can be negative to trigger special case processing. No such
     processing is in place yet.
 
 Bundle processing
 ============================
 
 Each part is processed in order using a "part handler". Handlers are
 registered for a certain part type.
 
 The matching of a part to its handler is case insensitive. The case of the
 part type is used to know if a part is mandatory or advisory. If the Part type
 contains any uppercase char it is considered mandatory. When no handler is
 known for a Mandatory part, the process is aborted and an exception is raised.
 If the part is advisory and no handler is known, the part is ignored. When the
 process is aborted, the full bundle is still read from the stream to keep the
 channel usable. But none of the parts read after an abort are processed. In
 the future, dropping the stream may become an option for channels we do not
 care to preserve.
 """
 
 from __future__ import absolute_import, division
 
 import errno
 import re
 import string
 import struct
 import sys
 
 from .i18n import _
 from . import (
     changegroup,
     error,
     obsolete,
     phases,
     pushkey,
     pycompat,
     tags,
     url,
     util,
 )
 
 urlerr = util.urlerr
 urlreq = util.urlreq
 
 _pack = struct.pack
 _unpack = struct.unpack
 
 _fstreamparamsize = '>i'
 _fpartheadersize = '>i'
 _fparttypesize = '>B'
 _fpartid = '>I'
 _fpayloadsize = '>i'
 _fpartparamcount = '>BB'
 
 _fphasesentry = '>i20s'
 
 preferedchunksize = 4096
 
 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
 
 def outdebug(ui, message):
     """debug regarding output stream (bundling)"""
     if ui.configbool('devel', 'bundle2.debug'):
         ui.debug('bundle2-output: %s\n' % message)
 
 def indebug(ui, message):
     """debug on input stream (unbundling)"""
     if ui.configbool('devel', 'bundle2.debug'):
         ui.debug('bundle2-input: %s\n' % message)
 
 def validateparttype(parttype):
     """raise ValueError if a parttype contains an invalid character"""
     if _parttypeforbidden.search(parttype):
         raise ValueError(parttype)
 
 def _makefpartparamsizes(nbparams):
     """return a struct format to read part parameter sizes
 
     The number of parameters is variable so we need to build that format
     dynamically.
     """
     return '>' + ('BB' * nbparams)
 
 parthandlermapping = {}
 
 def parthandler(parttype, params=()):
     """decorator that registers a function as a bundle2 part handler
 
     eg::
 
         @parthandler('myparttype', ('mandatory', 'param', 'handled'))
         def myparttypehandler(...):
             '''process a part of type "my part".'''
             ...
     """
     validateparttype(parttype)
     def _decorator(func):
         lparttype = parttype.lower() # enforce lower case matching.
         assert lparttype not in parthandlermapping
         parthandlermapping[lparttype] = func
         func.params = frozenset(params)
         return func
     return _decorator
 
 class unbundlerecords(object):
     """keep record of what happens during an unbundle
 
     New records are added using `records.add('cat', obj)`. Where 'cat' is a
     category of record and obj is an arbitrary object.
 
     `records['cat']` will return all entries of this category 'cat'.
 
     Iterating on the object itself will yield `('category', obj)` tuples
     for all entries.
 
     All iterations happen in chronological order.
     """
 
     def __init__(self):
         self._categories = {}
         self._sequences = []
         self._replies = {}
 
     def add(self, category, entry, inreplyto=None):
         """add a new record of a given category.
 
         The entry can then be retrieved in the list returned by
         self['category']."""
         self._categories.setdefault(category, []).append(entry)
         self._sequences.append((category, entry))
         if inreplyto is not None:
             self.getreplies(inreplyto).add(category, entry)
 
     def getreplies(self, partid):
         """get the records that are replies to a specific part"""
         return self._replies.setdefault(partid, unbundlerecords())
 
     def __getitem__(self, cat):
         return tuple(self._categories.get(cat, ()))
 
     def __iter__(self):
         return iter(self._sequences)
 
     def __len__(self):
         return len(self._sequences)
 
     def __nonzero__(self):
         return bool(self._sequences)
 
     __bool__ = __nonzero__
 
 class bundleoperation(object):
     """an object that represents a single bundling process
 
     Its purpose is to carry unbundle-related objects and states.
 
     A new object should be created at the beginning of each bundle processing.
     The object is to be returned by the processing function.
 
     The object has very little content now; it will ultimately contain:
     * an access to the repo the bundle is applied to,
     * a ui object,
     * a way to retrieve a transaction to add changes to the repo,
     * a way to record the result of processing each part,
     * a way to construct a bundle response when applicable.
     """
 
     def __init__(self, repo, transactiongetter, captureoutput=True):
         self.repo = repo
         self.ui = repo.ui
         self.records = unbundlerecords()
         self.reply = None
         self.captureoutput = captureoutput
         self.hookargs = {}
         self._gettransaction = transactiongetter
 
     def gettransaction(self):
         transaction = self._gettransaction()
 
         if self.hookargs:
             # the ones added to the transaction supersede those added
             # to the operation.
             self.hookargs.update(transaction.hookargs)
             transaction.hookargs = self.hookargs
 
             # mark the hookargs as flushed. further attempts to add to
             # hookargs will result in an abort.
             self.hookargs = None
 
         return transaction
 
     def addhookargs(self, hookargs):
         if self.hookargs is None:
             raise error.ProgrammingError('attempted to add hookargs to '
                                          'operation after transaction started')
         self.hookargs.update(hookargs)
 
 class TransactionUnavailable(RuntimeError):
     pass
 
 def _notransaction():
     """default method to get a transaction while processing a bundle
 
     Raise an exception to highlight the fact that no transaction was expected
     to be created"""
     raise TransactionUnavailable()
 
 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
     # transform me into unbundler.apply() as soon as the freeze is lifted
     if isinstance(unbundler, unbundle20):
         tr.hookargs['bundle2'] = '1'
         if source is not None and 'source' not in tr.hookargs:
             tr.hookargs['source'] = source
         if url is not None and 'url' not in tr.hookargs:
             tr.hookargs['url'] = url
         return processbundle(repo, unbundler, lambda: tr)
     else:
         # the transactiongetter won't be used, but we might as well set it
         op = bundleoperation(repo, lambda: tr)
         _processchangegroup(op, unbundler, tr, source, url, **kwargs)
         return op
 
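The stream-level parameter layout from the module docstring (an int32 size followed by a space-separated, url-quoted `name=value` blob) can be exercised with a small round-trip sketch. The helper names are illustrative; the real encoder/decoder lives elsewhere in bundle2:

```python
import struct
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Pack {name: value-or-None} as <int32 size><blob>, big-endian,
    mirroring the stream-level parameter layout described above."""
    chunks = []
    for name, value in params.items():
        if not name or not name[0].isalpha():
            # empty names are forbidden; names must start with a letter
            raise ValueError('invalid stream parameter name: %r' % name)
        item = quote(name)
        if value is not None:
            item += '=' + quote(value)
        chunks.append(item)
    blob = ' '.join(chunks).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

def decode_stream_params(data):
    """Inverse of encode_stream_params."""
    (size,) = struct.unpack('>i', data[:4])
    blob = data[4:4 + size].decode('ascii')
    params = {}
    for item in (blob.split(' ') if blob else []):
        if '=' in item:
            name, value = item.split('=', 1)
            params[unquote(name)] = unquote(value)
        else:
            params[unquote(item)] = None
    return params
```

Per the docstring, a lowercase first letter (`compression`) marks an advisory parameter while an uppercase one (`Mandatory`) marks one the unbundler must understand or abort on; the wire format itself does not distinguish them beyond the name's case.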
 class partiterator(object):
-    def __init__(self, repo, unbundler):
+    def __init__(self, repo, op, unbundler):
         self.repo = repo
+        self.op = op
         self.unbundler = unbundler
         self.iterator = None
         self.count = 0
 
     def __enter__(self):
         def func():
             itr = enumerate(self.unbundler.iterparts())
             for count, p in itr:
                 self.count = count
                 yield p
         self.iterator = func()
         return self.iterator
 
-    def __exit__(self, type, value, tb):
+    def __exit__(self, type, exc, tb):
         if not self.iterator:
             return
 
+        if exc:
+            # Any exceptions seeking to the end of the bundle at this point are
+            # almost certainly related to the underlying stream being bad.
+            # And, chances are that the exception we're handling is related to
+            # getting in that bad state. So, we swallow the seeking error and
+            # re-raise the original error.
+            seekerror = False
+            try:
+                for part in self.iterator:
+                    # consume the bundle content
+                    part.seek(0, 2)
+            except Exception:
+                seekerror = True
+
+            # Small hack to let caller code distinguish exceptions from bundle2
+            # processing from processing the old format. This is mostly needed
+            # to handle different return codes to unbundle according to the
+            # type of bundle. We should probably clean up or drop this return
+            # code craziness in a future version.
+            exc.duringunbundle2 = True
+            salvaged = []
+            replycaps = None
+            if self.op.reply is not None:
+                salvaged = self.op.reply.salvageoutput()
+                replycaps = self.op.reply.capabilities
+            exc._replycaps = replycaps
+            exc._bundle2salvagedoutput = salvaged
+
+            # Re-raising from a variable loses the original stack. So only use
+            # that form if we need to.
+            if seekerror:
+                raise exc
+
         self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                            self.count)
 
 def processbundle(repo, unbundler, transactiongetter=None, op=None):
     """This function processes a bundle, applying effects to/from a repo
 
     It iterates over each part then searches for and uses the proper handling
     code to process the part. Parts are processed in order.
 
     Unknown Mandatory parts will abort the process.
 
     It is temporarily possible to provide a prebuilt bundleoperation to the
     function. This is used to ensure output is properly propagated in case of
     an error during the unbundling. This output capturing part will likely be
     reworked and this ability will probably go away in the process.
     """
     if op is None:
         if transactiongetter is None:
             transactiongetter = _notransaction
         op = bundleoperation(repo, transactiongetter)
     # todo:
     # - replace this with an init function soon.
     # - exception catching
     unbundler.params
     if repo.ui.debugflag:
         msg = ['bundle2-input-bundle:']
         if unbundler.params:
             msg.append(' %i params' % len(unbundler.params))
         if op._gettransaction is None or op._gettransaction is _notransaction:
             msg.append(' no-transaction')
         else:
             msg.append(' with-transaction')
         msg.append('\n')
         repo.ui.debug(''.join(msg))
 
-    with partiterator(repo, unbundler) as parts:
-        part = None
-        try:
-            for part in parts:
-                _processpart(op, part)
-        except Exception as exc:
-            # Any exceptions seeking to the end of the bundle at this point are
-            # almost certainly related to the underlying stream being bad.
-            # And, chances are that the exception we're handling is related to
-            # getting in that bad state. So, we swallow the seeking error and
-            # re-raise the original error.
-            seekerror = False
-            try:
-                for part in parts:
-                    # consume the bundle content
-                    part.seek(0, 2)
-            except Exception:
-                seekerror = True
-
-            # Small hack to let caller code distinguish exceptions from bundle2
-            # processing from processing the old format. This is mostly needed
-            # to handle different return codes to unbundle according to the
-            # type of bundle. We should probably clean up or drop this return
-            # code craziness in a future version.
-            exc.duringunbundle2 = True
-            salvaged = []
-            replycaps = None
-            if op.reply is not None:
-                salvaged = op.reply.salvageoutput()
-                replycaps = op.reply.capabilities
-            exc._replycaps = replycaps
-            exc._bundle2salvagedoutput = salvaged
-
-            # Re-raising from a variable loses the original stack. So only use
-            # that form if we need to.
-            if seekerror:
-                raise exc
-            else:
-                raise
-
+    with partiterator(repo, op, unbundler) as parts:
+        for part in parts:
+            _processpart(op, part)
 
     return op
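The dynamic struct format built by `_makefpartparamsizes` (`'>' + 'BB' * n`) reads n (key-size, value-size) byte pairs, which together with the two count bytes fully describe the part parameter block from the docstring. A decoding sketch against toy bytes (not a real bundle; `decode_part_params` is a made-up name):

```python
import struct

def makefpartparamsizes(nbparams):
    # same shape as bundle2._makefpartparamsizes
    return '>' + ('BB' * nbparams)

def decode_part_params(data):
    """Decode <mandatory-count><advisory-count><param-sizes><param-data>
    into a list of (key, value, is_mandatory) tuples."""
    mancount, advcount = struct.unpack('>BB', data[:2])
    total = mancount + advcount
    fmt = makefpartparamsizes(total)
    offset = 2 + struct.calcsize(fmt)
    sizes = struct.unpack(fmt, data[2:offset])
    params, pos = [], offset
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[pos:pos + ksize]; pos += ksize
        value = data[pos:pos + vsize]; pos += vsize
        # mandatory parameters come first, then the advisory ones
        params.append((key, value, i < mancount))
    return params
```

Because the counts and every size are single bytes (`B`), a part is limited to 255 mandatory plus 255 advisory parameters, with keys and values each at most 255 bytes.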
446
444
447 def _processchangegroup(op, cg, tr, source, url, **kwargs):
445 def _processchangegroup(op, cg, tr, source, url, **kwargs):
448 ret = cg.apply(op.repo, tr, source, url, **kwargs)
446 ret = cg.apply(op.repo, tr, source, url, **kwargs)
449 op.records.add('changegroup', {
447 op.records.add('changegroup', {
450 'return': ret,
448 'return': ret,
451 })
449 })
452 return ret
450 return ret
453
451
454 def _processpart(op, part):
452 def _processpart(op, part):
455 """process a single part from a bundle
453 """process a single part from a bundle
456
454
457 The part is guaranteed to have been fully consumed when the function exits
455 The part is guaranteed to have been fully consumed when the function exits
458 (even if an exception is raised)."""
456 (even if an exception is raised)."""
459 status = 'unknown' # used by debug output
457 status = 'unknown' # used by debug output
460 hardabort = False
458 hardabort = False
461 try:
459 try:
462 try:
460 try:
463 handler = parthandlermapping.get(part.type)
461 handler = parthandlermapping.get(part.type)
464 if handler is None:
462 if handler is None:
465 status = 'unsupported-type'
463 status = 'unsupported-type'
466 raise error.BundleUnknownFeatureError(parttype=part.type)
464 raise error.BundleUnknownFeatureError(parttype=part.type)
467 indebug(op.ui, 'found a handler for part %r' % part.type)
465 indebug(op.ui, 'found a handler for part %r' % part.type)
468 unknownparams = part.mandatorykeys - handler.params
466 unknownparams = part.mandatorykeys - handler.params
469 if unknownparams:
467 if unknownparams:
470 unknownparams = list(unknownparams)
468 unknownparams = list(unknownparams)
471 unknownparams.sort()
469 unknownparams.sort()
472 status = 'unsupported-params (%s)' % unknownparams
470 status = 'unsupported-params (%s)' % unknownparams
473 raise error.BundleUnknownFeatureError(parttype=part.type,
471 raise error.BundleUnknownFeatureError(parttype=part.type,
474 params=unknownparams)
472 params=unknownparams)
475 status = 'supported'
473 status = 'supported'
476 except error.BundleUnknownFeatureError as exc:
474 except error.BundleUnknownFeatureError as exc:
477 if part.mandatory: # mandatory parts
475 if part.mandatory: # mandatory parts
478 raise
476 raise
479 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
477 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
480 return # skip to part processing
478 return # skip to part processing
481 finally:
479 finally:
482 if op.ui.debugflag:
480 if op.ui.debugflag:
483 msg = ['bundle2-input-part: "%s"' % part.type]
481 msg = ['bundle2-input-part: "%s"' % part.type]
484 if not part.mandatory:
482 if not part.mandatory:
485 msg.append(' (advisory)')
483 msg.append(' (advisory)')
486 nbmp = len(part.mandatorykeys)
484 nbmp = len(part.mandatorykeys)
487 nbap = len(part.params) - nbmp
485 nbap = len(part.params) - nbmp
488 if nbmp or nbap:
486 if nbmp or nbap:
489 msg.append(' (params:')
487 msg.append(' (params:')
490 if nbmp:
488 if nbmp:
491 msg.append(' %i mandatory' % nbmp)
489 msg.append(' %i mandatory' % nbmp)
492 if nbap:
490 if nbap:
493 msg.append(' %i advisory' % nbmp)
491 msg.append(' %i advisory' % nbmp)
494 msg.append(')')
492 msg.append(')')
495 msg.append(' %s\n' % status)
493 msg.append(' %s\n' % status)
496 op.ui.debug(''.join(msg))
494 op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
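As a standalone illustration of the caps blob format handled by `decodecaps` and `encodecaps` above (one capability per line, comma-separated values, URL-quoted keys and values), here is a minimal round-trip sketch using only the standard library; `urllib.parse` stands in for the module's `urlreq` helper:

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # one 'key=val1,val2' line per capability, sorted, URL-quoted
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        line = quote(name)
        if vals:
            line = "%s=%s" % (line, ','.join(vals))
        chunks.append(line)
    return '\n'.join(chunks)

def decode_caps(blob):
    # mirror of decodecaps: a capability with no '=' has an empty value list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
assert decode_caps(encode_caps(caps)) == caps
```

Quoting means values may safely contain spaces, newlines, or '=' without breaking the line-oriented layout.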

bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it with parts. Then call `getchunks` to retrieve all the binary
    chunks of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)
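The stream level parameter block built by `_paramchunk` above is a space-separated list of URL-quoted `name` or `name=value` tokens. A minimal self-contained sketch of both directions (the decode side mirrors `unbundle20._processallparams`, defined later in this module):

```python
from urllib.parse import quote, unquote

def encode_stream_params(params):
    # params is a list of (name, value) pairs; value may be None
    blocks = []
    for name, value in params:
        tok = quote(name)
        if value is not None:
            tok = '%s=%s' % (tok, quote(value))
        blocks.append(tok)
    return ' '.join(blocks)

def decode_stream_params(block):
    # a token without '=' decodes to a None value
    params = {}
    for tok in block.split(' '):
        parts = [unquote(p) for p in tok.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)
        params[parts[0]] = parts[1]
    return params
```

Because names and values are quoted, a space or '=' inside either cannot be confused with the token separators.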

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a block of stream level parameters and return them as a
        dict"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory; this function will raise a BundleUnknownFeatureError when
        they are unknown.

        Note: no options are currently supported. Any input will be either
        ignored or will fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

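The case convention enforced by `_processparam` above (lower-case first letter means advisory and ignorable when unknown, upper-case means mandatory and fatal when unknown) can be sketched in isolation; `known_handlers` stands in for the module's `b2streamparamsmap`:

```python
class UnknownMandatoryParam(Exception):
    """raised for an unrecognized parameter whose name starts upper-case"""

def process_param(known_handlers, name, value):
    # handlers are registered under lower-cased names; the original
    # casing of the incoming name carries the advisory/mandatory bit
    handler = known_handlers.get(name.lower())
    if handler is None:
        if name[0].islower():
            return 'ignored'  # advisory: safe to skip
        raise UnknownMandatoryParam(name)
    return handler(name, value)
```

This lets a sender mark any parameter as required simply by capitalizing it, with no separate flag in the wire format.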
    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

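The chunk framing that the `emptycount` loop above relies on is: a signed 32-bit big-endian size prefixes each chunk, two consecutive zero sizes terminate the stream, and a special negative flag marks an interrupt with no payload. A hedged standalone sketch (assuming the interrupt flag value is -1, as `flaginterrupt` is defined elsewhere in this module):

```python
import struct

FLAG_INTERRUPT = -1  # assumed value of this module's flaginterrupt

def iter_frames(read):
    """yield payload chunks; read(n) must return exactly n bytes"""
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack('>i', read(4))[0]
        if size == 0:
            emptycount += 1  # two in a row end the stream
            continue
        emptycount = 0
        if size == FLAG_INTERRUPT:
            continue  # interrupt marker carries no payload
        if size < 0:
            raise ValueError('negative chunk size: %i' % size)
        yield read(size)
```

Because the terminator is in-band, a forwarder can relay the stream byte-for-byte without understanding the parts it carries, which is exactly what `_forwardchunks` does.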

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the
            # next part can be read. But then seek back to the beginning so
            # the code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read the part header size and return the header bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                             values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))
993
991
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

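A standalone sketch (not part of bundle2.py) of the framing that `getchunks` emits: a big-endian size prefix, the header blob, then size-prefixed payload chunks terminated by a zero-size chunk. The `'>i'` format is an assumption standing in for `_fpartheadersize`/`_fpayloadsize`, which are defined elsewhere in the module.

```python
# Sketch of the bundle2 part framing produced by getchunks().
# Assumption: header-size and payload-size fields are big-endian int32
# ('>i'), mirroring the module's _fpartheadersize/_fpayloadsize formats.
import struct

def frame_part(headerchunk, payloadchunks):
    """Yield the wire framing for one part: sized header, sized payload
    chunks, and a zero-size end-of-payload marker."""
    yield struct.pack('>i', len(headerchunk))
    yield headerchunk
    for chunk in payloadchunks:
        yield struct.pack('>i', len(chunk))
        yield chunk
    yield struct.pack('>i', 0)  # end-of-payload marker

frames = list(frame_part(b'HDR', [b'abc', b'de']))
```

On the wire this interleaves lengths and data, which is what lets the consumer below stream a part without knowing its total size up front.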
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

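A standalone sketch (not part of bundle2.py) of the consumer loop above: read big-endian size prefixes until a zero terminator, treat `-1` (`flaginterrupt`) as the out-of-band interrupt marker, and reject any other negative size. The `'>i'` format mirrors the assumed `_fpayloadsize`; a real reader would dispatch to an interrupt handler instead of just counting.

```python
# Sketch of the size-prefixed payload reader loop in _payloadchunks().
# Assumption: chunk sizes are big-endian int32 ('>i'); -1 is the
# interrupt flag, 0 terminates the payload.
import io
import struct

def read_payload(fp):
    """Return (chunks, interrupts_seen) for one framed payload."""
    chunks, interrupts = [], 0
    size = struct.unpack('>i', fp.read(4))[0]
    while size:
        if size == -1:
            # interrupt flag: an out-of-band part follows on the stream
            # (a real reader would parse and process it here)
            interrupts += 1
        elif size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        else:
            chunks.append(fp.read(size))
        size = struct.unpack('>i', fp.read(4))[0]
    return chunks, interrupts

wire = struct.pack('>i', 3) + b'abc' + struct.pack('>i', 0)
```

This is the inverse of the producer framing: the reader never needs a total length, only the per-chunk prefixes.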
    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

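A standalone sketch (not part of bundle2.py) of the `_findchunk` lookup: given the (payload offset, file offset) index built while chunks are read, map an absolute payload position to a (chunk number, offset-within-chunk) pair. The index values below are made up for illustration.

```python
# Sketch of _findchunk(): linear scan over the chunk-start index.
def findchunk(chunkindex, pos):
    """Map a payload position to (chunk number, offset inside chunk)."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0        # pos is exactly a chunk boundary
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# hypothetical index: chunks start at payload offsets 0, 10 and 25
index = [(0, 100), (10, 114), (25, 133)]
```

`seek` relies on this to restart `_payloadchunks` at the right chunk and then discard `offset` bytes to land on the exact position.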
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

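A quick illustration of `obsmarkersversion` above, using a made-up capability dict: values such as `'V0'`/`'V1'` map to integer versions, and entries not starting with `'V'` are ignored.

```python
# Self-contained copy of the obsmarkersversion() logic for illustration;
# the caps dict below is hypothetical.
def obsmarkersversion_demo(caps):
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion_demo({'obsmarkers': ('V0', 'V1', 'X2')})
```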
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1463 phasedata = []
1461 phasedata = []
1464 for phase in phases.allphases:
1462 for phase in phases.allphases:
1465 for head in headsbyphase[phase]:
1463 for head in headsbyphase[phase]:
1466 phasedata.append(_pack(_fphasesentry, phase, head))
1464 phasedata.append(_pack(_fphasesentry, phase, head))
1467 bundler.newpart('phase-heads', data=''.join(phasedata))
1465 bundler.newpart('phase-heads', data=''.join(phasedata))
1468
1466
1469 def addparttagsfnodescache(repo, bundler, outgoing):
1467 def addparttagsfnodescache(repo, bundler, outgoing):
1470 # we include the tags fnode cache for the bundle changeset
1468 # we include the tags fnode cache for the bundle changeset
1471 # (as an optional parts)
1469 # (as an optional parts)
1472 cache = tags.hgtagsfnodescache(repo.unfiltered())
1470 cache = tags.hgtagsfnodescache(repo.unfiltered())
1473 chunks = []
1471 chunks = []
1474
1472
1475 # .hgtags fnodes are only relevant for head changesets. While we could
1473 # .hgtags fnodes are only relevant for head changesets. While we could
1476 # transfer values for all known nodes, there will likely be little to
1474 # transfer values for all known nodes, there will likely be little to
1477 # no benefit.
1475 # no benefit.
1478 #
1476 #
1479 # We don't bother using a generator to produce output data because
1477 # We don't bother using a generator to produce output data because
1480 # a) we only have 40 bytes per head and even esoteric numbers of heads
1478 # a) we only have 40 bytes per head and even esoteric numbers of heads
1481 # consume little memory (1M heads is 40MB) b) we don't want to send the
1479 # consume little memory (1M heads is 40MB) b) we don't want to send the
1482 # part if we don't have entries and knowing if we have entries requires
1480 # part if we don't have entries and knowing if we have entries requires
1483 # cache lookups.
1481 # cache lookups.
1484 for node in outgoing.missingheads:
1482 for node in outgoing.missingheads:
1485 # Don't compute missing, as this may slow down serving.
1483 # Don't compute missing, as this may slow down serving.
1486 fnode = cache.getfnode(node, computemissing=False)
1484 fnode = cache.getfnode(node, computemissing=False)
1487 if fnode is not None:
1485 if fnode is not None:
1488 chunks.extend([node, fnode])
1486 chunks.extend([node, fnode])
1489
1487
1490 if chunks:
1488 if chunks:
1491 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1489 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1492
1490
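An illustrative sketch of the 'hgtagsfnodes' payload built above: it is a flat concatenation of (head node, .hgtags filenode) pairs, 20 bytes each, so a reader can walk it in 40-byte strides. The helper names and sample hashes below are hypothetical, for demonstration only.

```python
def packfnodes(pairs):
    # mirror the chunks.extend([node, fnode]) accumulation above
    chunks = []
    for node, fnode in pairs:
        assert len(node) == 20 and len(fnode) == 20
        chunks.extend([node, fnode])
    return b''.join(chunks)

def unpackfnodes(data):
    # walk the flat payload in 40-byte strides: node, then fnode
    return [(data[i:i + 20], data[i + 20:i + 40])
            for i in range(0, len(data), 40)]

pairs = [(b'\x01' * 20, b'\x02' * 20)]
assert unpackfnodes(packfnodes(pairs)) == pairs
```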
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

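For the legacy branch above, `bundletypes` maps a bundle type name to a literal header and a compression engine name, and the written file is simply that header followed by the compressed changegroup stream. A rough standalone sketch of that framing, assuming zlib as a stand-in for the 'GZ' compression engine (the generator name and sample data are hypothetical):

```python
import zlib

def legacychunks(header, cgchunks):
    # mirror the nested chunkiter() above: emit the header first, then
    # the compressed changegroup stream
    yield header
    c = zlib.compressobj()
    for chunk in cgchunks:
        data = c.compress(chunk)
        if data:
            yield data
    yield c.flush()

raw = b''.join(legacychunks(b'HG10GZ', [b'fake changegroup data']))
assert raw.startswith(b'HG10GZ')
assert zlib.decompress(raw[6:]) == b'fake changegroup data'
```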
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

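A standalone sketch of the combining rule used by `combinechangegroupresults` above, reimplemented on plain lists so it can run outside Mercurial. The return-code convention, as inferred from the code above: 0 means failure, 1 means success with no change in head count, 1 + n means n heads were added, and -1 - n means n heads were removed.

```python
def combine(results):
    # combine several addchangegroup-style return codes into one
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:          # any failure wins
            return 0
        if ret < -1:          # ret == -1 - n: n heads removed
            changedheads += ret + 1
        elif ret > 1:         # ret == 1 + n: n heads added
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

assert combine([]) == 1          # no changegroups: plain success
assert combine([3, 2]) == 4      # 2 + 1 heads added
assert combine([3, -2]) == 2     # 2 added, 1 removed: net +1
assert combine([3, 0, -2]) == 0  # any zero result short-circuits
```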
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
            'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

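A minimal sketch of how the 'check:heads' payload is parsed: the part body is just a concatenation of 20-byte binary node hashes, so the reader pulls fixed-size chunks until the stream runs dry. `io.BytesIO` stands in for the bundle2 part object here, and the sample hashes are fake.

```python
import io

def readheads(part):
    # collect 20-byte node hashes until the stream is exhausted
    heads = []
    h = part.read(20)
    while len(h) == 20:
        heads.append(h)
        h = part.read(20)
    assert not h  # a trailing partial chunk would mean a malformed part
    return heads

payload = b'\x11' * 20 + b'\x22' * 20  # two fake 20-byte node hashes
assert readheads(io.BytesIO(payload)) == [b'\x11' * 20, b'\x22' * 20]
```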
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

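A round-trip sketch of the wire entries consumed by `_readphaseheads` above. This assumes `_fphasesentry` is the `'>i20s'` struct layout (a big-endian 32-bit phase number followed by a 20-byte binary node), matching how `_addpartsfromopts` packs the 'phase-heads' part; the stand-in for `phases.allphases` and the sample node are hypothetical.

```python
import io
import struct

_fphasesentry = '>i20s'   # assumed layout: big-endian int phase + 20-byte node
allphases = range(3)      # stand-in for phases.allphases: public, draft, secret

def readphaseheads(stream):
    # same fixed-size-record loop as _readphaseheads above
    headsbyphase = [[] for i in allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = stream.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise ValueError('bad phase-heads bundle part')
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

node = b'\xaa' * 20
data = struct.pack(_fphasesentry, 1, node)  # one draft head
assert readphaseheads(io.BytesIO(data))[1] == [node]
```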
1837 @parthandler('phase-heads')
1835 @parthandler('phase-heads')
1838 def handlephases(op, inpart):
1836 def handlephases(op, inpart):
1839 """apply phases from bundle part to repo"""
1837 """apply phases from bundle part to repo"""
1840 headsbyphase = _readphaseheads(inpart)
1838 headsbyphase = _readphaseheads(inpart)
1841 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1839 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1842 op.records.add('phase-heads', {})
1840 op.records.add('phase-heads', {})
1843
1841
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

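The `hgtagsfnodes` loop above reads a stream of fixed-width records: 20 bytes of changeset node followed by 20 bytes of filenode, terminated by a short read. A minimal standalone sketch of that parsing pattern (names here are illustrative, not part of the module):

```python
import io

def iter_fnode_pairs(stream):
    """Yield (node, fnode) pairs of 20 bytes each, stopping at a short read."""
    while True:
        node = stream.read(20)
        fnode = stream.read(20)
        if len(node) < 20 or len(fnode) < 20:
            # Mirrors the handler: a truncated record ends the stream.
            break
        yield node, fnode

data = (b'a' * 20 + b'b' * 20) + (b'c' * 20 + b'd' * 20) + b'short-tail'
pairs = list(iter_fnode_pairs(io.BytesIO(data)))
assert pairs == [(b'a' * 20, b'b' * 20), (b'c' * 20, b'd' * 20)]
```

The trailing partial record is silently dropped, which is why the real handler only logs a debug message rather than aborting.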
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
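The key-mangling in `bundle2getvars` is simple enough to factor out and check in isolation. This is a sketch of the same transformation (the helper name is invented for illustration; the handler itself does this inline):

```python
def pushvars_to_hookargs(advisoryparams):
    """Uppercase each pushed variable name and prefix it with USERVAR_,
    matching what bundle2getvars passes to op.addhookargs."""
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

assert pushvars_to_hookargs([('debug', '1'), ('Reason', 'ci')]) == {
    'USERVAR_DEBUG': '1',
    'USERVAR_REASON': 'ci',
}
```

The prefix lets server-side hooks distinguish client-supplied `--pushvar` values from environment variables the server set itself.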