bundle2: make source a mandatory argument for bundle2.applybundle() (API)
Pulkit Goyal
r37255:684a6a26 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any overly complex usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application-level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler, which can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    A part's parameters may have arbitrary content; the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count:  1 byte, number of advisory parameters

    :param-sizes:

        N couples of bytes, where N is the total number of parameters. Each
        couple contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

        A blob of bytes from which each parameter key and value can be
        retrieved using the list of size couples stored in the previous
        field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know whether a part is mandatory or advisory. If the
part type contains any uppercase char it is considered mandatory. When no
handler is known for a mandatory part, the process is aborted and an
exception is raised. If the part is advisory and no handler is known, the
part is ignored. When the process is aborted, the full bundle is still read
from the stream to keep the channel usable. But none of the parts read after
an abort are processed. In the future, dropping the stream may become an
option for channels we do not care to preserve.
"""

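The stream-level parameter encoding described above (a space-separated list of urlquoted `<name>=<value>` tokens, with the case of the first letter distinguishing mandatory from advisory) can be illustrated with a minimal standalone parser. This is a sketch of the documented wire format written for Python 3, not Mercurial's actual implementation; the function name is made up for the example.

```python
import urllib.parse

def parsestreamparams(blob):
    """Parse a stream-level parameter blob per the bundle2 format sketch.

    Returns two dicts, (mandatory, advisory). Raises ValueError on an
    empty name, mirroring the "Empty names are forbidden" rule.
    """
    mandatory, advisory = {}, {}
    if not blob:
        return mandatory, advisory
    for token in blob.split(' '):
        name, sep, value = token.partition('=')
        name = urllib.parse.unquote(name)
        value = urllib.parse.unquote(value) if sep else None
        if not name:
            raise ValueError('empty parameter name')
        # A capitalized first letter marks the parameter as mandatory.
        target = mandatory if name[:1].isupper() else advisory
        target[name] = value
    return mandatory, advisory

m, a = parsestreamparams('Compression=BZ version%3Dx=1')
```

Note how urlquoting lets a parameter name itself contain `=` (`version%3Dx` decodes to `version=x`) without breaking the tokenization.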
from __future__ import absolute_import, division

import collections
import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    bookmarks,
    changegroup,
    encoding,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    streamclone,
    tags,
    url,
    util,
)
from .utils import (
    stringutil,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 32768

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
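As a sketch of how these struct format strings frame the stream, the following round-trips one `<chunksize><chunkdata>` payload chunk using the same `'>i'` (big-endian signed int32) format. The helper names are illustrative only, not part of the module.

```python
import struct

_fpayloadsize = '>i'  # big-endian signed int32, as in the module above

def writechunk(data):
    """Frame one payload chunk as <chunksize><chunkdata>."""
    return struct.pack(_fpayloadsize, len(data)) + data

def readchunk(buf, offset=0):
    """Read one framed chunk, returning (data, next offset).

    A chunk of size zero concludes the payload.
    """
    (size,) = struct.unpack_from(_fpayloadsize, buf, offset)
    offset += struct.calcsize(_fpayloadsize)
    return buf[offset:offset + size], offset + size

# one data chunk followed by the zero-size terminator
frame = writechunk(b'hello') + struct.pack(_fpayloadsize, 0)
data, pos = readchunk(frame)
```

Using a signed int32 here is what leaves room for the negative `chunksize` special-case processing mentioned in the module docstring.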

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
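Registration and the case-insensitive lookup can be sketched with a standalone copy of the mapping logic (re-implemented here so the example runs on its own; the real decorator also validates the part type, and the handler body below is a toy):

```python
parthandlermapping = {}

def parthandler(parttype, params=()):
    """Register a function as a handler, keyed by lowercased part type."""
    def _decorator(func):
        lparttype = parttype.lower()  # matching is case insensitive
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@parthandler('changegroup', ('version',))
def handlechangegroup(op, part):
    return 'handled'

# lookup happens on the lowercased type, whatever the case on the wire;
# the case on the wire only decides mandatory vs. advisory.
handler = parthandlermapping.get('ChangeGroup'.lower())
```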

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
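Usage of the records container can be sketched as follows, with a trimmed re-implementation so the snippet is self-contained (reply tracking via `inreplyto`/`getreplies` is omitted; the category names are illustrative):

```python
class unbundlerecords(object):
    """Minimal sketch of the records container above (no reply tracking)."""

    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        # chronological order across all categories
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'namespace': 'phases'})
records.add('changegroup', {'return': 0})
```

Indexing by category returns only that category's entries (in order), while iterating the object interleaves all categories chronologically.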

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object currently has very little content; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True, source=''):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries values that can modify part behavior
        self.modes = {}
        self.source = source

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)
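The hookargs handover in `gettransaction` can be sketched in isolation: once the transaction is obtained, the operation's hookargs are merged with the transaction's (transaction-level values winning), and further additions abort. The stub classes below are purely illustrative stand-ins for the real operation and transaction objects.

```python
class _stubtransaction(object):
    def __init__(self):
        self.hookargs = {'txnid': 'TXN:abcd'}

class _stubop(object):
    def __init__(self):
        self.hookargs = {'source': 'push'}

    def gettransaction(self):
        transaction = _stubtransaction()
        if self.hookargs:
            # transaction-level args supersede operation-level ones
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs
            # flushed: later addhookargs() calls must fail
            self.hookargs = None
        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise RuntimeError('hookargs added after transaction started')
        self.hookargs.update(hookargs)

op = _stubop()
tr = op.gettransaction()
```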

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr, source=source)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr, source=source)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not a subclass of
        # Exception, and should not be gracefully cleaned up.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point
            # are almost certainly related to the underlying stream being
            # bad. And, chances are that the exception we're handling is
            # related to getting in that bad state. So, we swallow the
            # seeking error and re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from
            # bundle2 processing from processing the old format. This is
            # mostly needed to handle different return codes to unbundle
            # according to the type of bundle. We should probably clean up or
            # drop this return code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only
            # use that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None, source=''):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter, source=source)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                 params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler
515
515
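The key control flow above is that an unknown part type only aborts processing when the part is mandatory; unknown advisory parts are skipped. A minimal standalone sketch of that dispatch (the names `handlers` and `get_handler` are illustrative, not Mercurial API):

```python
class BundleUnknownFeatureError(Exception):
    pass

handlers = {}  # part type -> handler, analogous to parthandlermapping

def get_handler(parttype, mandatory):
    """Unknown mandatory parts abort the unbundle; unknown advisory
    parts are silently skipped (None is returned)."""
    handler = handlers.get(parttype)
    if handler is None:
        if mandatory:
            raise BundleUnknownFeatureError(parttype)
        return None  # caller skips this part
    return handler

handlers['output'] = lambda op, part: None
```

Registering a handler for a type makes both the mandatory and advisory lookups succeed; only the unregistered-plus-mandatory combination raises.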
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

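The two functions above form a round trip over the newline-separated, url-quoted caps format. A standalone sketch of the same encoding, using `urllib.parse` in place of Mercurial's bytes-based `urlreq` wrapper (function names are illustrative):

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    """Encode a caps dict into the newline-separated blob format."""
    chunks = []
    for name in sorted(caps):
        vals = caps[name]
        entry = quote(name)
        if vals:
            entry = "%s=%s" % (entry, ','.join(quote(v) for v in vals))
        chunks.append(entry)
    return '\n'.join(chunks)

def decode_caps(blob):
    """Decode the blob back into a dict; values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, raw = line.split('=', 1)
            vals = [unquote(v) for v in raw.split(',')]
        caps[unquote(key)] = vals
    return caps

blob = encode_caps({'HG20': [], 'changegroup': ['01', '02']})
# blob == 'HG20\nchangegroup=01,02'
```

Because commas inside a value are quoted to `%2C` on encode, splitting on `,` at decode time is unambiguous.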
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters, and `newpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks
    of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

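`getchunks` above emits the magic string, a 32-bit big-endian size for the stream-parameter block, the block itself, and then the core chunks. A simplified sketch of that framing (assuming the `>i` struct format of `_fstreamparamsize`; compression and the end-of-stream marker produced by `_getcorechunk` are omitted, and the names are illustrative):

```python
import struct

def build_stream(magic, params_blob, core_chunks):
    """Assemble a bundle2-style stream header followed by core chunks."""
    yield magic
    # stream-level parameter block, prefixed by its 32-bit size
    yield struct.pack('>i', len(params_blob))
    if params_blob:
        yield params_blob
    for chunk in core_chunks:
        yield chunk

out = b''.join(build_stream(b'HG20', b'Compression=BZ', [b'<parts>']))
# out == b'HG20\x00\x00\x00\x0eCompression=BZ<parts>'
```

A signed size field is what makes the "negative bundle param size" checks on the reading side necessary.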
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """create an unbundler reading from the binary stream ``fp``"""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """parse a space-separated block of parameters and apply each one"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will
        be ignored when unknown. For those starting with an upper case
        letter, this function will raise a BundleUnknownFeatureError when
        they are unknown.

        Note: no options are currently supported. Any input will either be
        ignored or cause a failure.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

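`bundle20._paramchunk` (on the sending side) and `_processallparams` above are mirror images: parameters are url-quoted, joined with spaces, and a parameter without an `=value` part parses back as `None`. A standalone sketch using `urllib.parse` in place of Mercurial's `urlreq` wrapper (function names are illustrative):

```python
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Encode (name, value) pairs as a space-separated blob;
    value may be None for a bare parameter."""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)

def decode_stream_params(blob):
    """Parse the blob back into a dict (missing value -> None)."""
    params = {}
    for p in blob.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)
        params[parts[0]] = parts[1]
    return params

blob = encode_stream_params([('Compression', 'BZ'), ('flag', None)])
# blob == 'Compression=BZ flag'
```

Quoting the name and value separately means spaces and `=` inside either never collide with the delimiters.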
    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to detect its end. This is terrible and we are
        sorry, but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

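The `emptycount` loop above forwards size-prefixed chunks until two consecutive zero sizes mark the end of the stream. A simplified standalone sketch of that framing (interrupt-flag handling omitted; `read_chunks` and `_fsize` are illustrative names, with `>i` matching bundle2's signed 32-bit size fields):

```python
import struct
from io import BytesIO

_fsize = '>i'  # signed big-endian 32-bit size, as bundle2 uses

def read_chunks(fp):
    """Collect size-prefixed chunks until two consecutive zero sizes."""
    chunks = []
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack(_fsize, fp.read(4))[0]
        if size == 0:
            emptycount += 1
            continue
        emptycount = 0
        if size < 0:
            raise ValueError('negative chunk size: %i' % size)
        chunks.append(fp.read(size))
    return chunks

stream = BytesIO(struct.pack('>i', 3) + b'abc'
                 + struct.pack('>i', 0) + struct.pack('>i', 0))
# read_chunks(stream) -> [b'abc']
```

Requiring two zero sizes rather than one lets a single empty size still serve as an in-band separator elsewhere in the format.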
    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

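The decorator above simply records the handler function in a registry keyed by parameter name, rejecting duplicate registrations. A self-contained sketch of the same registration pattern (all names are illustrative, not Mercurial API):

```python
registry = {}

def stream_param_handler(name):
    """Register the decorated function as the handler for `name`."""
    def decorator(func):
        assert name not in registry, 'duplicate handler: %s' % name
        registry[name] = func
        return func
    return decorator

@stream_param_handler('compression')
def handle_compression(state, name, value):
    # record the negotiated compression on some unbundler-like state
    state['compression'] = value

state = {}
registry['compression'](state, 'compression', 'BZ')
# state == {'compression': 'BZ'}
```

Returning `func` unchanged from the decorator keeps the handler callable under its original name as well as through the registry.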
925 class bundlepart(object):
925 class bundlepart(object):
926 """A bundle2 part contains application level payload
926 """A bundle2 part contains application level payload
927
927
928 The part `type` is used to route the part to the application level
928 The part `type` is used to route the part to the application level
929 handler.
929 handler.
930
930
931 The part payload is contained in ``part.data``. It could be raw bytes or a
931 The part payload is contained in ``part.data``. It could be raw bytes or a
932 generator of byte chunks.
932 generator of byte chunks.
933
933
934 You can add parameters to the part using the ``addparam`` method.
934 You can add parameters to the part using the ``addparam`` method.
935 Parameters can be either mandatory (default) or advisory. Remote side
935 Parameters can be either mandatory (default) or advisory. Remote side
936 should be able to safely ignore the advisory ones.
936 should be able to safely ignore the advisory ones.
937
937
938 Both data and parameters cannot be modified after the generation has begun.
938 Both data and parameters cannot be modified after the generation has begun.
939 """
939 """
940
940
941 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
941 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
942 data='', mandatory=True):
942 data='', mandatory=True):
943 validateparttype(parttype)
943 validateparttype(parttype)
944 self.id = None
944 self.id = None
945 self.type = parttype
945 self.type = parttype
946 self._data = data
946 self._data = data
947 self._mandatoryparams = list(mandatoryparams)
947 self._mandatoryparams = list(mandatoryparams)
948 self._advisoryparams = list(advisoryparams)
948 self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = stringutil.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool('devel', 'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, 'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(_('stream ended unexpectedly '
                                    ' (got %d bytes, expected %d)') %
                                  (len(s), chunksize))

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                'negative payload chunk size: %s' % chunksize)

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(_('stream ended unexpectedly '
                                ' (got %d bytes, expected %d)') %
                              (len(s), headersize))

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug('bundle2-input: payload chunk size: %i\n' % chunksize)

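# The framing that getchunks() emits and decodepayloadchunks() consumes can
# be sketched in isolation. This is an illustrative sketch, not part of this
# module's API: the constant and helper names (_FPAYLOADSIZE, encodeframes,
# decodeframes) are invented here, and the interrupt (-1) frame is left out.

```python
import struct
from io import BytesIO

# Each payload frame is a big-endian signed 32-bit length followed by that
# many bytes. A length of 0 terminates the payload; -1 (flaginterrupt)
# signals an out-of-band part, which this sketch does not handle.
_FPAYLOADSIZE = '>i'

def encodeframes(chunks):
    """Frame an iterable of byte chunks the way getchunks() frames a payload."""
    out = []
    for chunk in chunks:
        out.append(struct.pack(_FPAYLOADSIZE, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_FPAYLOADSIZE, 0))  # end-of-payload marker
    return b''.join(out)

def decodeframes(fh):
    """Yield the chunks back, mirroring the loop in decodepayloadchunks()."""
    headersize = struct.calcsize(_FPAYLOADSIZE)
    while True:
        size = struct.unpack(_FPAYLOADSIZE, fh.read(headersize))[0]
        if size == 0:
            return
        assert size > 0, 'interrupt frames are not handled in this sketch'
        yield fh.read(size)

frames = encodeframes([b'abc', b'defg'])
assert list(decodeframes(BytesIO(frames))) == [b'abc', b'defg']
```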
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

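# The (payload offset, file offset) index that seekableunbundlepart maintains
# can be exercised on its own. This is an illustrative sketch, not part of
# this module's API: `findchunk` is a standalone copy of the lookup logic in
# _findchunk() below, and the index values are made up.

```python
def findchunk(chunkindex, pos):
    """Map a decoded payload position to (chunk number, offset in chunk).

    ``chunkindex`` is a list of (payload offset, file offset) pairs, one
    per chunk start, in increasing payload-offset order.
    """
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# Two decoded chunks of sizes 10 and 20, starting at file offsets 100 and
# 114 (the 4-byte frame headers account for the gaps).
index = [(0, 100), (10, 114), (30, 138)]
assert findchunk(index, 0) == (0, 0)    # start of first chunk
assert findchunk(index, 15) == (1, 5)   # 5 bytes into the second chunk
```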
class seekableunbundlepart(unbundlepart):
    """A bundle2 part in a bundle that is seekable.

    Regular ``unbundlepart`` instances can only be read once. This class
    extends ``unbundlepart`` to enable bi-directional seeking within the
    part.

    Bundle2 part data consists of framed chunks. Offsets when seeking
    refer to the decoded data, not the offsets in the underlying bundle2
    stream.

    To facilitate quickly seeking within the decoded data, instances of this
    class maintain a mapping between offsets in the underlying stream and
    the decoded payload. This mapping will consume memory in proportion
    to the number of chunks within the payload (which almost certainly
    increases in proportion with the size of the part).
    """
    def __init__(self, ui, header, fp):
        # (payload, file) offsets for chunk starts.
        self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                # Can't use self.consume() here because it advances self._pos.
                chunk = self.read(32768)
                while chunk:
                    chunk = self.read(32768)
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            # Can't use self.consume() here because it advances self._pos.
            chunk = self.read(32768)
            while chunk:
                chunk = self.read(32768)

        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

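The seek() implementation above relies on a chunk index mapping cumulative payload offsets to file offsets, consulted through `_findchunk()` to translate an absolute position into a chunk and an offset inside it. A minimal standalone sketch of that lookup — the names and the index layout here are illustrative, not Mercurial's actual internals:

```python
import bisect

def findchunk(chunkindex, pos):
    """Return (chunk number, offset inside that chunk) for an absolute
    payload position, given one (cumulative payload offset, file offset)
    entry per chunk, with the first entry at payload offset 0."""
    starts = [offset for offset, _fileoffset in chunkindex]
    # bisect_right finds the first entry strictly past pos; the chunk
    # holding pos is the one just before it.
    chunk = bisect.bisect_right(starts, pos) - 1
    return chunk, pos - starts[chunk]

# Index for three chunks of payload sizes 10, 20, and 5; the second
# member of each tuple stands in for a file offset.
index = [(0, 0), (10, 100), (30, 200)]
```

Since the index is sorted by construction, a bisection gives the same answer as a linear scan of the entries.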
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'bookmarks': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'rev-branch-cache': (),
                'phases': ('heads',),
                'stream': ('v2',),
               }

def getrepocaps(repo, allowpushback=False, role=None):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.

    The returned value is used for servers advertising their capabilities as
    well as clients advertising their capabilities to servers as part of
    bundle2 requests. The ``role`` argument specifies which is which.
    """
    if role not in ('client', 'server'):
        raise error.ProgrammingError('role argument must be client or server')

    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')

    # Don't advertise stream clone support in server mode if not configured.
    if role == 'server':
        streamsupported = repo.ui.configbool('server', 'uncompressed',
                                             untrusted=True)
        featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')

        if not streamsupported or not featuresupported:
            caps.pop('stream')
    # Else always advertise support on client, because payload support
    # should always be advertised.

    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

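A quick standalone run of the helper above against a hand-built capabilities dict, showing how advertised 'V0'/'V1' entries become integer versions:

```python
def obsmarkersversion(caps):
    """Copy of the helper above: extract supported obsmarker versions."""
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

caps = {'obsmarkers': ('V0', 'V1'), 'pushkey': ()}
print(obsmarkersversion(caps))  # [0, 1]
print(obsmarkersversion({}))    # [] when the capability is absent
```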
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we might not always want a changegroup in such a bundle, for example in
    # stream bundles
    if opts.get('changegroup', True):
        cgversion = opts.get('cg.version')
        if cgversion is None:
            cgversion = changegroup.safeversion(repo)
        cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
        part = bundler.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        if opts.get('phases') and repo.revs('%ln and secret()',
                                            outgoing.missingheads):
            part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    if opts.get('streamv2', False):
        addpartbundlestream2(bundler, repo, stream=True)

    if opts.get('tagsfnodescache', True):
        addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('revbranchcache', True):
        addpartrevbranchcache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def addpartrevbranchcache(repo, bundler, outgoing):
    # we include the rev branch cache for the bundle changeset
    # (as an optional part)
    cache = repo.revbranchcache()
    cl = repo.unfiltered().changelog
    branchesdata = collections.defaultdict(lambda: (set(), set()))
    for node in outgoing.missing:
        branch, close = cache.branchinfo(cl.rev(node))
        branchesdata[branch][close].add(node)

    def generate():
        for branch, (nodes, closed) in sorted(branchesdata.items()):
            utf8branch = encoding.fromlocal(branch)
            yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
            yield utf8branch
            for n in sorted(nodes):
                yield n
            for n in sorted(closed):
                yield n

    bundler.newpart('cache:rev-branch-cache', data=generate())

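Each record emitted by generate() above is a fixed-size big-endian header (branch-name length, open-node count, closed-node count) followed by the UTF-8 branch name and the raw 20-byte nodes. Assuming rbcstruct is struct.Struct('>III') — its module-level definition in this file, which is outside this excerpt — a standalone sketch of one record:

```python
import struct

# Assumed layout: name length, number of open nodes, number of closed nodes.
rbcstruct = struct.Struct('>III')

def packrecord(branch, nodes, closed):
    """Yield one rev-branch-cache record: header, branch name, nodes."""
    utf8branch = branch.encode('utf-8')
    yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
    yield utf8branch
    for n in sorted(nodes):
        yield n
    for n in sorted(closed):
        yield n

record = b''.join(packrecord('default', {b'\x11' * 20}, set()))
```

Unlike the real generator, this sketch takes a text branch name and encodes it itself; the nodes are raw 20-byte binary hashes either way.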
def _formatrequirementsspec(requirements):
    return urlreq.quote(','.join(sorted(requirements)))

def _formatrequirementsparams(requirements):
    requirements = _formatrequirementsspec(requirements)
    params = "%s%s" % (urlreq.quote("requirements="), requirements)
    return params

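The two helpers above URL-quote a sorted, comma-joined requirements set, quoting the `requirements=` prefix as well. Mercurial's urlreq is a thin compatibility wrapper over urllib quoting, so the standard library reproduces the encoding; a sketch:

```python
from urllib.parse import quote

def formatrequirementsspec(requirements):
    # Sort for a stable wire representation, then percent-encode the commas.
    return quote(','.join(sorted(requirements)))

def formatrequirementsparams(requirements):
    # The '=' in the prefix is quoted too, becoming '%3D'.
    return '%s%s' % (quote('requirements='), formatrequirementsspec(requirements))

print(formatrequirementsparams({'store', 'revlogv1', 'fncache'}))
# requirements%3Dfncache%2Crevlogv1%2Cstore
```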
def addpartbundlestream2(bundler, repo, **kwargs):
    if not kwargs.get('stream', False):
        return

    if not streamclone.allowservergeneration(repo):
        raise error.Abort(_('stream data requested but server does not allow '
                            'this feature'),
                          hint=_('well-behaved clients should not be '
                                 'requesting stream data from servers not '
                                 'advertising it; the client may be buggy'))

    # Stream clones don't compress well. And compression undermines a
    # goal of stream clones, which is to be fast. Communicate the desire
    # to avoid compression to consumers of the bundle.
    bundler.prefercompressed = False

    filecount, bytecount, it = streamclone.generatev2(repo)
    requirements = _formatrequirementsspec(repo.requirements)
    part = bundler.newpart('stream2', data=it)
    part.addparam('bytecount', '%d' % bytecount, mandatory=True)
    part.addparam('filecount', '%d' % filecount, mandatory=True)
    part.addparam('requirements', requirements, mandatory=True)

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

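The legacy branch of writebundle() above streams the plain-text bundle header followed by the compressed changegroup chunks. A self-contained sketch of the same header-plus-compressed-stream pattern, using zlib directly in place of Mercurial's compression-engine abstraction (the 'HG10GZ' header matches the zlib bundle type; the function names here are illustrative):

```python
import zlib

def compressstream(chunks, level=6):
    """Lazily compress an iterable of byte chunks with zlib."""
    z = zlib.compressobj(level)
    for chunk in chunks:
        data = z.compress(chunk)
        if data:
            yield data
    yield z.flush()

def chunkiter(header, chunks):
    # Emit the uncompressed bundle header first, then the compressed
    # payload, mirroring the generator inside writebundle() above.
    yield header
    for chunk in compressstream(chunks):
        yield chunk

payload = [b'abc' * 100, b'def' * 100]
out = b''.join(chunkiter(b'HG10GZ', payload))
assert out.startswith(b'HG10GZ')
assert zlib.decompress(out[len(b'HG10GZ'):]) == b''.join(payload)
```

Keeping the whole pipeline a generator means nothing is compressed until a consumer actually pulls chunks, which is the point of handing `chunkiter` to `changegroup.writechunks`.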
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

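The return-code convention used above (1 for plain success, 1+n when n heads were added, -1-n when n heads were removed, 0 on failure) can be exercised with a standalone copy of the combining loop, detached from the bundle operation records:

```python
def combineresults(results):
    """Mirror of the combining loop above, over a plain list of codes."""
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:       # a failed changegroup application
            result = 0
            break
        if ret < -1:       # ret = -1 - (heads removed)
            changedheads += ret + 1
        elif ret > 1:      # ret = 1 + (heads added)
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

assert combineresults([]) == 1          # nothing applied, plain success
assert combineresults([0]) == 0         # a failure propagates
assert combineresults([1, 3]) == 3      # one group added two heads
assert combineresults([-2, -3]) == -4   # three heads removed overall
```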
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs[r'targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

1813 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1813 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1814 ['digest:%s' % k for k in util.DIGESTS.keys()])
1814 ['digest:%s' % k for k in util.DIGESTS.keys()])
1815 @parthandler('remote-changegroup', _remotechangegroupparams)
1815 @parthandler('remote-changegroup', _remotechangegroupparams)
1816 def handleremotechangegroup(op, inpart):
1816 def handleremotechangegroup(op, inpart):
1817 """apply a bundle10 on the repo, given an url and validation information
1817 """apply a bundle10 on the repo, given an url and validation information
1818
1818
1819 All the information about the remote bundle to import are given as
1819 All the information about the remote bundle to import are given as
1820 parameters. The parameters include:
1820 parameters. The parameters include:
1821 - url: the url to the bundle10.
1821 - url: the url to the bundle10.
1822 - size: the bundle10 file size. It is used to validate what was
1822 - size: the bundle10 file size. It is used to validate what was
1823 retrieved by the client matches the server knowledge about the bundle.
1823 retrieved by the client matches the server knowledge about the bundle.
1824 - digests: a space separated list of the digest types provided as
1824 - digests: a space separated list of the digest types provided as
1825 parameters.
1825 parameters.
1826 - digest:<digest-type>: the hexadecimal representation of the digest with
1826 - digest:<digest-type>: the hexadecimal representation of the digest with
1827 that name. Like the size, it is used to validate what was retrieved by
1827 that name. Like the size, it is used to validate what was retrieved by
1828 the client matches what the server knows about the bundle.
1828 the client matches what the server knows about the bundle.
1829
1829
    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

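The `util.digestchecker` wrapper above verifies both the byte count and the content digests once the remote changegroup has been fully consumed. A minimal standalone sketch of that idea, using only `hashlib` (this is an illustration of the concept, not Mercurial's actual implementation):

```python
import hashlib
import io

def check_stream(fh, expectedsize, digests):
    """Read all of fh, verifying its total size and its hex digests.

    digests maps an algorithm name (e.g. 'sha1') to the expected
    hex digest of the whole stream.
    """
    hashers = {typ: hashlib.new(typ) for typ in digests}
    size = 0
    while True:
        chunk = fh.read(4096)
        if not chunk:
            break
        size += len(chunk)
        for h in hashers.values():
            h.update(chunk)
    if size != expectedsize:
        raise ValueError('size mismatch: got %d, expected %d'
                         % (size, expectedsize))
    for typ, h in hashers.items():
        if h.hexdigest() != digests[typ]:
            raise ValueError('%s digest mismatch' % typ)

# Passes silently when size and digest both match.
data = b'some bundle payload'
check_stream(io.BytesIO(data),
             len(data),
             {'sha1': hashlib.sha1(data).hexdigest()})
```

The real checker wraps the stream and validates lazily as it is read; the sketch validates in one pass for clarity.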
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is used to detect push races regarding bookmarks. It contains
    binary encoded (bookmark, node) tuples. If the local state does not
    match the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(inpart)

    msgstandard = ('repository changed while pushing - please try again '
                   '(bookmark "%s" move from %s to %s)')
    msgmissing = ('repository changed while pushing - please try again '
                  '(bookmark "%s" is missing, expected %s)')
    msgexist = ('repository changed while pushing - please try again '
                '(bookmark "%s" set on %s, expected missing)')
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, nodemod.short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, nodemod.short(node))
            else:
                finalmsg = msgstandard % (book, nodemod.short(node),
                                          nodemod.short(currentnode))
            raise error.PushRaced(finalmsg)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

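Both head-checking handlers use the same read-until-short pattern to pull a flat sequence of 20-byte binary nodes out of the part. The pattern can be sketched in isolation against a plain in-memory stream (the node values below are made up for illustration):

```python
import io

def readnodes(fh, recordsize=20):
    """Collect fixed-size records until the stream is exhausted.

    A trailing partial record indicates a malformed payload, which
    the bundle2 handlers guard against with an assert.
    """
    nodes = []
    rec = fh.read(recordsize)
    while len(rec) == recordsize:
        nodes.append(rec)
        rec = fh.read(recordsize)
    assert not rec, 'truncated record at end of payload'
    return nodes

payload = b'a' * 20 + b'b' * 20  # two fake 20-byte nodes
nodes = readnodes(io.BytesIO(payload))
# nodes == [b'a' * 20, b'b' * 20]
```

The same loop shape appears again below in the 'hgtagsfnodes' handler, where records are read in 20-byte pairs.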
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activities happen on unrelated heads,
    they are ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'],
                              **pycompat.strkwargs(kwargs))

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid='%d' % inpart.id,
                                  **pycompat.strkwargs(kwargs))

@parthandler('bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied as
    is to the unbundling repository. Make sure a 'check:bookmarks' part is
    issued earlier to check for push races in such an update. This behavior
    is suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(inpart)

    pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
    bookmarksmode = op.modes.get('bookmarks', 'apply')

    if bookmarksmode == 'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs['pushkeycompat'] = '1'
                hookargs['namespace'] = 'bookmarks'
                hookargs['key'] = book
                hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
                hookargs['new'] = nodemod.hex(node if node is not None else '')
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook('prepushkey', throw=True,
                             **pycompat.strkwargs(hookargs))

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:
            def runhook():
                for hookargs in allhooks:
                    op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
            op.repo._afterlock(runhook)

    elif bookmarksmode == 'records':
        for book, node in changes:
            record = {'bookmark': book, 'node': node}
            op.records.add('bookmarks', record)
    else:
        raise error.ProgrammingError('unknown bookmark mode: %s'
                                     % bookmarksmode)

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

rbcstruct = struct.Struct('>III')

@parthandler('cache:rev-branch-cache')
def handlerbc(op, inpart):
    """receive a rev-branch-cache payload and update the local cache

    The payload is a series of data related to each branch

    1) branch name length
    2) number of open heads
    3) number of closed heads
    4) open heads nodes
    5) closed heads nodes
    """
    total = 0
    rawheader = inpart.read(rbcstruct.size)
    cache = op.repo.revbranchcache()
    cl = op.repo.unfiltered().changelog
    while rawheader:
        header = rbcstruct.unpack(rawheader)
        total += header[1] + header[2]
        utf8branch = inpart.read(header[0])
        branch = encoding.tolocal(utf8branch)
        for x in xrange(header[1]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, False)
        for x in xrange(header[2]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, True)
        rawheader = inpart.read(rbcstruct.size)
    cache.write()

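The `'>III'` header used by 'cache:rev-branch-cache' packs three big-endian unsigned 32-bit integers: branch name length, open head count, and closed head count. A round-trip sketch with Python's `struct` module (the branch name is an example value):

```python
import struct

# Big-endian, three unsigned 32-bit fields:
# branch name length, open head count, closed head count.
rbcstruct = struct.Struct('>III')

branchname = b'default'
header = rbcstruct.pack(len(branchname), 2, 1)

# The fixed header is always 12 bytes (3 x 4-byte fields).
assert rbcstruct.size == 12

namelen, openheads, closedheads = rbcstruct.unpack(header)
assert (namelen, openheads, closedheads) == (7, 2, 1)
```

The handler keeps reading headers until the part is exhausted; after each header it consumes `namelen` bytes of branch name followed by `(openheads + closedheads) * 20` bytes of node data.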
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)

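The handler above turns each advisory param into a hook argument with a `USERVAR_` prefix, so hooks can distinguish client-pushed variables from server-provided ones. The transformation itself is a simple dict build, sketched in isolation (the param list below is invented for illustration):

```python
def pushvars_to_hookargs(advisoryparams):
    """Prefix pushed variables so hooks can tell them apart from
    server-provided hook arguments."""
    hookargs = {}
    for key, value in advisoryparams:
        # Upper-case the name and mark it as user-supplied.
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

args = pushvars_to_hookargs([('debug', '1'), ('reason', 'hotfix')])
# args == {'USERVAR_DEBUG': '1', 'USERVAR_REASON': 'hotfix'}
```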
@parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
def handlestreamv2bundle(op, part):

    requirements = urlreq.unquote(part.params['requirements']).split(',')
    filecount = int(part.params['filecount'])
    bytecount = int(part.params['bytecount'])

    repo = op.repo
    if len(repo):
        msg = _('cannot apply stream clone to non empty repository')
        raise error.Abort(msg)

    repo.ui.debug('applying stream bundle\n')
    streamclone.applybundlev2(repo, part, filecount, bytecount,
                              requirements)