bundle2: move processpart stream maintenance into part iterator...
Durham Goode
r34259:e71890f2 default
@@ -1,1921 +1,1932 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architectured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

the Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

A blob of `params size` containing the serialized version of all stream level
parameters.

The blob contains a space separated list of parameters. Parameters with value
are stored in the form `<name>=<value>`. Both name and value are urlquoted.

Empty names are obviously forbidden.

Names MUST start with a letter. If this first letter is lower case, the
parameter is advisory and can be safely ignored. However when the first
letter is capital, the parameter is mandatory and the bundling process MUST
stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allow easy human inspection of a bundle2 header in case of
  troubles.

Any application level options MUST go into a bundle2 part instead.

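The stream-level parameter layout described above is simple enough to exercise in isolation. The sketch below is ours, not part of bundle2.py (which reads the stream incrementally); it uses the Python 3 standard library for brevity and assumes a binary file-like object positioned just past the magic string:

```python
import io
import struct
import urllib.parse

def parsestreamparams(stream):
    """Parse the stream-level parameter block of a bundle2 stream.

    Returns a dict of urlunquoted name -> value pairs (value may be '').
    """
    # :params size: int32 -- total number of bytes used by the parameters
    size = struct.unpack('>i', stream.read(4))[0]
    blob = stream.read(size).decode('ascii')
    params = {}
    for item in blob.split(' ') if blob else []:
        # a parameter with a value is stored as <name>=<value>, urlquoted
        name, _sep, value = item.partition('=')
        params[urllib.parse.unquote(name)] = urllib.parse.unquote(value)
    return params

# a capitalized first letter ('Foo') marks a mandatory parameter
blob = b'cg%251=01 Foo=bar%20baz'
wire = struct.pack('>i', len(blob)) + blob
params = parsestreamparams(io.BytesIO(wire))
```

A real consumer must abort on a capitalized name it cannot process, while lower-case names may be ignored.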
Payload part
------------------------

Binary format is as follows

:header size: int32

The total number of Bytes used by the part header. When the header is empty
(size = 0) this is interpreted as the end of stream marker.

:header:

The header defines how to interpret the part. It contains two pieces of
data: the part type, and the part parameters.

The part type is used to route to an application level handler that can
interpret the payload.

Part parameters are passed to the application level handler. They are
meant to convey information that will help the application level object to
interpret the part payload.

The binary format of the header is as follows

:typesize: (one byte)

:parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

:partid: A 32bits integer (unique in the bundle) that can be used to refer
to this part.

:parameters:

Part's parameters may have arbitrary content, the binary structure is::

    <mandatory-count><advisory-count><param-sizes><param-data>

:mandatory-count: 1 byte, number of mandatory parameters

:advisory-count: 1 byte, number of advisory parameters

:param-sizes:

N couples of bytes, where N is the total number of parameters. Each
couple contains (<size-of-key>, <size-of-value>) for one parameter.

:param-data:

A blob of bytes from which each parameter key and value can be
retrieved using the list of size couples stored in the previous
field.

Mandatory parameters come first, then the advisory ones.

Each parameter's key MUST be unique within the part.

:payload:

payload is a series of `<chunksize><chunkdata>`.

`chunksize` is an int32, `chunkdata` are plain bytes (as much as
`chunksize` says). The payload part is concluded by a zero size chunk.

The current implementation always produces either zero or one chunk.
This is an implementation limitation that will ultimately be lifted.

`chunksize` can be negative to trigger special case processing. No such
processing is in place yet.

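As a reading companion to the header layout above, here is a standalone decoding sketch. The helper name and the returned tuple shape are ours; the real unbundler decodes the same fields incrementally using the `_fpartheadersize` and `_makefpartparamsizes` struct formats defined in this module:

```python
import struct

def parsepartheader(header):
    """Decode a bundle2 part header blob.

    `header` is the raw header bytes (everything after the int32 header
    size). Returns (parttype, partid, params) where params is a list of
    (key, value, mandatory) triples.
    """
    typesize = struct.unpack_from('>B', header, 0)[0]
    parttype = header[1:1 + typesize].decode('ascii')
    offset = 1 + typesize
    partid = struct.unpack_from('>I', header, offset)[0]
    offset += 4
    mancount, advcount = struct.unpack_from('>BB', header, offset)
    offset += 2
    # one (key size, value size) couple of bytes per parameter
    sizes = struct.unpack_from('>' + 'BB' * (mancount + advcount),
                               header, offset)
    offset += 2 * (mancount + advcount)
    params = []
    for i in range(mancount + advcount):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = header[offset:offset + ksize].decode('ascii')
        value = header[offset + ksize:offset + ksize + vsize]
        offset += ksize + vsize
        # mandatory parameters come first, then the advisory ones
        params.append((key, value, i < mancount))
    return parttype, partid, params

# a part of type 'output', id 1, with one mandatory parameter k=v
header = (struct.pack('>B', 6) + b'output' + struct.pack('>I', 1)
          + struct.pack('>BB', 1, 0) + struct.pack('>BB', 1, 1) + b'kv')
parsed = parsepartheader(header)
```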
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are registered
for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the Part type
contains any uppercase char it is considered mandatory. When no handler is
known for a Mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care to
preserve.
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid characters"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
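The dynamically built format pairs one unsigned byte of key size with one of value size per parameter. A quick illustration, using a standalone copy of the one-liner above:

```python
import struct

def makefpartparamsizes(nbparams):
    # same construction as _makefpartparamsizes: one (key size, value
    # size) couple of unsigned bytes per parameter, big-endian
    return '>' + ('BB' * nbparams)

fmt = makefpartparamsizes(2)
sizes = struct.unpack(fmt, b'\x01\x02\x03\x04')
# regroup the flat tuple into (key size, value size) couples
couples = list(zip(sizes[::2], sizes[1::2]))
```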

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
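The records container above can be exercised with a trimmed-down standalone copy (the class name is ours, and reply tracking is omitted so the example stays self-contained):

```python
class minirecords(object):
    """Stripped-down unbundlerecords: categories plus chronological order."""

    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        # iteration is chronological across all categories
        return iter(self._sequences)

records = minirecords()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'namespace': 'phases'})
records.add('changegroup', {'return': 0})
```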

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
+       self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
+               self.current = p
                yield p
+               p.seek(0, 2)
+               self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
+           # If exiting or interrupted, do not attempt to seek the stream in
+           # the finally block below. This makes abort faster.
+           if (self.current and
+               not isinstance(exc, (SystemExit, KeyboardInterrupt))):
+               # consume the part content to not corrupt the stream.
+               self.current.seek(0, 2)
+
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

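The commit's point is visible above: the per-part `seek(0, 2)` moves out of `_processpart` and into the iterator itself, which finishes each part after the caller is done with it and drains the rest on exit. The pattern can be sketched generically (class and attribute names here are ours, with a plain list standing in for the bundle stream):

```python
class drainingiterator(object):
    """Sketch of the partiterator pattern: run a maintenance step after
    each item the caller consumed, and drain leftovers on exit so the
    underlying stream stays usable even after an error."""

    def __init__(self, parts):
        self.parts = iter(parts)   # stand-in for unbundler.iterparts()
        self.finished = []
        self.iterator = None

    def _finish(self, part):
        # idempotent, like part.seek(0, 2) on the real parts
        if part not in self.finished:
            self.finished.append(part)

    def __enter__(self):
        def gen():
            for part in self.parts:
                yield part
                self._finish(part)  # maintenance after the body ran
        self.iterator = gen()
        return self.iterator

    def __exit__(self, exctype, exc, tb):
        if self.iterator is None:
            return
        # resuming the generator finishes the in-flight item, then the
        # loop consumes everything never handed to the caller
        for part in self.iterator:
            self._finish(part)

d = drainingiterator(['a', 'b', 'c'])
try:
    with d as parts:
        for part in parts:
            if part == 'b':
                raise RuntimeError('boom')
except RuntimeError:
    pass
```

Even though the caller aborted on the second item, all three items were consumed, mirroring how the bundle is fully read to keep the channel usable.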
def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown Mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
-   hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
-   # If exiting or interrupted, do not attempt to seek the stream in the
-   # finally block below. This makes abort faster.
-   except (SystemExit, KeyboardInterrupt):
-       hardabort = True
-       raise
    finally:
-       # consume the part content to not corrupt the stream.
-       if not hardabort:
-           part.seek(0, 2)
+       pass


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

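As a rough illustration of the wire format the two functions above agree on, here is a standalone round-trip sketch. It mirrors `encodecaps`/`decodecaps` with the stdlib `urllib.parse` quoting instead of Mercurial's `urlreq` wrapper and works on str rather than bytes, so it is not a drop-in replacement; the names `encode_caps`/`decode_caps` are illustrative only.

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    """Encode a caps dict as a blob: one capability per line,
    values (if any) joined with ',' after '=' (sketch, not bundle2)."""
    chunks = []
    for name in sorted(caps):
        vals = caps[name]
        line = quote(name)
        if vals:
            line = '%s=%s' % (line, ','.join(quote(v) for v in vals))
        chunks.append(line)
    return '\n'.join(chunks)

def decode_caps(blob):
    """Inverse of encode_caps; values always come back as a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, rest = line.split('=', 1)
            vals = [unquote(v) for v in rest.split(',')]
        caps[unquote(key)] = vals
    return caps

blob = encode_caps({'HG20': [], 'changegroup': ['01', '02']})
# blob == 'HG20\nchangegroup=01,02'
```

The percent-quoting is what lets capability names and values safely contain newlines, commas, and '=' without breaking the line-oriented framing.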
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually create and add if you need better
        control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a space-separated block of stream level parameters"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


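The parameter-block parsing above can be sketched as a self-contained function (illustrative name, str instead of bytes, no handler dispatch):

```python
from urllib.parse import unquote

def parse_stream_params(block):
    """Parse a bundle2 stream-parameter block: entries separated by
    single spaces, each 'name' or 'name=value', percent-quoted.
    By convention a leading lowercase letter marks the parameter
    advisory; uppercase means mandatory (unknown ones must abort)."""
    params = {}
    for entry in block.split(' '):
        parts = [unquote(p) for p in entry.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)  # bare name, no value
        params[parts[0]] = parts[1]
    return params
```

The real method additionally calls `_processparam` on each pair so that known parameters (such as `Compression`) take effect on the unbundler as they are parsed.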
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


840 """yield all parts contained in the stream"""
843 """yield all parts contained in the stream"""
841 # make sure param have been loaded
844 # make sure param have been loaded
842 self.params
845 self.params
843 # From there, payload need to be decompressed
846 # From there, payload need to be decompressed
844 self._fp = self._compengine.decompressorreader(self._fp)
847 self._fp = self._compengine.decompressorreader(self._fp)
845 indebug(self.ui, 'start extraction of bundle2 parts')
848 indebug(self.ui, 'start extraction of bundle2 parts')
846 headerblock = self._readpartheader()
849 headerblock = self._readpartheader()
847 while headerblock is not None:
850 while headerblock is not None:
848 part = unbundlepart(self.ui, headerblock, self._fp)
851 part = unbundlepart(self.ui, headerblock, self._fp)
849 yield part
852 yield part
850 # Seek to the end of the part to force it's consumption so the next
853 # Seek to the end of the part to force it's consumption so the next
851 # part can be read. But then seek back to the beginning so the
854 # part can be read. But then seek back to the beginning so the
852 # code consuming this generator has a part that starts at 0.
855 # code consuming this generator has a part that starts at 0.
853 part.seek(0, 2)
856 part.seek(0, 2)
854 part.seek(0)
857 part.seek(0)
855 headerblock = self._readpartheader()
858 headerblock = self._readpartheader()
856 indebug(self.ui, 'end of bundle2 stream')
859 indebug(self.ui, 'end of bundle2 stream')
857
860
858 def _readpartheader(self):
861 def _readpartheader(self):
859 """reads a part header size and return the bytes blob
862 """reads a part header size and return the bytes blob
860
863
861 returns None if empty"""
864 returns None if empty"""
862 headersize = self._unpack(_fpartheadersize)[0]
865 headersize = self._unpack(_fpartheadersize)[0]
863 if headersize < 0:
866 if headersize < 0:
864 raise error.BundleValueError('negative part header size: %i'
867 raise error.BundleValueError('negative part header size: %i'
865 % headersize)
868 % headersize)
866 indebug(self.ui, 'part header size: %i' % headersize)
869 indebug(self.ui, 'part header size: %i' % headersize)
867 if headersize:
870 if headersize:
868 return self._readexact(headersize)
871 return self._readexact(headersize)
869 return None
872 return None
870
873
871 def compressed(self):
874 def compressed(self):
872 self.params # load params
875 self.params # load params
873 return self._compressed
876 return self._compressed
874
877
875 def close(self):
878 def close(self):
876 """close underlying file"""
879 """close underlying file"""
877 if util.safehasattr(self._fp, 'close'):
880 if util.safehasattr(self._fp, 'close'):
878 return self._fp.close()
881 return self._fp.close()
879
882
formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

902 class bundlepart(object):
905 class bundlepart(object):
903 """A bundle2 part contains application level payload
906 """A bundle2 part contains application level payload
904
907
905 The part `type` is used to route the part to the application level
908 The part `type` is used to route the part to the application level
906 handler.
909 handler.
907
910
908 The part payload is contained in ``part.data``. It could be raw bytes or a
911 The part payload is contained in ``part.data``. It could be raw bytes or a
909 generator of byte chunks.
912 generator of byte chunks.
910
913
911 You can add parameters to the part using the ``addparam`` method.
914 You can add parameters to the part using the ``addparam`` method.
912 Parameters can be either mandatory (default) or advisory. Remote side
915 Parameters can be either mandatory (default) or advisory. Remote side
913 should be able to safely ignore the advisory ones.
916 should be able to safely ignore the advisory ones.
914
917
915 Both data and parameters cannot be modified after the generation has begun.
918 Both data and parameters cannot be modified after the generation has begun.
916 """
919 """
917
920
918 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
921 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
919 data='', mandatory=True):
922 data='', mandatory=True):
920 validateparttype(parttype)
923 validateparttype(parttype)
921 self.id = None
924 self.id = None
922 self.type = parttype
925 self.type = parttype
923 self._data = data
926 self._data = data
924 self._mandatoryparams = list(mandatoryparams)
927 self._mandatoryparams = list(mandatoryparams)
925 self._advisoryparams = list(advisoryparams)
928 self._advisoryparams = list(advisoryparams)
926 # checking for duplicated entries
929 # checking for duplicated entries
927 self._seenparams = set()
930 self._seenparams = set()
928 for pname, __ in self._mandatoryparams + self._advisoryparams:
931 for pname, __ in self._mandatoryparams + self._advisoryparams:
929 if pname in self._seenparams:
932 if pname in self._seenparams:
930 raise error.ProgrammingError('duplicated params: %s' % pname)
933 raise error.ProgrammingError('duplicated params: %s' % pname)
931 self._seenparams.add(pname)
934 self._seenparams.add(pname)
932 # status of the part's generation:
935 # status of the part's generation:
933 # - None: not started,
936 # - None: not started,
934 # - False: currently generated,
937 # - False: currently generated,
935 # - True: generation done.
938 # - True: generation done.
936 self._generated = None
939 self._generated = None
937 self.mandatory = mandatory
940 self.mandatory = mandatory
938
941
939 def __repr__(self):
942 def __repr__(self):
940 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
943 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
941 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
944 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
942 % (cls, id(self), self.id, self.type, self.mandatory))
945 % (cls, id(self), self.id, self.type, self.mandatory))
943
946
944 def copy(self):
947 def copy(self):
945 """return a copy of the part
948 """return a copy of the part
946
949
947 The new part have the very same content but no partid assigned yet.
950 The new part have the very same content but no partid assigned yet.
948 Parts with generated data cannot be copied."""
951 Parts with generated data cannot be copied."""
949 assert not util.safehasattr(self.data, 'next')
952 assert not util.safehasattr(self.data, 'next')
950 return self.__class__(self.type, self._mandatoryparams,
953 return self.__class__(self.type, self._mandatoryparams,
951 self._advisoryparams, self._data, self.mandatory)
954 self._advisoryparams, self._data, self.mandatory)
952
955
    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

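As a standalone illustration of the framing `getchunks` emits, the sketch below packs a toy parameterless part with struct formats assumed to match the constants the code references (`_fparttypesize` as `'>B'`, `_fpartid` and `_fpayloadsize` as `'>i'`, `_fpartparamcount` as `'>BB'`) — a sketch of the wire layout, not Mercurial's implementation:

```python
import struct

def frame_part(parttype, partid, payload):
    """Frame one toy part: sized header block, sized payload chunks,
    then a zero-size chunk that closes the part."""
    # header: type size, type, part id, mandatory/advisory param counts;
    # with zero parameters the size table that would follow is empty
    header = (struct.pack('>B', len(parttype)) + parttype
              + struct.pack('>i', partid)
              + struct.pack('>BB', 0, 0))
    chunks = [struct.pack('>i', len(header)), header]
    if payload:
        chunks.append(struct.pack('>i', len(payload)))
        chunks.append(payload)
    chunks.append(struct.pack('>i', 0))  # end-of-payload marker
    return b''.join(chunks)
```

Reading the frame back mirrors what `unbundlepart._readheader` and `_payloadchunks` do on the consuming side.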
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


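`util.chunkbuffer` above re-slices an iterator of byte strings into fixed-size reads. A minimal stand-in (a sketch, not Mercurial's implementation) behaves like this:

```python
def chunked_reads(source, size):
    """Yield fixed-size byte chunks from an iterator of byte strings;
    the final chunk may be shorter."""
    buf = b''
    for piece in source:
        buf += piece
        while len(buf) >= size:
            yield buf[:size]
            buf = buf[size:]
    if buf:
        yield buf
```

For example, `list(chunked_reads(iter([b'abcd', b'ef']), 3))` regroups the input into three-byte reads regardless of how the producer sliced it.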
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                # drain the part payload so the underlying stream stays
                # positioned correctly
                part.seek(0, 2)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

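The `hardabort` dance in `__call__` is a reusable pattern: run cleanup in `finally` unless the exception was a hard abort (`SystemExit`/`KeyboardInterrupt`). In isolation:

```python
def run_with_cleanup(work, cleanup):
    """Run work(); always run cleanup() afterwards, except when the
    process is being torn down by a hard abort."""
    hardabort = False
    try:
        return work()
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        if not hardabort:
            cleanup()
```

On a normal exception the cleanup still runs (the exception propagates after `finally`); on Ctrl-C the flag suppresses it so shutdown is not delayed by stream maintenance.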
class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

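`_findchunk` maps a payload position back to a chunk number and an offset inside that chunk. The same lookup, extracted as a free function over a toy index of `(payload offset, file offset)` pairs:

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset within chunk) for payload position pos.

    chunkindex holds (payload offset, file offset) pairs for chunk starts."""
    for chunk, (ppos, _fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```

With chunks starting at payload offsets 0, 5 and 9, position 7 falls two bytes into the second chunk.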
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

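The `zip` trick in `_readheader` re-pairs the flat size table, which interleaves key and value lengths. For example:

```python
# flat table: key1, value1, key2, value2 lengths, as unpacked from the header
paramsizes = (3, 5, 4, 0)
# pair even-indexed entries (key sizes) with odd-indexed ones (value sizes)
pairs = list(zip(paramsizes[::2], paramsizes[1::2]))
```

Splitting `pairs` at `mancount` then separates mandatory sizes from advisory ones, as the code above does.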
    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

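`obsmarkersversion` just strips the `V` prefix from each advertised format and keeps the integer. With plain strings standing in for Mercurial's byte strings:

```python
# a capabilities dict as decodecaps might produce it (illustrative values)
caps = {'obsmarkers': ('V0', 'V1'), 'pushkey': ()}

# the same comprehension the function uses
versions = [int(c[1:]) for c in caps.get('obsmarkers', ())
            if c.startswith('V')]
```

A peer that advertises no `obsmarkers` capability simply yields an empty list, since `caps.get` falls back to the empty tuple.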
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what the
        client retrieved matches what the server knows about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what the client
        retrieved matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activity happens on unrelated heads, it is
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

1826 def _readphaseheads(inpart):
1837 def _readphaseheads(inpart):
1827 headsbyphase = [[] for i in phases.allphases]
1838 headsbyphase = [[] for i in phases.allphases]
1828 entrysize = struct.calcsize(_fphasesentry)
1839 entrysize = struct.calcsize(_fphasesentry)
1829 while True:
1840 while True:
1830 entry = inpart.read(entrysize)
1841 entry = inpart.read(entrysize)
1831 if len(entry) < entrysize:
1842 if len(entry) < entrysize:
1832 if entry:
1843 if entry:
1833 raise error.Abort(_('bad phase-heads bundle part'))
1844 raise error.Abort(_('bad phase-heads bundle part'))
1834 break
1845 break
1835 phase, node = struct.unpack(_fphasesentry, entry)
1846 phase, node = struct.unpack(_fphasesentry, entry)
1836 headsbyphase[phase].append(node)
1847 headsbyphase[phase].append(node)
1837 return headsbyphase
1848 return headsbyphase
1838
1849
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

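The `partid` passed to `op.records.add` is what ties a reply part back to the request that produced it, via the `in-reply-to` parameter. A hypothetical, minimal stand-in for that records object is sketched below; the class and method names are illustrative and do not reproduce the real bundle-operation API.

```python
# Illustrative sketch: reply records are stored per category together with
# the id of the part they answer, so a caller can later ask "what came back
# for part N?". This is not the real records implementation.
class replyrecords(object):
    def __init__(self):
        self._entries = {}

    def add(self, category, entry, partid=None):
        # e.g. add('pushkey', {'return': 1}, partid=7)
        self._entries.setdefault(category, []).append((partid, entry))

    def getreplies(self, partid):
        """Collect every record attached to replies to ``partid``."""
        out = {}
        for category, entries in self._entries.items():
            matches = [e for pid, e in entries if pid == partid]
            if matches:
                out[category] = matches
        return out

records = replyrecords()
records.add('pushkey', {'return': 1}, partid=7)
records.add('obsmarkers', {'new': 3}, partid=9)
assert records.getreplies(7) == {'pushkey': [{'return': 1}]}
```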
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

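The hgtagsfnodes payload uses the same delimiter-free framing idea as phase-heads: consecutive fixed-size records, here 20-byte (changeset node, .hgtags filenode) pairs, where a short read simply ends the stream. A minimal sketch of that loop, with an illustrative generator name:

```python
import io

def iterfnodepairs(stream):
    """Yield (node, fnode) 20-byte pairs until the stream runs short.

    Mirrors the handler's policy: an incomplete trailing pair is ignored
    rather than treated as an error.
    """
    while True:
        node = stream.read(20)
        fnode = stream.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        yield node, fnode

# One complete pair followed by 5 bytes of truncated data:
payload = b'\xaa' * 20 + b'\xbb' * 20 + b'\xcc' * 5
pairs = list(iterfnodepairs(io.BytesIO(payload)))
assert pairs == [(b'\xaa' * 20, b'\xbb' * 20)]
```

Tolerating the truncated tail (instead of aborting, as `_readphaseheads` does) fits this part's role as a cache warmer: losing the last entry is harmless.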
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
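The key transformation above (upper-case, then namespace with `USERVAR_` so hooks can tell client-supplied variables from server-set ones) can be isolated as a pure function; the function name here is illustrative, not part of the module:

```python
def pushvarstohookargs(advisoryparams):
    """Map (key, value) advisory params to USERVAR_-prefixed hook args.

    Sketch of the bundle2getvars transformation: keys are upper-cased and
    prefixed so hooks can recognize --pushvar-supplied variables.
    """
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

args = pushvarstohookargs([('debug', '1'), ('Reason', 'hotfix')])
assert args == {'USERVAR_DEBUG': '1', 'USERVAR_REASON': 'hotfix'}
```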