bundle2: raise a more helpful error if building a bundle part header fails...
Augie Fackler
r34246:ab379eed default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

the Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are obviously forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    trouble.

  Any applicative level options MUST go into a bundle2 part instead.

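The stream-level parameter rules above (space separated, urlquoted, case of the first letter deciding mandatory vs. advisory) can be sketched as follows. This is a minimal illustration with hypothetical helper names, not Mercurial's actual parser, which lives in the unbundler classes not shown in this excerpt:

```python
from urllib.parse import unquote

def parsestreamparams(blob):
    """Parse a stream-level parameter blob into a dict.

    The blob is a space separated list of urlquoted `<name>` or
    `<name>=<value>` entries, as described above.
    """
    params = {}
    if not blob:
        return params
    for entry in blob.split(' '):
        if '=' in entry:
            name, value = entry.split('=', 1)
        else:
            name, value = entry, ''
        name = unquote(name)
        if not name:
            raise ValueError('empty parameter name')
        if not name[0].isalpha():
            raise ValueError('non-letter first character: %r' % name)
        params[name] = unquote(value)
    return params

def ismandatory(name):
    # mandatory parameters start with a capital letter
    return name[0].isupper()
```

For example, `parsestreamparams('Compression=BZ foo=bar%20baz')` yields one mandatory parameter (`Compression`) and one advisory one (`foo`).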
Payload part
------------------------

Binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32bits integer (unique in the bundle) that can be used to refer
           to this part.

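Packing the leading `<typesize><parttype>` fields can be sketched as below. Note that `typesize` is a single unsigned byte, so a part type name longer than 255 bytes cannot be encoded and `struct.pack` raises `struct.error` — plausibly the kind of header-building failure the commit message above refers to, though that connection is an assumption here. `packparttypeheader` is a hypothetical helper, not a function from this file:

```python
import struct

def packparttypeheader(parttype):
    """Pack the <typesize><parttype> prefix of a part header.

    typesize is an unsigned byte ('>B'), so len(parttype) must fit
    in 0..255; struct.pack raises struct.error otherwise.
    """
    return struct.pack('>B', len(parttype)) + parttype
```

For instance, `packparttypeheader(b'changegroup')` produces `b'\x0bchangegroup'` (11 is the length of the type name).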
  :parameters:

      Part's parameters may have arbitrary content, the binary structure is::

          <mandatory-count><advisory-count><param-sizes><param-data>

      :mandatory-count: 1 byte, number of mandatory parameters

      :advisory-count:  1 byte, number of advisory parameters

      :param-sizes:

          N couples of bytes, where N is the total number of parameters. Each
          couple contains (<size-of-key>, <size-of-value>) for one parameter.

      :param-data:

          A blob of bytes from which each parameter key and value can be
          retrieved using the list of size couples stored in the previous
          field.

          Mandatory parameters come first, then the advisory ones.

          Each parameter's key MUST be unique within the part.

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

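The payload framing described above can be sketched as a small chunk reader. This is an illustrative helper, not the file's actual unbundling code (which streams parts through dedicated unbundler classes not shown in this excerpt):

```python
import io
import struct

def iterpayloadchunks(fh):
    """Yield the data chunks of one part payload from a binary stream.

    The payload is a series of <chunksize><chunkdata>, where chunksize is
    a big-endian int32; a zero-size chunk terminates the payload.
    """
    while True:
        (chunksize,) = struct.unpack('>i', fh.read(4))
        if chunksize == 0:
            return  # end-of-payload marker
        if chunksize < 0:
            # reserved for special case processing; none is defined yet
            raise ValueError('negative chunk size: %d' % chunksize)
        yield fh.read(chunksize)

# a payload containing one 5-byte chunk, then the terminating zero chunk
data = struct.pack('>i', 5) + b'hello' + struct.pack('>i', 0)
chunks = list(iterpayloadchunks(io.BytesIO(data)))
```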
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
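The case-sensitivity rules just described can be sketched as follows; these are hypothetical helpers for illustration, while the real logic appears in `_processpart` below:

```python
def ismandatorypart(parttype):
    # a part is mandatory when its type contains any uppercase char
    return parttype != parttype.lower()

def findhandler(mapping, parttype):
    # handler lookup itself is case insensitive
    return mapping.get(parttype.lower())
```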

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

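To see what `_makefpartparamsizes` builds: for N parameters it produces a big-endian format of N `(key size, value size)` byte couples, matching the `:param-sizes:` field described in the docstring. A quick sketch (re-declaring the helper so the example is self-contained):

```python
import struct

# same body as _makefpartparamsizes above
def makefpartparamsizes(nbparams):
    return '>' + ('BB' * nbparams)

# for two parameters, the format reads two (key size, value size) couples
fmt = makefpartparamsizes(2)
sizes = struct.unpack(fmt, b'\x03\x05\x04\x00')
# first key is 3 bytes with a 5-byte value; second key is 4 bytes
# with an empty value
```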
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

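The behavior documented in the `unbundlerecords` docstring can be exercised as below. The class is re-declared here (trimmed to the methods the example touches) so the snippet runs standalone:

```python
# mirrors the unbundlerecords class above, trimmed for the example
class unbundlerecords(object):
    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'hello', inreplyto=7)

# category lookup, chronological iteration, and per-part reply records
assert records['changegroup'] == ({'return': 1},)
assert list(records) == [('changegroup', {'return': 1}),
                         ('output', 'hello')]
assert records.getreplies(7)['output'] == ('hello',)
```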
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                yield p
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                     params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)

524
524
525 def decodecaps(blob):
525 def decodecaps(blob):
526 """decode a bundle2 caps bytes blob into a dictionary
526 """decode a bundle2 caps bytes blob into a dictionary
527
527
528 The blob is a list of capabilities (one per line)
528 The blob is a list of capabilities (one per line)
529 Capabilities may have values using a line of the form::
529 Capabilities may have values using a line of the form::
530
530
531 capability=value1,value2,value3
531 capability=value1,value2,value3
532
532
533 The values are always a list."""
533 The values are always a list."""
534 caps = {}
534 caps = {}
535 for line in blob.splitlines():
535 for line in blob.splitlines():
536 if not line:
536 if not line:
537 continue
537 continue
538 if '=' not in line:
538 if '=' not in line:
539 key, vals = line, ()
539 key, vals = line, ()
540 else:
540 else:
541 key, vals = line.split('=', 1)
541 key, vals = line.split('=', 1)
542 vals = vals.split(',')
542 vals = vals.split(',')
543 key = urlreq.unquote(key)
543 key = urlreq.unquote(key)
544 vals = [urlreq.unquote(v) for v in vals]
544 vals = [urlreq.unquote(v) for v in vals]
545 caps[key] = vals
545 caps[key] = vals
546 return caps
546 return caps
547
547
548 def encodecaps(caps):
548 def encodecaps(caps):
549 """encode a bundle2 caps dictionary into a bytes blob"""
549 """encode a bundle2 caps dictionary into a bytes blob"""
550 chunks = []
550 chunks = []
551 for ca in sorted(caps):
551 for ca in sorted(caps):
552 vals = caps[ca]
552 vals = caps[ca]
553 ca = urlreq.quote(ca)
553 ca = urlreq.quote(ca)
554 vals = [urlreq.quote(v) for v in vals]
554 vals = [urlreq.quote(v) for v in vals]
555 if vals:
555 if vals:
556 ca = "%s=%s" % (ca, ','.join(vals))
556 ca = "%s=%s" % (ca, ','.join(vals))
557 chunks.append(ca)
557 chunks.append(ca)
558 return '\n'.join(chunks)
558 return '\n'.join(chunks)
559
559
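Since the caps blob format handled above (one capability per line, an optional `name=value1,value2` value list, percent-quoted) is easy to lose track of, here is a standalone round-trip sketch. It works on str values with `urllib.parse` instead of Mercurial's byte-oriented `urlreq` wrappers, and the helper names `encode_caps`/`decode_caps` are illustrative, not part of the module:

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # one entry per capability, sorted; values joined by ',' after quoting
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        entry = quote(name)
        if vals:
            entry = '%s=%s' % (entry, ','.join(vals))
        chunks.append(entry)
    return '\n'.join(chunks)

def decode_caps(blob):
    # inverse: split lines, split off the value list, unquote everything
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, raw = line.split('=', 1)
            vals = [unquote(v) for v in raw.split(',')]
        caps[unquote(key)] = vals
    return caps
```

A capability with no value encodes as a bare name, so the round trip maps it back to an empty list.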
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application-level payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

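For illustration, the stream-parameter encoding that `_paramchunk` produces (space-separated, percent-quoted `name` or `name=value` entries) can be sketched standalone; `encode_stream_params` is a hypothetical helper using `urllib.parse.quote` in place of `urlreq`:

```python
from urllib.parse import quote

def encode_stream_params(params):
    # params is a list of (name, value-or-None) pairs, like bundle20._params;
    # a None value emits the bare quoted name
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)
```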
    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output."""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

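The magic-string dispatch in `getunbundler` is just a fixed-width split of the first four bytes into a two-character magic and a two-character version; a standalone sketch (the helper name `split_magicstring` is hypothetical):

```python
def split_magicstring(magicstring):
    # 'HG20' -> ('HG', '20'); mirrors the slicing in getunbundler,
    # where the version half then selects an unbundler class
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise ValueError('not a Mercurial bundle')
    return magic, version
```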
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """decode and process a blob of stream level parameters"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


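The decoding done by `_processallparams` is the inverse of the stream-parameter encoding: split on spaces, split each entry once on `=`, then unquote. A standalone sketch using `urllib.parse.unquote` and a plain dict in place of `urlreq` and `util.sortdict` (the name `parse_stream_params` is illustrative):

```python
from urllib.parse import unquote

def parse_stream_params(paramsblock):
    # space-separated, percent-quoted 'name' or 'name=value' entries;
    # a bare name maps to None
    params = {}
    for p in paramsblock.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)
        params[parts[0]] = parts[1]
    return params
```

Note that unquoting happens after the split on `=`, so a quoted `%3D` inside a value survives intact.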
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


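The framing that `_forwardchunks` walks can be sketched in isolation: each chunk is a 4-byte big-endian signed size followed by that many payload bytes, and two consecutive zero sizes terminate the stream. This standalone reader assumes the `'>i'` size format (as the `_fpartheadersize == _fpayloadsize` assert above implies) and treats negative sizes as out-of-band flags, skipping them rather than reproducing the interrupt handling:

```python
import struct
from io import BytesIO

def iter_framed_chunks(fp):
    # read (size, payload) frames until two consecutive zero sizes
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            emptycount += 1
            continue
        emptycount = 0
        if size < 0:
            continue  # negative sizes flag out-of-band data (e.g. interrupts)
        yield fp.read(size)
```

Feeding it a hand-built stream of two payloads separated by a zero-size frame and ended by a double zero yields exactly those payloads.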
839 def iterparts(self):
839 def iterparts(self):
840 """yield all parts contained in the stream"""
840 """yield all parts contained in the stream"""
841 # make sure param have been loaded
841 # make sure param have been loaded
842 self.params
842 self.params
843 # From there, payload need to be decompressed
843 # From there, payload need to be decompressed
844 self._fp = self._compengine.decompressorreader(self._fp)
844 self._fp = self._compengine.decompressorreader(self._fp)
845 indebug(self.ui, 'start extraction of bundle2 parts')
845 indebug(self.ui, 'start extraction of bundle2 parts')
846 headerblock = self._readpartheader()
846 headerblock = self._readpartheader()
847 while headerblock is not None:
847 while headerblock is not None:
848 part = unbundlepart(self.ui, headerblock, self._fp)
848 part = unbundlepart(self.ui, headerblock, self._fp)
849 yield part
849 yield part
850 # Seek to the end of the part to force it's consumption so the next
850 # Seek to the end of the part to force it's consumption so the next
851 # part can be read. But then seek back to the beginning so the
851 # part can be read. But then seek back to the beginning so the
852 # code consuming this generator has a part that starts at 0.
852 # code consuming this generator has a part that starts at 0.
853 part.seek(0, 2)
853 part.seek(0, 2)
854 part.seek(0)
854 part.seek(0)
855 headerblock = self._readpartheader()
855 headerblock = self._readpartheader()
856 indebug(self.ui, 'end of bundle2 stream')
856 indebug(self.ui, 'end of bundle2 stream')
857
857
858 def _readpartheader(self):
858 def _readpartheader(self):
859 """reads a part header size and return the bytes blob
859 """reads a part header size and return the bytes blob
860
860
861 returns None if empty"""
861 returns None if empty"""
862 headersize = self._unpack(_fpartheadersize)[0]
862 headersize = self._unpack(_fpartheadersize)[0]
863 if headersize < 0:
863 if headersize < 0:
864 raise error.BundleValueError('negative part header size: %i'
864 raise error.BundleValueError('negative part header size: %i'
865 % headersize)
865 % headersize)
866 indebug(self.ui, 'part header size: %i' % headersize)
866 indebug(self.ui, 'part header size: %i' % headersize)
867 if headersize:
867 if headersize:
868 return self._readexact(headersize)
868 return self._readexact(headersize)
869 return None
869 return None
870
870
871 def compressed(self):
871 def compressed(self):
872 self.params # load params
872 self.params # load params
873 return self._compressed
873 return self._compressed
874
874
875 def close(self):
875 def close(self):
876 """close underlying file"""
876 """close underlying file"""
877 if util.safehasattr(self._fp, 'close'):
877 if util.safehasattr(self._fp, 'close'):
878 return self._fp.close()
878 return self._fp.close()
879
879
880 formatmap = {'20': unbundle20}
880 formatmap = {'20': unbundle20}
881
881
882 b2streamparamsmap = {}
882 b2streamparamsmap = {}
883
883
884 def b2streamparamhandler(name):
884 def b2streamparamhandler(name):
885 """register a handler for a stream level parameter"""
885 """register a handler for a stream level parameter"""
886 def decorator(func):
886 def decorator(func):
887 assert name not in formatmap
887 assert name not in formatmap
888 b2streamparamsmap[name] = func
888 b2streamparamsmap[name] = func
889 return func
889 return func
890 return decorator
890 return decorator
891
891
892 @b2streamparamhandler('compression')
892 @b2streamparamhandler('compression')
893 def processcompression(unbundler, param, value):
893 def processcompression(unbundler, param, value):
894 """read compression parameter and install payload decompression"""
894 """read compression parameter and install payload decompression"""
895 if value not in util.compengines.supportedbundletypes:
895 if value not in util.compengines.supportedbundletypes:
896 raise error.BundleUnknownFeatureError(params=(param,),
896 raise error.BundleUnknownFeatureError(params=(param,),
897 values=(value,))
897 values=(value,))
898 unbundler._compengine = util.compengines.forbundletype(value)
898 unbundler._compengine = util.compengines.forbundletype(value)
899 if value is not None:
899 if value is not None:
900 unbundler._compressed = True
900 unbundler._compressed = True
901
901
902 class bundlepart(object):
902 class bundlepart(object):
903 """A bundle2 part contains application level payload
903 """A bundle2 part contains application level payload
904
904
905 The part `type` is used to route the part to the application level
905 The part `type` is used to route the part to the application level
906 handler.
906 handler.
907
907
908 The part payload is contained in ``part.data``. It could be raw bytes or a
908 The part payload is contained in ``part.data``. It could be raw bytes or a
909 generator of byte chunks.
909 generator of byte chunks.
910
910
911 You can add parameters to the part using the ``addparam`` method.
911 You can add parameters to the part using the ``addparam`` method.
912 Parameters can be either mandatory (default) or advisory. Remote side
912 Parameters can be either mandatory (default) or advisory. Remote side
913 should be able to safely ignore the advisory ones.
913 should be able to safely ignore the advisory ones.
914
914
915 Both data and parameters cannot be modified after the generation has begun.
915 Both data and parameters cannot be modified after the generation has begun.
916 """
916 """
917
917
918 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
918 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
919 data='', mandatory=True):
919 data='', mandatory=True):
920 validateparttype(parttype)
920 validateparttype(parttype)
921 self.id = None
921 self.id = None
922 self.type = parttype
922 self.type = parttype
923 self._data = data
923 self._data = data
924 self._mandatoryparams = list(mandatoryparams)
924 self._mandatoryparams = list(mandatoryparams)
925 self._advisoryparams = list(advisoryparams)
925 self._advisoryparams = list(advisoryparams)
926 # checking for duplicated entries
926 # checking for duplicated entries
927 self._seenparams = set()
927 self._seenparams = set()
928 for pname, __ in self._mandatoryparams + self._advisoryparams:
928 for pname, __ in self._mandatoryparams + self._advisoryparams:
929 if pname in self._seenparams:
929 if pname in self._seenparams:
930 raise error.ProgrammingError('duplicated params: %s' % pname)
930 raise error.ProgrammingError('duplicated params: %s' % pname)
931 self._seenparams.add(pname)
931 self._seenparams.add(pname)
932 # status of the part's generation:
932 # status of the part's generation:
933 # - None: not started,
933 # - None: not started,
934 # - False: currently generated,
934 # - False: currently generated,
935 # - True: generation done.
935 # - True: generation done.
936 self._generated = None
936 self._generated = None
937 self.mandatory = mandatory
937 self.mandatory = mandatory
938
938
939 def __repr__(self):
939 def __repr__(self):
940 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
940 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
941 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
941 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
942 % (cls, id(self), self.id, self.type, self.mandatory))
942 % (cls, id(self), self.id, self.type, self.mandatory))
943
943
944 def copy(self):
944 def copy(self):
945 """return a copy of the part
945 """return a copy of the part
946
946
947 The new part have the very same content but no partid assigned yet.
947 The new part have the very same content but no partid assigned yet.
948 Parts with generated data cannot be copied."""
948 Parts with generated data cannot be copied."""
949 assert not util.safehasattr(self.data, 'next')
949 assert not util.safehasattr(self.data, 'next')
950 return self.__class__(self.type, self._mandatoryparams,
950 return self.__class__(self.type, self._mandatoryparams,
951 self._advisoryparams, self._data, self.mandatory)
951 self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

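For illustration, the mandatory/advisory bookkeeping that ``addparam`` implements can be sketched as a standalone class (``MiniPart`` is a hypothetical name, not part of bundle2 itself):

```python
# Minimal standalone sketch of the addparam bookkeeping above.
# Assumption: MiniPart is illustrative only and mirrors just the
# parameter-tracking behaviour, not the rest of bundlepart.
class MiniPart(object):
    def __init__(self):
        self._mandatoryparams = []
        self._advisoryparams = []
        self._seenparams = set()

    def addparam(self, name, value='', mandatory=True):
        # duplicate names are rejected regardless of mandatory-ness
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

part = MiniPart()
part.addparam('version', '02')
part.addparam('nbchanges', '3', mandatory=False)
```

A duplicate name raises even when the second addition uses a different mandatory flag, since both kinds share one ``_seenparams`` set.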
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


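The payload framing produced by ``getchunks``/``_payloadchunks`` can be sketched standalone: each chunk is preceded by a signed big-endian 32-bit size, a size of 0 terminates the payload, and -1 flags an out-of-band interrupt part (the sketch assumes this mirrors the module's ``_fpayloadsize`` format; ``frame``/``deframe`` are illustrative names, not bundle2 APIs):

```python
import struct

# assumption: matches the '>i' payload-size format used by bundle2
_fpayloadsize = '>i'

def frame(chunks):
    """serialize chunks as size-prefixed blobs plus a 0-size terminator"""
    out = []
    for chunk in chunks:
        out.append(struct.pack(_fpayloadsize, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_fpayloadsize, 0))  # end-of-payload marker
    return b''.join(out)

def deframe(data):
    """read size-prefixed chunks until the 0-size terminator"""
    pos, chunks = 0, []
    while True:
        (size,) = struct.unpack_from(_fpayloadsize, data, pos)
        pos += 4
        if size == 0:   # end of payload
            return chunks
        if size < 0:    # the real protocol treats -1 as an interrupt flag
            raise ValueError('interrupt flag not handled in this sketch')
        chunks.append(data[pos:pos + size])
        pos += size

wire = frame([b'abc', b'defgh'])
```

Using a signed size field is what lets the stream smuggle the -1 interrupt marker in-band without a separate escape mechanism.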
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

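The chunk index maps an absolute payload position to a (chunk number, intra-chunk offset) pair, which is what makes ``seek`` possible on a streamed part. A standalone sketch of the lookup (``findchunk`` and the sample index are illustrative, not bundle2 APIs):

```python
# Standalone sketch of the _findchunk lookup above: chunkindex is a list
# of (payloadoffset, fileoffset) tuples, one per chunk start, in order.
def findchunk(chunkindex, pos):
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # pos is exactly a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# hypothetical index: chunks starting at payload offsets 0, 10 and 25
index = [(0, 100), (10, 114), (25, 133)]
```

Note the linear scan never matches a position past the last recorded boundary; ``seek`` handles that by consuming the stream first so the index is complete.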
    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
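The version extraction performed by ``obsmarkersversion`` can be exercised in isolation: capability values such as 'V0' and 'V1' are reduced to their integer version numbers, and anything not starting with 'V' is ignored (``obsversions`` below is an illustrative standalone copy, not an import from bundle2):

```python
# Standalone sketch of obsmarkersversion(): parse 'V<n>' capability
# strings down to integers, skipping malformed entries.
def obsversions(caps):
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsversions({'obsmarkers': ('V0', 'V1', 'junk')})
```

A missing 'obsmarkers' key simply yields an empty list, so callers can treat "no capability advertised" and "no versions supported" uniformly.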

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1430
1434
def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

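The 40-bytes-per-head layout described in the comment above can be sketched as a standalone pair of helpers. This is a hedged illustration, not part of the module; the helper names are hypothetical.

```python
# Hedged sketch of the 'hgtagsfnodes' payload layout: for each head, a
# 20-byte changeset node immediately followed by the 20-byte filenode of
# its .hgtags, i.e. 40 bytes per head.
def packfnodes(pairs):
    """Pack [(node, fnode), ...] into a single payload."""
    chunks = []
    for node, fnode in pairs:
        assert len(node) == 20 and len(fnode) == 20
        chunks.extend([node, fnode])
    return b''.join(chunks)

def unpackfnodes(data):
    """Inverse of packfnodes."""
    assert len(data) % 40 == 0
    return [(data[i:i + 20], data[i + 20:i + 40])
            for i in range(0, len(data), 40)]
```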
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

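As a rough standalone illustration of the legacy (non-bundle2) branch above — a magic header followed by the compressed changegroup chunks — here is a minimal sketch using bz2. The header value and the header/compression pairing of the real bundle types are simplified away.

```python
import bz2

def legacy_stream(chunks, header=b'HG10BZ'):
    """Yield a simplified legacy bundle: a header, then bz2-compressed chunks.

    Sketch only: real Mercurial bundle types pair specific headers with
    specific compression engines; this just shows the stream shape.
    """
    comp = bz2.BZ2Compressor()
    yield header
    for chunk in chunks:
        data = comp.compress(chunk)
        if data:  # the compressor may buffer and yield nothing yet
            yield data
    yield comp.flush()
```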
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

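The return-code arithmetic above follows the old addchangegroup convention (0 = failure, 1 = success with an unchanged head count, 1+n = n heads added, -1-n = n heads removed). A self-contained sketch of the same combination logic:

```python
def combine_results(results):
    """Combine 0 or more addchangegroup-style return codes into one."""
    changedheads = 0
    for ret in results:
        # any failed changegroup makes the combined result a failure
        if ret == 0:
            return 0
        if ret < -1:      # ret encodes -1 - heads_removed
            changedheads += ret + 1
        elif ret > 1:     # ret encodes 1 + heads_added
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
```

For example, two pulls that added two and one heads respectively combine to a net result of three heads added, encoded as 4.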
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will need massive rework
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what the
      client retrieved matches what the server knows about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate that what the client
      retrieved matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

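The size and digest validation that util.digestchecker performs on the retrieved payload can be illustrated with a hedged standalone check; the function name here is illustrative, not the real API.

```python
import hashlib

def validate_payload(data, size, digests):
    """Check a payload against an advertised byte size and hex digests.

    digests maps a hashlib algorithm name (e.g. 'sha1') to the expected
    hexadecimal digest, mirroring the 'digest:<type>' params above.
    """
    if len(data) != size:
        raise ValueError('size mismatch: expected %d, read %d'
                         % (size, len(data)))
    for typ, expected in digests.items():
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, computed %s'
                             % (typ, expected, actual))
    return True
```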
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activities happen on unrelated heads,
    they are ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

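The two race checks above differ only in what they compare. A minimal sketch of both predicates, with hypothetical helper names:

```python
def heads_unchanged(recorded, current):
    """'check:heads' semantics: the full head set must be identical."""
    return sorted(recorded) == sorted(current)

def updated_heads_still_present(recorded, currentheads):
    """'check:updated-heads' semantics: every head recorded at bundle time
    must still exist; changes to unrelated heads do not trigger a race."""
    return all(h in currentheads for h in recorded)
```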
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

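The phase-heads payload read by _readphaseheads above is a flat sequence of fixed-size entries. A hedged round-trip sketch, assuming _fphasesentry is a big-endian 32-bit phase number followed by a 20-byte node ('>i20s'); adjust if the module defines it otherwise.

```python
import struct

# Assumed entry layout: big-endian int phase, then a 20-byte node hash.
FPHASESENTRY = '>i20s'

def encodephaseheads(headsbyphase):
    """Pack a list-of-lists (indexed by phase number) into a payload."""
    chunks = []
    for phase, heads in enumerate(headsbyphase):
        for node in heads:
            chunks.append(struct.pack(FPHASESENTRY, phase, node))
    return b''.join(chunks)

def decodephaseheads(data, nphases=3):
    """Inverse of encodephaseheads, mirroring _readphaseheads."""
    entrysize = struct.calcsize(FPHASESENTRY)
    if len(data) % entrysize:
        raise ValueError('bad phase-heads payload')
    headsbyphase = [[] for _ in range(nphases)]
    for off in range(0, len(data), entrysize):
        phase, node = struct.unpack(FPHASESENTRY, data[off:off + entrysize])
        headsbyphase[phase].append(node)
    return headsbyphase
```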
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

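# Illustrative sketch (not part of the original change): the loop above reads
# fixed 20-byte (node, fnode) pairs and silently stops at truncated trailing
# data rather than aborting. 'demo_read_fnode_pairs' is a hypothetical name
# for the same pattern against any read(n) callable.
def demo_read_fnode_pairs(read):
    pairs = []
    while True:
        node = read(20)
        fnode = read(20)
        if len(node) < 20 or len(fnode) < 20:
            break  # incomplete trailing data is ignored, as in the handler
        pairs.append((node, fnode))
    return pairs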
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
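
# Illustrative sketch (not part of the original change): the handler above
# upper-cases each advisory param and prefixes it with USERVAR_ before
# exposing it to hooks. 'demo_pushvars_to_hookargs' is a hypothetical name
# for that transform in isolation.
def demo_pushvars_to_hookargs(advisoryparams):
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs
# e.g. --pushvar debug=1 arrives as ('debug', '1') and is exposed to hooks
# as USERVAR_DEBUG=1.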