streamclone: move requirement update into consumev2...
Boris Feld
r35822:2d3e486d stable
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container transmitting a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

    The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with a
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    A name MUST start with a letter. If the first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is not able to process it.

    Stream parameters use a simple textual format for two main reasons:

    - Stream level parameters should remain simple and we want to discourage
      any crazy usage.
    - Textual data allows easy human inspection of a bundle2 header in case
      of trouble.

    Any application-level options MUST go into a bundle2 part instead.

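As an illustration of the layout above (this is a hedged sketch, not Mercurial's own code; `packstreamparams` and `unpackstreamparams` are hypothetical helper names), the stream-level parameter block could be serialized and read back like this:

```python
import struct
from urllib.parse import quote, unquote

def packstreamparams(params):
    """Serialize a dict of stream-level parameters.

    Names and optional values are urlquoted, joined by spaces, and the
    resulting blob is prefixed with an int32 byte count.
    """
    chunks = []
    for name, value in sorted(params.items()):
        assert name, "empty parameter names are forbidden"
        item = quote(name)
        if value is not None:
            item += '=' + quote(value)
        chunks.append(item)
    blob = ' '.join(chunks).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

def unpackstreamparams(data):
    """Parse the parameter block back into a dict (valueless -> None)."""
    (size,) = struct.unpack('>i', data[:4])
    blob = data[4:4 + size].decode('ascii')
    params = {}
    if blob:
        for item in blob.split(' '):
            name, sep, value = item.partition('=')
            params[unquote(name)] = unquote(value) if sep else None
    return params
```

Note how the capitalized name would mark a mandatory parameter while the lowercase one stays advisory; the names themselves are made up for the example.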
Payload part
------------------------

The binary format is as follows:

:header size: int32

    The total number of Bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        Part parameters may have arbitrary content; the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

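The `<chunksize><chunkdata>` framing above can be sketched with a small, self-contained reader (an illustration under the stated format, not the module's implementation; `iterchunks` is a hypothetical name):

```python
import io
import struct

def iterchunks(fp):
    """Yield chunk payloads from a <chunksize><chunkdata> stream.

    A zero-size chunk terminates the payload; negative sizes are
    reserved for special-case processing and rejected here.
    """
    while True:
        (size,) = struct.unpack('>i', fp.read(4))
        if size == 0:
            return  # end-of-payload marker
        if size < 0:
            raise ValueError('special chunk sizes are not handled here')
        data = fp.read(size)
        if len(data) < size:
            raise EOFError('truncated chunk')
        yield data

# Hand-built sample payload: two chunks followed by the zero-size terminator.
payload = (struct.pack('>i', 3) + b'abc' +
           struct.pack('>i', 2) + b'de' +
           struct.pack('>i', 0))
```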
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""

from __future__ import absolute_import, division

import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    bookmarks,
    changegroup,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    streamclone,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 32768

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

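A quick demonstration of the format string this helper produces and how `struct` consumes it (the sample sizes are invented; the helper body is reproduced here so the snippet stands alone):

```python
import struct

def _makefpartparamsizes(nbparams):
    # one (key size, value size) byte pair per parameter, big-endian
    return '>' + ('BB' * nbparams)

fmt = _makefpartparamsizes(2)          # two parameters -> '>BBBB'
packed = struct.pack(fmt, 3, 5, 4, 0)  # e.g. key/value sizes (3, 5) and (4, 0)
sizes = struct.unpack(fmt, packed)
# regroup the flat tuple into (key size, value size) couples
pairs = list(zip(sizes[0::2], sizes[1::2]))
```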
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

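To make the promised behaviour concrete, here is a trimmed-down, self-contained copy of the record keeper exercised the way its docstring describes (the category names and part id are invented for the example):

```python
class unbundlerecords(object):
    """Condensed rendition of the record-keeping class above."""

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            # replies are grouped under the part id they answer
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'text', inreplyto=7)
```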
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries values that can modify part behavior
        self.modes = {}

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not a subclass of
        # Exception, and should not trigger this graceful cleanup.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the
            # type of bundle. We should probably clean up or drop this return
            # code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part, then searches for and uses the proper
    handling code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                  params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the try block in _gethandler so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

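The caps blob format described above can be exercised with a small standalone sketch. This is an illustration only, written for Python 3 with `urllib.parse` in place of the module's `urlreq` wrapper; the function names `encode_caps`/`decode_caps` are hypothetical, not part of bundle2:

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # mirror of encodecaps: one capability per line, values comma-joined
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decode_caps(blob):
    # mirror of decodecaps: the inverse of the above
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Note that a capability without a value decodes to an empty list, matching the "values are always a list" contract stated in the docstring.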
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


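The wire layout that `bundle20.getchunks` emits can be summarized in a standalone sketch. This is an illustration, not the real serializer: it assumes the `'>i'` big-endian signed 32-bit size format for `_fstreamparamsize`/`_fpartheadersize` (signed, since negative sizes are rejected or treated as flags elsewhere in this module), and it takes already-encoded parts as opaque byte strings:

```python
import struct

# assumed size-field formats: big-endian signed 32-bit integers
_fstreamparamsize = '>i'
_fpartheadersize = '>i'

def stream_skeleton(params=b'', parts=()):
    """Sketch of the getchunks output: magic string, parameter block size,
    parameter block, parts, then a zero-size part header that marks the
    end of the stream."""
    yield b'HG20'
    yield struct.pack(_fstreamparamsize, len(params))
    if params:
        yield params
    for part in parts:
        yield part  # each part is an already-encoded chunk here
    yield struct.pack(_fpartheadersize, 0)

blob = b''.join(stream_skeleton(params=b'Compression=GZ'))
```

The zero-size header at the end is what `_getcorechunk` emits as its last chunk, and what readers use to detect end-of-bundle.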
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
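The magic-string check performed by `getunbundler` can be sketched in isolation. This is a bytes-based Python 3 illustration; `peek_magic` is a hypothetical name, not a bundle2 API:

```python
import io

def peek_magic(fp):
    # getunbundler reads 4 bytes: two for the magic, two for the version
    magicstring = fp.read(4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle')
    return version

version = peek_magic(io.BytesIO(b'HG20' + b'rest-of-stream'))
```

In the real code the returned version string selects the unbundler class through the `formatmap` dictionary.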

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and return all parameters in a stream parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory; this function will raise a KeyError when they are unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends; the bundle has to be
        interpreted to find its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

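The stream-level parameter block handled by `_processallparams` is a space-separated list of url-quoted `name` or `name=value` entries. A standalone sketch of the decoding, using Python 3's `urllib.parse` instead of the module's `urlreq` and a plain dict instead of `util.sortdict` (illustrative only):

```python
from urllib.parse import unquote

def parse_stream_params(paramsblock):
    """Split a bundle2 stream parameter block into a name -> value mapping.

    Entries are separated by spaces; a missing '=value' decodes to None,
    mirroring _processallparams (minus the per-parameter effect handlers).
    """
    params = {}
    for p in paramsblock.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)
        params[parts[0]] = parts[1]
    return params
```

The real method additionally dispatches each parameter through `_processparam`, which applies side effects such as installing a decompressor for `Compression`.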
formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

919 class bundlepart(object):
919 class bundlepart(object):
920 """A bundle2 part contains application level payload
920 """A bundle2 part contains application level payload
921
921
922 The part `type` is used to route the part to the application level
922 The part `type` is used to route the part to the application level
923 handler.
923 handler.
924
924
925 The part payload is contained in ``part.data``. It could be raw bytes or a
925 The part payload is contained in ``part.data``. It could be raw bytes or a
926 generator of byte chunks.
926 generator of byte chunks.
927
927
928 You can add parameters to the part using the ``addparam`` method.
928 You can add parameters to the part using the ``addparam`` method.
929 Parameters can be either mandatory (default) or advisory. Remote side
929 Parameters can be either mandatory (default) or advisory. Remote side
930 should be able to safely ignore the advisory ones.
930 should be able to safely ignore the advisory ones.
931
931
932 Both data and parameters cannot be modified after the generation has begun.
932 Both data and parameters cannot be modified after the generation has begun.
933 """
933 """
934
934
935 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
935 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
936 data='', mandatory=True):
936 data='', mandatory=True):
937 validateparttype(parttype)
937 validateparttype(parttype)
938 self.id = None
938 self.id = None
939 self.type = parttype
939 self.type = parttype
940 self._data = data
940 self._data = data
941 self._mandatoryparams = list(mandatoryparams)
941 self._mandatoryparams = list(mandatoryparams)
942 self._advisoryparams = list(advisoryparams)
942 self._advisoryparams = list(advisoryparams)
943 # checking for duplicated entries
943 # checking for duplicated entries
944 self._seenparams = set()
944 self._seenparams = set()
945 for pname, __ in self._mandatoryparams + self._advisoryparams:
945 for pname, __ in self._mandatoryparams + self._advisoryparams:
946 if pname in self._seenparams:
946 if pname in self._seenparams:
947 raise error.ProgrammingError('duplicated params: %s' % pname)
947 raise error.ProgrammingError('duplicated params: %s' % pname)
948 self._seenparams.add(pname)
948 self._seenparams.add(pname)
949 # status of the part's generation:
949 # status of the part's generation:
950 # - None: not started,
950 # - None: not started,
951 # - False: currently generated,
951 # - False: currently generated,
952 # - True: generation done.
952 # - True: generation done.
953 self._generated = None
953 self._generated = None
954 self.mandatory = mandatory
954 self.mandatory = mandatory
955
955
956 def __repr__(self):
956 def __repr__(self):
957 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
957 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
958 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
958 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
959 % (cls, id(self), self.id, self.type, self.mandatory))
959 % (cls, id(self), self.id, self.type, self.mandatory))
960
960
961 def copy(self):
961 def copy(self):
962 """return a copy of the part
962 """return a copy of the part
963
963
964 The new part have the very same content but no partid assigned yet.
964 The new part have the very same content but no partid assigned yet.
965 Parts with generated data cannot be copied."""
965 Parts with generated data cannot be copied."""
966 assert not util.safehasattr(self.data, 'next')
966 assert not util.safehasattr(self.data, 'next')
967 return self.__class__(self.type, self._mandatoryparams,
967 return self.__class__(self.type, self._mandatoryparams,
968 self._advisoryparams, self._data, self.mandatory)
968 self._advisoryparams, self._data, self.mandatory)
969
969
    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

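For reference, the header layout that ``getchunks`` emits can be sketched as a standalone helper. The format strings below mirror what I believe the module-level constants (``_fparttypesize``, ``_fpartid``, ``_fpartparamcount``, ``_fpartheadersize``) are defined as; treat those exact values, and the helper itself, as an illustrative assumption rather than part of bundle2.py.

```python
import struct

# Assumed values of bundle2's module-level format constants:
_fpartheadersize = '>i'   # size prefix of the whole header block
_fparttypesize = '>B'     # length of the part type name (<= 255)
_fpartid = '>I'           # 32-bit unsigned part id
_fpartparamcount = '>BB'  # mandatory / advisory parameter counts

def encodepartheader(parttype, partid, manpar, advpar):
    """assemble a part header the way getchunks() does (sketch)"""
    header = [struct.pack(_fparttypesize, len(parttype)),
              parttype, struct.pack(_fpartid, partid)]
    # parameter counts, then all key/value sizes, then the key/value bytes
    header.append(struct.pack(_fpartparamcount, len(manpar), len(advpar)))
    parsizes = []
    for key, value in manpar + advpar:
        parsizes.append(len(key))
        parsizes.append(len(value))
    header.append(struct.pack('>' + 'BB' * (len(parsizes) // 2), *parsizes))
    for key, value in manpar + advpar:
        header.append(key)
        header.append(value)
    headerchunk = b''.join(header)
    # the header block is itself preceded by its own size
    return struct.pack(_fpartheadersize, len(headerchunk)) + headerchunk
```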
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool('devel', 'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, 'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(_('stream ended unexpectedly '
                                    ' (got %d bytes, expected %d)') %
                                  (len(s), chunksize))

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                'negative payload chunk size: %s' % chunksize)

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(_('stream ended unexpectedly '
                                ' (got %d bytes, expected %d)') %
                              (len(s), headersize))

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug('bundle2-input: payload chunk size: %i\n' % chunksize)

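A minimal standalone sketch of the payload framing that ``decodepayloadchunks`` consumes (and ``getchunks`` produces): each chunk is preceded by a big-endian signed 32-bit size, a size of 0 terminates the payload, and -1 (``flaginterrupt``) announces an out-of-band part. This assumes ``_fpayloadsize`` is ``'>i'`` and deliberately omits the interrupt path.

```python
import io
import struct

_fpayloadsize = '>i'  # assumed value of bundle2's size-prefix format

def encodepayload(chunks):
    """frame an iterable of byte chunks as a bundle2 part payload"""
    out = []
    for chunk in chunks:
        out.append(struct.pack(_fpayloadsize, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_fpayloadsize, 0))  # closing (empty) chunk
    return b''.join(out)

def decodepayload(fh):
    """inverse of encodepayload for the simple, interrupt-free case"""
    headersize = struct.calcsize(_fpayloadsize)
    while True:
        size = struct.unpack(_fpayloadsize, fh.read(headersize))[0]
        if size == 0:
            return
        yield fh.read(size)
```

A quick roundtrip: ``list(decodepayload(io.BytesIO(encodepayload([b'abc', b'defg']))))`` yields the original chunks back.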
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

class seekableunbundlepart(unbundlepart):
    """A bundle2 part in a bundle that is seekable.

    Regular ``unbundlepart`` instances can only be read once. This class
    extends ``unbundlepart`` to enable bi-directional seeking within the
    part.

    Bundle2 part data consists of framed chunks. Offsets when seeking
    refer to the decoded data, not the offsets in the underlying bundle2
    stream.

    To facilitate quickly seeking within the decoded data, instances of this
    class maintain a mapping between offsets in the underlying stream and
    the decoded payload. This mapping will consume memory in proportion
    to the number of chunks within the payload (which almost certainly
    increases in proportion with the size of the part).
    """
    def __init__(self, ui, header, fp):
        # (payload, file) offsets for chunk starts.
        self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                # Can't use self.consume() here because it advances self._pos.
                chunk = self.read(32768)
                while chunk:
                    chunk = self.read(32768)
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            # Can't use self.consume() here because it advances self._pos.
            chunk = self.read(32768)
            while chunk:
                chunk = self.read(32768)

        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None
1481
1481
1482 # These are only the static capabilities.
1482 # These are only the static capabilities.
1483 # Check the 'getrepocaps' function for the rest.
1483 # Check the 'getrepocaps' function for the rest.
1484 capabilities = {'HG20': (),
1484 capabilities = {'HG20': (),
1485 'bookmarks': (),
1485 'bookmarks': (),
1486 'error': ('abort', 'unsupportedcontent', 'pushraced',
1486 'error': ('abort', 'unsupportedcontent', 'pushraced',
1487 'pushkey'),
1487 'pushkey'),
1488 'listkeys': (),
1488 'listkeys': (),
1489 'pushkey': (),
1489 'pushkey': (),
1490 'digests': tuple(sorted(util.DIGESTS.keys())),
1490 'digests': tuple(sorted(util.DIGESTS.keys())),
1491 'remote-changegroup': ('http', 'https'),
1491 'remote-changegroup': ('http', 'https'),
1492 'hgtagsfnodes': (),
1492 'hgtagsfnodes': (),
1493 'phases': ('heads',),
1493 'phases': ('heads',),
1494 'stream': ('v2',),
1494 'stream': ('v2',),
1495 }
1495 }
1496
1496
1497 def getrepocaps(repo, allowpushback=False, role=None):
1497 def getrepocaps(repo, allowpushback=False, role=None):
1498 """return the bundle2 capabilities for a given repo
1498 """return the bundle2 capabilities for a given repo
1499
1499
1500 Exists to allow extensions (like evolution) to mutate the capabilities.
1500 Exists to allow extensions (like evolution) to mutate the capabilities.
1501
1501
1502 The returned value is used for servers advertising their capabilities as
1502 The returned value is used for servers advertising their capabilities as
1503 well as clients advertising their capabilities to servers as part of
1503 well as clients advertising their capabilities to servers as part of
1504 bundle2 requests. The ``role`` argument specifies which is which.
1504 bundle2 requests. The ``role`` argument specifies which is which.
1505 """
1505 """
1506 if role not in ('client', 'server'):
1506 if role not in ('client', 'server'):
1507 raise error.ProgrammingError('role argument must be client or server')
1507 raise error.ProgrammingError('role argument must be client or server')
1508
1508
1509 caps = capabilities.copy()
1509 caps = capabilities.copy()
1510 caps['changegroup'] = tuple(sorted(
1510 caps['changegroup'] = tuple(sorted(
1511 changegroup.supportedincomingversions(repo)))
1511 changegroup.supportedincomingversions(repo)))
1512 if obsolete.isenabled(repo, obsolete.exchangeopt):
1512 if obsolete.isenabled(repo, obsolete.exchangeopt):
1513 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1513 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1514 caps['obsmarkers'] = supportedformat
1514 caps['obsmarkers'] = supportedformat
1515 if allowpushback:
1515 if allowpushback:
1516 caps['pushback'] = ()
1516 caps['pushback'] = ()
1517 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1517 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1518 if cpmode == 'check-related':
1518 if cpmode == 'check-related':
1519 caps['checkheads'] = ('related',)
1519 caps['checkheads'] = ('related',)
1520 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1520 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1521 caps.pop('phases')
1521 caps.pop('phases')
1522
1522
1523 # Don't advertise stream clone support in server mode if not configured.
1523 # Don't advertise stream clone support in server mode if not configured.
1524 if role == 'server':
1524 if role == 'server':
1525 streamsupported = repo.ui.configbool('server', 'uncompressed',
1525 streamsupported = repo.ui.configbool('server', 'uncompressed',
1526 untrusted=True)
1526 untrusted=True)
1527 featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')
1527 featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')
1528
1528
1529 if not streamsupported or not featuresupported:
1529 if not streamsupported or not featuresupported:
1530 caps.pop('stream')
1530 caps.pop('stream')
1531 # Else always advertise support on client, because payload support
1531 # Else always advertise support on client, because payload support
1532 # should always be advertised.
1532 # should always be advertised.
1533
1533
1534 return caps
1534 return caps
1535
1535
1536 def bundle2caps(remote):
1536 def bundle2caps(remote):
1537 """return the bundle capabilities of a peer as dict"""
1537 """return the bundle capabilities of a peer as dict"""
1538 raw = remote.capable('bundle2')
1538 raw = remote.capable('bundle2')
1539 if not raw and raw != '':
1539 if not raw and raw != '':
1540 return {}
1540 return {}
1541 capsblob = urlreq.unquote(remote.capable('bundle2'))
1541 capsblob = urlreq.unquote(remote.capable('bundle2'))
1542 return decodecaps(capsblob)
1542 return decodecaps(capsblob)
1543
1543
1544 def obsmarkersversion(caps):
1544 def obsmarkersversion(caps):
1545 """extract the list of supported obsmarkers versions from a bundle2caps dict
1545 """extract the list of supported obsmarkers versions from a bundle2caps dict
1546 """
1546 """
1547 obscaps = caps.get('obsmarkers', ())
1547 obscaps = caps.get('obsmarkers', ())
1548 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1548 return [int(c[1:]) for c in obscaps if c.startswith('V')]
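As a standalone sketch of the capability parsing above (the helper name is hypothetical, not part of this module): entries shaped like `V<number>` become integer versions, and anything else in the capability tuple is ignored.

```python
def parse_obsmarker_versions(obscaps):
    # Keep entries shaped like 'V<number>' and return the numeric part;
    # other entries in the capability tuple are skipped.
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

# parse_obsmarker_versions(('V0', 'V1', 'delta')) -> [0, 1]
```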

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
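The return-code arithmetic above encodes head changes as offsets from ±1. A self-contained mirror of that logic (hypothetical helper name, operating on a plain list instead of `op.records`) behaves like this:

```python
def combine_results(results):
    # Per-changegroup codes: 0 = failure, 1 = success with no head change,
    # 1 + n = n new heads, -1 - n = n heads removed. The combined code uses
    # the same encoding, and any failure wins immediately.
    changedheads = 0
    for ret in results:
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
```

For example, two results of 2 and 3 (one and two new heads) combine to 4, i.e. three new heads in total.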

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is to be used to detect push races regarding bookmarks. It
    contains binary encoded (bookmark, node) tuples. If the local state does
    not match the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(inpart)

    msgstandard = ('repository changed while pushing - please try again '
                   '(bookmark "%s" move from %s to %s)')
    msgmissing = ('repository changed while pushing - please try again '
                  '(bookmark "%s" is missing, expected %s)')
    msgexist = ('repository changed while pushing - please try again '
                '(bookmark "%s" set on %s, expected missing)')
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, nodemod.short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, nodemod.short(node))
            else:
                finalmsg = msgstandard % (book, nodemod.short(node),
                                          nodemod.short(currentnode))
            raise error.PushRaced(finalmsg)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
1865
1865
1866 @parthandler('check:updated-heads')
1866 @parthandler('check:updated-heads')
1867 def handlecheckupdatedheads(op, inpart):
1867 def handlecheckupdatedheads(op, inpart):
1868 """check for race on the heads touched by a push
1868 """check for race on the heads touched by a push
1869
1869
1870 This is similar to 'check:heads' but focus on the heads actually updated
1870 This is similar to 'check:heads' but focus on the heads actually updated
1871 during the push. If other activities happen on unrelated heads, it is
1871 during the push. If other activities happen on unrelated heads, it is
1872 ignored.
1872 ignored.
1873
1873
1874 This allow server with high traffic to avoid push contention as long as
1874 This allow server with high traffic to avoid push contention as long as
1875 unrelated parts of the graph are involved."""
1875 unrelated parts of the graph are involved."""
1876 h = inpart.read(20)
1876 h = inpart.read(20)
1877 heads = []
1877 heads = []
1878 while len(h) == 20:
1878 while len(h) == 20:
1879 heads.append(h)
1879 heads.append(h)
1880 h = inpart.read(20)
1880 h = inpart.read(20)
1881 assert not h
1881 assert not h
1882 # trigger a transaction so that we are guaranteed to have the lock now.
1882 # trigger a transaction so that we are guaranteed to have the lock now.
1883 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1883 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1884 op.gettransaction()
1884 op.gettransaction()
1885
1885
1886 currentheads = set()
1886 currentheads = set()
1887 for ls in op.repo.branchmap().itervalues():
1887 for ls in op.repo.branchmap().itervalues():
1888 currentheads.update(ls)
1888 currentheads.update(ls)
1889
1889
1890 for h in heads:
1890 for h in heads:
1891 if h not in currentheads:
1891 if h not in currentheads:
1892 raise error.PushRaced('repository changed while pushing - '
1892 raise error.PushRaced('repository changed while pushing - '
1893 'please try again')
1893 'please try again')
1894
1895 @parthandler('check:phases')
1896 def handlecheckphases(op, inpart):
1897     """check that phase boundaries of the repository did not change
1898
1899     This is used to detect a push race.
1900     """
1901     phasetonodes = phases.binarydecode(inpart)
1902     unfi = op.repo.unfiltered()
1903     cl = unfi.changelog
1904     phasecache = unfi._phasecache
1905     msg = ('repository changed while pushing - please try again '
1906            '(%s is %s expected %s)')
1907     for expectedphase, nodes in enumerate(phasetonodes):
1908         for n in nodes:
1909             actualphase = phasecache.phase(unfi, cl.rev(n))
1910             if actualphase != expectedphase:
1911                 finalmsg = msg % (nodemod.short(n),
1912                                   phases.phasenames[actualphase],
1913                                   phases.phasenames[expectedphase])
1914                 raise error.PushRaced(finalmsg)
1915
1916 @parthandler('output')
1917 def handleoutput(op, inpart):
1918     """forward output captured on the server to the client"""
1919     for line in inpart.read().splitlines():
1920         op.ui.status(_('remote: %s\n') % line)
1921
1922 @parthandler('replycaps')
1923 def handlereplycaps(op, inpart):
1924     """Notify that a reply bundle should be created
1925
1926     The payload contains the capabilities information for the reply"""
1927     caps = decodecaps(inpart.read())
1928     if op.reply is None:
1929         op.reply = bundle20(op.ui, caps)
1930
1931 class AbortFromPart(error.Abort):
1932     """Sub-class of Abort that denotes an error from a bundle2 part."""
1933
1934 @parthandler('error:abort', ('message', 'hint'))
1935 def handleerrorabort(op, inpart):
1936     """Used to transmit abort error over the wire"""
1937     raise AbortFromPart(inpart.params['message'],
1938                         hint=inpart.params.get('hint'))
1939
1940 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1941                                'in-reply-to'))
1942 def handleerrorpushkey(op, inpart):
1943     """Used to transmit failure of a mandatory pushkey over the wire"""
1944     kwargs = {}
1945     for name in ('namespace', 'key', 'new', 'old', 'ret'):
1946         value = inpart.params.get(name)
1947         if value is not None:
1948             kwargs[name] = value
1949     raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1950
1951 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1952 def handleerrorunsupportedcontent(op, inpart):
1953     """Used to transmit unknown content error over the wire"""
1954     kwargs = {}
1955     parttype = inpart.params.get('parttype')
1956     if parttype is not None:
1957         kwargs['parttype'] = parttype
1958     params = inpart.params.get('params')
1959     if params is not None:
1960         kwargs['params'] = params.split('\0')
1961
1962     raise error.BundleUnknownFeatureError(**kwargs)
1963
1964 @parthandler('error:pushraced', ('message',))
1965 def handleerrorpushraced(op, inpart):
1966     """Used to transmit push race error over the wire"""
1967     raise error.ResponseError(_('push failed:'), inpart.params['message'])
1968
1969 @parthandler('listkeys', ('namespace',))
1970 def handlelistkeys(op, inpart):
1971     """retrieve pushkey namespace content stored in a bundle2"""
1972     namespace = inpart.params['namespace']
1973     r = pushkey.decodekeys(inpart.read())
1974     op.records.add('listkeys', (namespace, r))
1975
1976 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1977 def handlepushkey(op, inpart):
1978     """process a pushkey request"""
1979     dec = pushkey.decode
1980     namespace = dec(inpart.params['namespace'])
1981     key = dec(inpart.params['key'])
1982     old = dec(inpart.params['old'])
1983     new = dec(inpart.params['new'])
1984     # Grab the transaction to ensure that we have the lock before performing
1985     # the pushkey.
1986     if op.ui.configbool('experimental', 'bundle2lazylocking'):
1987         op.gettransaction()
1988     ret = op.repo.pushkey(namespace, key, old, new)
1989     record = {'namespace': namespace,
1990               'key': key,
1991               'old': old,
1992               'new': new}
1993     op.records.add('pushkey', record)
1994     if op.reply is not None:
1995         rpart = op.reply.newpart('reply:pushkey')
1996         rpart.addparam(
1997             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1998         rpart.addparam('return', '%i' % ret, mandatory=False)
1999     if inpart.mandatory and not ret:
2000         kwargs = {}
2001         for key in ('namespace', 'key', 'new', 'old', 'ret'):
2002             if key in inpart.params:
2003                 kwargs[key] = inpart.params[key]
2004         raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
2005
2006 @parthandler('bookmarks')
2007 def handlebookmark(op, inpart):
2008     """transmit bookmark information
2009
2010     The part contains binary encoded bookmark information.
2011
2012     The exact behavior of this part can be controlled by the 'bookmarks' mode
2013     on the bundle operation.
2014
2015     When mode is 'apply' (the default) the bookmark information is applied
2016     as is to the unbundling repository. Make sure a 'check:bookmarks' part
2017     is issued earlier to check for push races in such an update. This
2018     behavior is suitable for pushing.
2019
2020     When mode is 'records', the information is recorded into the 'bookmarks'
2021     records of the bundle operation. This behavior is suitable for pulling.
2022     """
2023     changes = bookmarks.binarydecode(inpart)
2024
2025     pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2026     bookmarksmode = op.modes.get('bookmarks', 'apply')
2027
2028     if bookmarksmode == 'apply':
2029         tr = op.gettransaction()
2030         bookstore = op.repo._bookmarks
2031         if pushkeycompat:
2032             allhooks = []
2033             for book, node in changes:
2034                 hookargs = tr.hookargs.copy()
2035                 hookargs['pushkeycompat'] = '1'
2036                 hookargs['namespace'] = 'bookmark'
2037                 hookargs['key'] = book
2038                 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2039                 hookargs['new'] = nodemod.hex(node if node is not None else '')
2040                 allhooks.append(hookargs)
2041
2042             for hookargs in allhooks:
2043                 op.repo.hook('prepushkey', throw=True, **hookargs)
2044
2045         bookstore.applychanges(op.repo, op.gettransaction(), changes)
2046
2047         if pushkeycompat:
2048             def runhook():
2049                 for hookargs in allhooks:
2050                     op.repo.hook('pushkey', **hookargs)
2051             op.repo._afterlock(runhook)
2052
2053     elif bookmarksmode == 'records':
2054         for book, node in changes:
2055             record = {'bookmark': book, 'node': node}
2056             op.records.add('bookmarks', record)
2057     else:
2058         raise error.ProgrammingError('unknown bookmark mode: %s' % bookmarksmode)
2059
2060 @parthandler('phase-heads')
2061 def handlephases(op, inpart):
2062     """apply phases from bundle part to repo"""
2063     headsbyphase = phases.binarydecode(inpart)
2064     phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2065
2066 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2067 def handlepushkeyreply(op, inpart):
2068     """retrieve the result of a pushkey request"""
2069     ret = int(inpart.params['return'])
2070     partid = int(inpart.params['in-reply-to'])
2071     op.records.add('pushkey', {'return': ret}, partid)
2072
2073 @parthandler('obsmarkers')
2074 def handleobsmarker(op, inpart):
2075     """add a stream of obsmarkers to the repo"""
2076     tr = op.gettransaction()
2077     markerdata = inpart.read()
2078     if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2079         op.ui.write(('obsmarker-exchange: %i bytes received\n')
2080                     % len(markerdata))
2081     # The mergemarkers call will crash if marker creation is not enabled.
2082     # We want to avoid this if the part is advisory.
2083     if not inpart.mandatory and op.repo.obsstore.readonly:
2084         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2085         return
2086     new = op.repo.obsstore.mergemarkers(tr, markerdata)
2087     op.repo.invalidatevolatilesets()
2088     if new:
2089         op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2090     op.records.add('obsmarkers', {'new': new})
2091     if op.reply is not None:
2092         rpart = op.reply.newpart('reply:obsmarkers')
2093         rpart.addparam(
2094             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2095         rpart.addparam('new', '%i' % new, mandatory=False)
2096
2097
2098 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2099 def handleobsmarkerreply(op, inpart):
2100     """retrieve the result of an obsmarkers request"""
2101     ret = int(inpart.params['new'])
2102     partid = int(inpart.params['in-reply-to'])
2103     op.records.add('obsmarkers', {'new': ret}, partid)
2104
2105 @parthandler('hgtagsfnodes')
2106 def handlehgtagsfnodes(op, inpart):
2107     """Applies .hgtags fnodes cache entries to the local repo.
2108
2109     Payload is pairs of 20 byte changeset nodes and filenodes.
2110     """
2111     # Grab the transaction so we ensure that we have the lock at this point.
2112     if op.ui.configbool('experimental', 'bundle2lazylocking'):
2113         op.gettransaction()
2114     cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2115
2116     count = 0
2117     while True:
2118         node = inpart.read(20)
2119         fnode = inpart.read(20)
2120         if len(node) < 20 or len(fnode) < 20:
2121             op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2122             break
2123         cache.setfnode(node, fnode)
2124         count += 1
2125
2126     cache.write()
2127     op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2128
2129 @parthandler('pushvars')
2130 def bundle2getvars(op, part):
2131     '''unbundle a bundle2 containing shellvars on the server'''
2132     # An option to disable unbundling on server-side for security reasons
2133     if op.ui.configbool('push', 'pushvars.server'):
2134         hookargs = {}
2135         for key, value in part.advisoryparams:
2136             key = key.upper()
2137             # We want pushed variables to have USERVAR_ prepended so we know
2138             # they came from the --pushvar flag.
2139             key = "USERVAR_" + key
2140             hookargs[key] = value
2141         op.addhookargs(hookargs)
2142
2143 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
2144 def handlestreamv2bundle(op, part):
2145
2146     requirements = part.params['requirements'].split()
2147     filecount = int(part.params['filecount'])
2148     bytecount = int(part.params['bytecount'])
2149
2150     repo = op.repo
2151     if len(repo):
2152         msg = _('cannot apply stream clone to non empty repository')
2153         raise error.Abort(msg)
2154
2155     repo.ui.debug('applying stream bundle\n')
2156     streamclone.applybundlev2(repo, part, filecount, bytecount,
2157                               requirements)
2158
2159     # new requirements = old non-format requirements +
2160     #                    new format-related remote requirements
2161     # requirements from the streamed-in repository
2162     repo.requirements = set(requirements) | (
2163         repo.requirements - repo.supportedformats)
2164     repo._applyopenerreqs()
2165     repo._writerequirements()
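The requirement update that this commit moves around (it also appears in the legacy path of `streamclone.py` below) boils down to simple set arithmetic: format-related requirements come from the streamed-in remote, while the local repository's non-format requirements are preserved. A minimal sketch of that computation, using made-up requirement sets rather than Mercurial's real ones:

```python
# Toy illustration of the post-stream-clone requirements update.
# The set contents are invented for the example; only the set
# arithmetic mirrors the code above.

# Format-related requirements this client understands.
supportedformats = {'revlogv1', 'generaldelta'}

# Requirements of the local (empty) repository before the clone,
# mixing one format requirement with two non-format ones.
localrequirements = {'revlogv1', 'store', 'fncache'}

# Format requirements advertised by the streamed-in repository.
remoterequirements = {'revlogv1', 'generaldelta'}

# new requirements = remote format requirements
#                    + local non-format requirements
newrequirements = set(remoterequirements) | (
    localrequirements - supportedformats)

print(sorted(newrequirements))
# → ['fncache', 'generaldelta', 'revlogv1', 'store']
```

Note that subtracting `supportedformats` first strips every format-related entry from the local set, so the remote's format choices fully replace the local ones.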
@@ -1,634 +1,642 b''
1 # streamclone.py - producing and consuming streaming repository data
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 from __future__ import absolute_import
9
10 import contextlib
11 import os
12 import struct
13 import tempfile
14 import warnings
15
16 from .i18n import _
17 from . import (
18     branchmap,
19     cacheutil,
20     error,
21     phases,
22     store,
23     util,
24 )
25
26 def canperformstreamclone(pullop, bundle2=False):
27     """Whether it is possible to perform a streaming clone as part of pull.
28
29     ``bundle2`` will cause the function to consider stream clone through
30     bundle2 and only through bundle2.
31
32     Returns a tuple of (supported, requirements). ``supported`` is True if
33     streaming clone is supported and False otherwise. ``requirements`` is
34     a set of repo requirements from the remote, or ``None`` if stream clone
35     isn't supported.
36     """
37     repo = pullop.repo
38     remote = pullop.remote
39
40     bundle2supported = False
41     if pullop.canusebundle2:
42         if 'v2' in pullop.remotebundle2caps.get('stream', []):
43             bundle2supported = True
44         # else
45             # Server doesn't support bundle2 stream clone or doesn't support
46             # the versions we support. Fall back and possibly allow legacy.
47
48     # Ensures legacy code path uses available bundle2.
49     if bundle2supported and not bundle2:
50         return False, None
51     # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
52     elif bundle2 and not bundle2supported:
53         return False, None
54
55     # Streaming clone only works on empty repositories.
56     if len(repo):
57         return False, None
58
59     # Streaming clone only works if all data is being requested.
60     if pullop.heads:
61         return False, None
62
63     streamrequested = pullop.streamclonerequested
64
65     # If we don't have a preference, let the server decide for us. This
66     # likely only comes into play in LANs.
67     if streamrequested is None:
68         # The server can advertise whether to prefer streaming clone.
69         streamrequested = remote.capable('stream-preferred')
70
71     if not streamrequested:
72         return False, None
73
74     # In order for stream clone to work, the client has to support all the
75     # requirements advertised by the server.
76     #
77     # The server advertises its requirements via the "stream" and "streamreqs"
78     # capability. "stream" (a value-less capability) is advertised if and only
79     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
80     # is advertised and contains a comma-delimited list of requirements.
81     requirements = set()
82     if remote.capable('stream'):
83         requirements.add('revlogv1')
84     else:
85         streamreqs = remote.capable('streamreqs')
86         # This is weird and shouldn't happen with modern servers.
87         if not streamreqs:
88             pullop.repo.ui.warn(_(
89                 'warning: stream clone requested but server has them '
90                 'disabled\n'))
91             return False, None
92
93         streamreqs = set(streamreqs.split(','))
94         # Server requires something we don't support. Bail.
95         missingreqs = streamreqs - repo.supportedformats
96         if missingreqs:
97             pullop.repo.ui.warn(_(
98                 'warning: stream clone requested but client is missing '
99                 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
100             pullop.repo.ui.warn(
101                 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
102                   'for more information)\n'))
103             return False, None
104         requirements = streamreqs
105
106     return True, requirements
107
107
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()

def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True

# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()

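The `name\0size\n` entry framing documented in the docstring above can be illustrated with a minimal, self-contained round trip. `frame_entries` and `parse_entries` are illustrative helpers written for this sketch, not Mercurial APIs:

```python
import io

def frame_entries(entries):
    """Serialize (name, data) pairs with v1-style framing: a header line
    'name\\0size\\n' followed by the raw file bytes."""
    out = io.BytesIO()
    for name, data in entries:
        out.write(b'%s\0%d\n' % (name, len(data)))
        out.write(data)
    return out.getvalue()

def parse_entries(stream):
    """Inverse of frame_entries: read header lines and exactly-sized
    payloads until EOF."""
    entries = []
    while True:
        header = stream.readline()
        if not header:
            break
        name, size = header.rstrip(b'\n').split(b'\0', 1)
        entries.append((name, stream.read(int(size))))
    return entries

payload = frame_entries([(b'data/foo.i', b'abc'), (b'data/bar.i', b'\x00' * 5)])
roundtrip = parse_entries(io.BytesIO(payload))
```

Because each payload is read by exact size, file contents may freely contain `\n` or `\0`; only the file *names* are restricted (hence the "XXX doesn't support '\n' or '\r' in filenames" note in `consumev1`).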
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()

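The header layout documented in the docstring above (magic, compression, two big-endian 64-bit counts, then a length-prefixed NUL-terminated requirements string) can be sketched as a standalone pack/unpack pair. `buildheader` and `parseheader` are hypothetical helpers for illustration only; the real reader is `readbundle1header` below:

```python
import struct

def buildheader(filecount, bytecount, requirements, compression=b'UN'):
    """Pack a stream clone bundle v1 header: 'HGS1' magic, 2-byte
    compression id, >QQ counts, >H requirements length (including the
    trailing NUL), then the requirements string itself."""
    requires = b','.join(sorted(requirements)) + b'\0'
    return (b'HGS1' + compression
            + struct.pack('>QQ', filecount, bytecount)
            + struct.pack('>H', len(requires))
            + requires)

def parseheader(data):
    """Unpack the fixed-offset fields back out of a v1 header."""
    assert data[:4] == b'HGS1'
    compression = data[4:6]
    filecount, bytecount = struct.unpack('>QQ', data[6:22])
    (requireslen,) = struct.unpack('>H', data[22:24])
    requires = data[24:24 + requireslen]
    return (compression, filecount, bytecount,
            set(requires.rstrip(b'\0').split(b',')))

hdr = buildheader(3, 12345, {b'revlogv1', b'generaldelta'})
```

Note that the declared length counts the trailing `\0`, matching the `len(requires) + 1` in `generatebundlev1` and the `endswith('\0')` check in `readbundle1header`.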
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)

# type of file to stream
_fileappend = 0 # append only file
_filefull = 1   # full snapshot file

# Source of the file
_srcstore = 's' # store (svfs)
_srccache = 'c' # cache (cache)

# This is its own function so extensions can override it.
def _walkstreamfullstorefiles(repo):
    """list snapshot files from the store"""
    fnames = []
    if not repo.publishing():
        fnames.append('phaseroots')
    return fnames

def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))

@contextlib.contextmanager
def maketempcopies():
    """return a function to temporarily copy files"""
    files = []
    try:
        def copy(src):
            fd, dst = tempfile.mkstemp()
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst
        yield copy
    finally:
        for tmp in files:
            util.tryunlink(tmp)

def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose: there are too many dangers
    # there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap

def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    vfsmap = _makemap(repo)
    progress = repo.ui.progress
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    with maketempcopies() as copy:
        try:
            # copying is delayed until we are inside the try
            entries = [_filterfull(e, copy, vfsmap) for e in entries]
            yield None # this releases the lock on the repository
            seen = 0

            for src, name, ftype, data in entries:
                vfs = vfsmap[src]
                yield src
                yield util.uvarintencode(len(name))
                if ftype == _fileappend:
                    fp = vfs(name)
                    size = data
                elif ftype == _filefull:
                    fp = open(data, 'rb')
                    size = util.fstat(fp).st_size
                try:
                    yield util.uvarintencode(size)
                    yield name
                    if size <= 65536:
                        chunks = (fp.read(size),)
                    else:
                        chunks = util.filechunkiter(fp, limit=size)
                    for chunk in chunks:
                        seen += len(chunk)
                        progress(_('bundle'), seen, total=totalfilesize,
                                 unit=_('bytes'))
                        yield chunk
                finally:
                    fp.close()
        finally:
            progress(_('bundle'), None)

def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    The data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        entries = []
        totalfilesize = 0

        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((_srcstore, name, _fileappend, size))
                totalfilesize += size
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit2(repo, entries, totalfilesize)
        first = next(chunks)
        assert first is None

    return len(entries), totalfilesize, chunks

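The per-file entry layout in the docstring above can be sketched end to end. This assumes `util.uvarintencode` uses the common base-128 varint encoding (7 bits per byte, little-endian groups, high bit set on continuation bytes); `uvarint_encode`, `uvarint_decode`, and `encode_entry` are hypothetical helpers for illustration, not Mercurial APIs:

```python
def uvarint_encode(value):
    """Encode an unsigned int as a base-128 varint: 7 bits per byte,
    least-significant group first, high bit marks continuation."""
    out = bytearray()
    while True:
        byte = value & 0x7f
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def uvarint_decode(data, pos=0):
    """Decode a varint starting at pos; return (value, next position)."""
    result = shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7f) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7

def encode_entry(src, name, data):
    """Lay out one v2 entry: destination char, filename-length varint,
    data-length varint, filename bytes, file data bytes."""
    return src + uvarint_encode(len(name)) + uvarint_encode(len(data)) + name + data

entry = encode_entry(b's', b'00changelog.i', b'x' * 300)
```

The two length varints come *before* the filename, so a consumer (like `consumev2` below) can read both lengths, then do exact-size reads for the name and the data without any sentinel bytes.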
@contextlib.contextmanager
def nested(*ctxs):
    with warnings.catch_warnings():
        # For some reason, Python decided 'nested' was deprecated without
        # replacement. The official advice is to filter the deprecation
        # warning for people who actually need the feature.
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        with contextlib.nested(*ctxs):
            yield

def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        handledbytes = 0
        progress = repo.ui.progress

        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))

        vfsmap = _makemap(repo)

        with repo.transaction('clone'):
            ctxs = (vfs.backgroundclosing(repo.ui)
                    for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding [%s] %s (%s)\n' %
                                      (src, name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            handledbytes += len(chunk)
                            progress(_('clone'), handledbytes, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(handledbytes), elapsed,
                        util.bytecount(handledbytes / elapsed)))

def applybundlev2(repo, fp, filecount, filesize, requirements):
    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev2(repo, fp, filecount, filesize)

    # new requirements = old non-format requirements +
    #                    new format-related remote requirements
    # requirements from the streamed-in repository
    repo.requirements = set(requirements) | (
        repo.requirements - repo.supportedformats)
    repo._applyopenerreqs()
    repo._writerequirements()