bundle2: add 'source' attribute to bundleoperation class...
Pulkit Goyal
r37252:7e906d8a default
@@ -1,2262 +1,2263 b''
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architectured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

the Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are obviously forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    troubles.

  Any applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32bits integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    Part's parameters may have arbitrary content; the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

      N couples of bytes, where N is the total number of parameters. Each
      couple contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

      A blob of bytes from which each parameter key and value can be
      retrieved using the list of size couples stored in the previous
      field.

      Mandatory parameters come first, then the advisory ones.

      Each parameter's key MUST be unique within the part.

:payload:

  payload is a series of `<chunksize><chunkdata>`.

  `chunksize` is an int32, `chunkdata` are plain bytes (as much as
  `chunksize` says). The payload part is concluded by a zero size chunk.

  The current implementation always produces either zero or one chunk.
  This is an implementation limitation that will ultimately be lifted.

  `chunksize` can be negative to trigger special case processing. No such
  processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""
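The stream-level framing described above (magic string, int32 parameter size, urlquoted parameter blob) can be sketched in a few lines. This is a standalone Python 3 illustration, not Mercurial's API; `buildstreamheader` is a hypothetical helper, and `b'HG20'` is the bundle2 magic string.

```python
import struct
from urllib.parse import quote

def buildstreamheader(params):
    # serialize stream-level parameters: a space separated list of
    # urlquoted `<name>=<value>` pairs (or a bare name with no value)
    blob = ' '.join(
        '%s=%s' % (quote(name), quote(value)) if value else quote(name)
        for name, value in params
    ).encode('ascii')
    # magic string, then the int32 byte count of the blob, then the blob
    return b'HG20' + struct.pack('>i', len(blob)) + blob

header = buildstreamheader([('Compression', 'BZ')])
```

Note the capital `C` on the example parameter name: per the rules above, that marks it mandatory for the reader.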

from __future__ import absolute_import, division

import collections
import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    bookmarks,
    changegroup,
    encoding,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    streamclone,
    tags,
    url,
    util,
)
from .utils import (
    stringutil,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 32768

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)
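The format `_makefpartparamsizes` builds pairs with the `:param-sizes:` field of the part header: N `(key-size, value-size)` byte couples. A standalone sketch of decoding with it, using only `struct`:

```python
import struct

# what _makefpartparamsizes(2) would return: '>BBBB'
fmt = '>' + ('BB' * 2)

# two parameters whose (key, value) byte sizes are (3, 5) and (4, 2)
sizes = struct.unpack(fmt, b'\x03\x05\x04\x02')

# regroup the flat tuple into (size-of-key, size-of-value) couples
couples = list(zip(sizes[::2], sizes[1::2]))
```

These couples are then used to slice the keys and values out of the `:param-data:` blob that follows.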

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
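The add / lookup / iteration contract documented above can be shown with a minimal standalone mirror of the class (reply tracking omitted; `records` is a local stand-in, not the real `unbundlerecords`):

```python
# minimal standalone mirror of unbundlerecords, enough to exercise the
# add / __getitem__ / __iter__ contract described in its docstring
class records(object):
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        # all entries of one category, in insertion order
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        # all (category, entry) tuples, in chronological order
        return iter(self._sequences)

r = records()
r.add('changegroup', {'return': 1})
r.add('pushkey', {'namespace': 'phases'})
```

An unknown category yields an empty tuple rather than raising, which is why callers can probe `op.records['changegroup']` unconditionally.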

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True, source=''):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries value that can modify part behavior
        self.modes = {}
        self.source = source

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction
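The hand-off in `gettransaction` is easy to misread: operation-level hookargs seed the transaction's dict, but values already on the transaction win, and the operation-level dict is then flushed. A standalone sketch of just that merge (`faketransaction` and the sample values are hypothetical):

```python
# sketch of the hookargs hand-off: transaction values supersede the
# operation's, then the operation-level dict is flushed to None
class faketransaction(object):
    def __init__(self):
        self.hookargs = {'source': 'serve'}

ophookargs = {'source': 'push', 'bundle2': '1'}
tr = faketransaction()

ophookargs.update(tr.hookargs)  # transaction's 'serve' overrides 'push'
tr.hookargs = ophookargs
ophookargs = None               # mirrors `self.hookargs = None`
```

After the flush, `addhookargs` below raises a `ProgrammingError`, which is the guard against adding hook arguments once the transaction has started.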

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
        # and should not gracefully cleanup.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the
            # type of bundle. We should probably clean up or drop this return
            # code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part, then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown Mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op
462
463
463 def processparts(repo, op, unbundler):
464 def processparts(repo, op, unbundler):
464 with partiterator(repo, op, unbundler) as parts:
465 with partiterator(repo, op, unbundler) as parts:
465 for part in parts:
466 for part in parts:
466 _processpart(op, part)
467 _processpart(op, part)
467
468
468 def _processchangegroup(op, cg, tr, source, url, **kwargs):
469 def _processchangegroup(op, cg, tr, source, url, **kwargs):
469 ret = cg.apply(op.repo, tr, source, url, **kwargs)
470 ret = cg.apply(op.repo, tr, source, url, **kwargs)
470 op.records.add('changegroup', {
471 op.records.add('changegroup', {
471 'return': ret,
472 'return': ret,
472 })
473 })
473 return ret
474 return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                  params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler
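The parameter check above boils down to a set difference: any mandatory key the handler has not declared support for is collected, sorted, and used to reject the part. A minimal standalone illustration (the parameter names here are made up):

```python
# sketch of the capability check in _gethandler: part parameters the
# handler has not declared support for are sorted into a stable list,
# and their presence is grounds for rejecting a mandatory part
handler_params = {'heads', 'common'}               # what the handler supports
mandatorykeys = {'heads', 'common', 'newfeature'}  # what the part requires

unknownparams = sorted(mandatorykeys - handler_params)
assert unknownparams == ['newfeature']
status = 'unsupported-params (%s)' % ', '.join(unknownparams)
assert status == 'unsupported-params (newfeature)'
```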

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # The handler is called outside the try block in _gethandler so that we
    # don't risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
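The caps blob format handled by the two functions above (newline-separated, URL-quoted `key=value1,value2` lines) can be exercised with a standalone round-trip sketch. It mirrors `decodecaps`/`encodecaps` but uses the stdlib `urllib.parse` in place of Mercurial's `urlreq` wrapper, and `str` rather than bytes for brevity:

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # newline-separated entries; values joined by commas, everything
    # URL-quoted, mirroring encodecaps above (str instead of bytes)
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        entry = quote(name)
        if vals:
            entry = '%s=%s' % (entry, ','.join(vals))
        chunks.append(entry)
    return '\n'.join(chunks)

def decode_caps(blob):
    # mirrors decodecaps above: values are always a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'changegroup': ['01', '02'], 'listkeys': []}
blob = encode_caps(caps)
assert blob == 'changegroup=01,02\nlistkeys'
assert decode_caps(blob) == caps
```

Because both names and values are URL-quoted, separators occurring inside a value (spaces, commas) survive the round trip.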

bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
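As the comment notes, `bundlepriority` is an ordered preference list. A hypothetical helper (not part of this module) showing how such a list would be used to pick the first type a client also supports:

```python
# hypothetical helper: pick the first bundle type, in priority order,
# that the client also supports
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

def preferredtype(clienttypes):
    for bt in bundlepriority:
        if bt in clienttypes:
            return bt
    return None

assert preferredtype({'HG10UN', 'HG10BZ'}) == 'HG10BZ'
assert preferredtype({'HG10XX'}) is None
```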

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding a part if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
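Putting `getchunks` together: the emitted stream is the magic string, a 32-bit size-prefixed parameter block, the parts, and a zero part-header size as end-of-stream marker. A standalone sketch of the smallest possible HG20 stream, assuming the `'>i'` (big-endian signed 32-bit) struct format this module uses for `_fstreamparamsize` and `_fpartheadersize`:

```python
import struct

# '>i' is assumed to match bundle2's _fstreamparamsize / _fpartheadersize
# formats (signed 32-bit big-endian)
_fsize = '>i'

def minimal_bundle(params=b''):
    # magic string, then the parameter block preceded by its size,
    # then zero parts, then a zero part-header size as end-of-stream
    chunks = [b'HG20']
    chunks.append(struct.pack(_fsize, len(params)))
    if params:
        chunks.append(params)
    chunks.append(struct.pack(_fsize, 0))
    return b''.join(chunks)

raw = minimal_bundle()
assert raw[:4] == b'HG20'                       # magic string
assert struct.unpack(_fsize, raw[4:8])[0] == 0  # empty parameter block
assert len(raw) == 12                           # 4 + 4 + 4 bytes
```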


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters in a stream level block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a BundleUnknownFeatureError
        when they are unknown.

        Note: no options are currently supported. Any input will be either
        ignored or rejected.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, the payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, the payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure the part is fully consumed so we can start reading the
            # next part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()
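The stream-level parameter block that `params` / `_processallparams` consume is a space-separated list of URL-quoted `name` or `name=value` entries, the inverse of `bundle20._paramchunk`. A standalone round-trip sketch using `urllib.parse` in place of Mercurial's `urlreq` wrapper (`str` instead of bytes for brevity; the advisory/mandatory case handling of `_processparam` is left out):

```python
from urllib.parse import quote, unquote

def encode_params(params):
    # space-separated, URL-quoted 'name' or 'name=value' entries,
    # mirroring bundle20._paramchunk
    blocks = []
    for name, value in params:
        entry = quote(name)
        if value is not None:
            entry = '%s=%s' % (entry, quote(value))
        blocks.append(entry)
    return ' '.join(blocks)

def decode_params(block):
    # mirrors unbundle20._processallparams, minus the handler dispatch
    params = {}
    for p in block.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)
        params[parts[0]] = parts[1]
    return params

block = encode_params([('Compression', 'BZ'), ('somehint', None)])
assert block == 'Compression=BZ somehint'
assert decode_params(block) == {'Compression': 'BZ', 'somehint': None}
```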

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
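`b2streamparamhandler` is a plain registration decorator: it files the handler in a mapping under the given name and returns the function unchanged. A self-contained sketch of the same pattern (the names here are illustrative, not the module's):

```python
# a self-contained sketch of the registration-decorator pattern used by
# b2streamparamhandler: store the function in a mapping keyed by the
# parameter name and return it unchanged
handlers = {}

def streamparamhandler(name):
    def decorator(func):
        # guard against double registration, as the real decorator does
        assert name not in handlers
        handlers[name] = func
        return func
    return decorator

@streamparamhandler('compression')
def handlecompression(unbundler, param, value):
    return 'compression set to %s' % value

assert handlers['compression'] is handlecompression
assert handlers['compression'](None, 'Compression', 'BZ') == 'compression set to BZ'
```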

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently being generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)
974
975
975 # methods used to defines the part content
976 # methods used to defines the part content
976 @property
977 @property
977 def data(self):
978 def data(self):
978 return self._data
979 return self._data
979
980
980 @data.setter
981 @data.setter
981 def data(self, data):
982 def data(self, data):
982 if self._generated is not None:
983 if self._generated is not None:
983 raise error.ReadOnlyPartError('part is being generated')
984 raise error.ReadOnlyPartError('part is being generated')
984 self._data = data
985 self._data = data
985
986
986 @property
987 @property
987 def mandatoryparams(self):
988 def mandatoryparams(self):
988 # make it an immutable tuple to force people through ``addparam``
989 # make it an immutable tuple to force people through ``addparam``
989 return tuple(self._mandatoryparams)
990 return tuple(self._mandatoryparams)
990
991
991 @property
992 @property
992 def advisoryparams(self):
993 def advisoryparams(self):
993 # make it an immutable tuple to force people through ``addparam``
994 # make it an immutable tuple to force people through ``addparam``
994 return tuple(self._advisoryparams)
995 return tuple(self._advisoryparams)
995
996
    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = stringutil.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


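For reference, the size-prefixed framing that ``getchunks`` emits for payload chunks can be sketched in isolation. This is an illustrative snippet, not part of the module API: it assumes ``_fpayloadsize`` is the big-endian signed 32-bit format ``'>i'`` used for payload sizes, and it omits the interrupt (-1) case.

```python
import struct

# assumed payload-size format: big-endian signed 32-bit integer
_fpayloadsize = '>i'

def framechunks(chunks):
    """Yield a size prefix before each chunk, then the 0-size terminator."""
    for chunk in chunks:
        yield struct.pack(_fpayloadsize, len(chunk))
        yield chunk
    # a zero-length frame marks the end of the payload
    yield struct.pack(_fpayloadsize, 0)

framed = b''.join(framechunks([b'abc', b'de']))
# size 3, b'abc', size 2, b'de', size 0
assert framed == b'\x00\x00\x00\x03abc\x00\x00\x00\x02de\x00\x00\x00\x00'
```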
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool('devel', 'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, 'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(_('stream ended unexpectedly '
                                    ' (got %d bytes, expected %d)') %
                                  (len(s), chunksize))

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                'negative payload chunk size: %s' % chunksize)

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(_('stream ended unexpectedly '
                                ' (got %d bytes, expected %d)') %
                              (len(s), headersize))

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug('bundle2-input: payload chunk size: %i\n' % chunksize)

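The decoding loop above can be mirrored by a minimal standalone sketch. This is illustrative only, not Mercurial API: it assumes the ``'>i'`` payload-size format and skips the out-of-band interrupt (-1) handling that ``decodepayloadchunks`` delegates to ``interrupthandler``.

```python
import io
import struct

# assumed payload-size format: big-endian signed 32-bit integer
_fpayloadsize = '>i'
_headersize = struct.calcsize(_fpayloadsize)

def iterchunks(fh):
    """Yield decoded payload chunks until the 0-size terminator frame."""
    while True:
        chunksize = struct.unpack(_fpayloadsize, fh.read(_headersize))[0]
        if chunksize == 0:
            # end-of-payload marker
            return
        if chunksize < 0:
            # the real decoder treats -1 as an interrupt frame; simplified here
            raise ValueError('negative payload chunk size: %d' % chunksize)
        yield fh.read(chunksize)

stream = io.BytesIO(b'\x00\x00\x00\x03abc\x00\x00\x00\x02de\x00\x00\x00\x00')
assert list(iterchunks(stream)) == [b'abc', b'de']
```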
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

class seekableunbundlepart(unbundlepart):
    """A bundle2 part in a bundle that is seekable.

    Regular ``unbundlepart`` instances can only be read once. This class
    extends ``unbundlepart`` to enable bi-directional seeking within the
    part.

    Bundle2 part data consists of framed chunks. Offsets when seeking
    refer to the decoded data, not the offsets in the underlying bundle2
    stream.

    To facilitate quickly seeking within the decoded data, instances of this
    class maintain a mapping between offsets in the underlying stream and
    the decoded payload. This mapping will consume memory in proportion
    to the number of chunks within the payload (which almost certainly
    increases in proportion with the size of the part).
    """
    def __init__(self, ui, header, fp):
        # (payload, file) offsets for chunk starts.
        self._chunkindex = []

        super(seekableunbundlepart, self).__init__(ui, header, fp)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]

        for chunk in decodepayloadchunks(self.ui, self._fp):
            chunknum += 1
            pos += len(chunk)
            if chunknum == len(self._chunkindex):
                self._chunkindex.append((pos, self._tellfp()))

            yield chunk

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                # Can't use self.consume() here because it advances self._pos.
                chunk = self.read(32768)
                while chunk:
                    chunk = self.read(32768)
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            # Can't use self.consume() here because it advances self._pos.
            chunk = self.read(32768)
            while chunk:
                chunk = self.read(32768)

        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

1456 def _seekfp(self, offset, whence=0):
1457 def _seekfp(self, offset, whence=0):
1457 """move the underlying file pointer
1458 """move the underlying file pointer
1458
1459
        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'bookmarks': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'rev-branch-cache': (),
                'phases': ('heads',),
                'stream': ('v2',),
               }

def getrepocaps(repo, allowpushback=False, role=None):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.

    The returned value is used for servers advertising their capabilities as
    well as clients advertising their capabilities to servers as part of
    bundle2 requests. The ``role`` argument specifies which is which.
    """
    if role not in ('client', 'server'):
        raise error.ProgrammingError('role argument must be client or server')

    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')

    # Don't advertise stream clone support in server mode if not configured.
    if role == 'server':
        streamsupported = repo.ui.configbool('server', 'uncompressed',
                                             untrusted=True)
        featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')

        if not streamsupported or not featuresupported:
            caps.pop('stream')
    # Else always advertise support on client, because payload support
    # should always be advertised.

    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now, so we keep them separated for the sake of
    # simplicity.

    # we might not always want a changegroup in such bundle, for example in
    # stream bundles
    if opts.get('changegroup', True):
        cgversion = opts.get('cg.version')
        if cgversion is None:
            cgversion = changegroup.safeversion(repo)
        cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
        part = bundler.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        if opts.get('phases') and repo.revs('%ln and secret()',
                                            outgoing.missingheads):
            part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    if opts.get('streamv2', False):
        addpartbundlestream2(bundler, repo, stream=True)

    if opts.get('tagsfnodescache', True):
        addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('revbranchcache', True):
        addpartrevbranchcache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def addpartrevbranchcache(repo, bundler, outgoing):
    # we include the rev branch cache for the bundle changeset
    # (as an optional part)
    cache = repo.revbranchcache()
    cl = repo.unfiltered().changelog
    branchesdata = collections.defaultdict(lambda: (set(), set()))
    for node in outgoing.missing:
        branch, close = cache.branchinfo(cl.rev(node))
        branchesdata[branch][close].add(node)

    def generate():
        for branch, (nodes, closed) in sorted(branchesdata.items()):
            utf8branch = encoding.fromlocal(branch)
            yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
            yield utf8branch
            for n in sorted(nodes):
                yield n
            for n in sorted(closed):
                yield n

    bundler.newpart('cache:rev-branch-cache', data=generate())

def _formatrequirementsspec(requirements):
    return urlreq.quote(','.join(sorted(requirements)))

def _formatrequirementsparams(requirements):
    requirements = _formatrequirementsspec(requirements)
    params = "%s%s" % (urlreq.quote("requirements="), requirements)
    return params

def addpartbundlestream2(bundler, repo, **kwargs):
    if not kwargs.get('stream', False):
        return

    if not streamclone.allowservergeneration(repo):
        raise error.Abort(_('stream data requested but server does not allow '
                            'this feature'),
                          hint=_('well-behaved clients should not be '
                                 'requesting stream data from servers not '
                                 'advertising it; the client may be buggy'))

    # Stream clones don't compress well. And compression undermines a
    # goal of stream clones, which is to be fast. Communicate the desire
    # to avoid compression to consumers of the bundle.
    bundler.prefercompressed = False

    filecount, bytecount, it = streamclone.generatev2(repo)
    requirements = _formatrequirementsspec(repo.requirements)
    part = bundler.newpart('stream2', data=it)
    part.addparam('bytecount', '%d' % bytecount, mandatory=True)
    part.addparam('filecount', '%d' % filecount, mandatory=True)
    part.addparam('requirements', requirements, mandatory=True)

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
1778 """
1779 """
1779 tr = op.gettransaction()
1780 tr = op.gettransaction()
1780 unpackerversion = inpart.params.get('version', '01')
1781 unpackerversion = inpart.params.get('version', '01')
1781 # We should raise an appropriate exception here
1782 # We should raise an appropriate exception here
1782 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1783 cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs[r'targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
1868 # This is definitely not the final form of this
1869 # This is definitely not the final form of this
1869 # return. But one need to start somewhere.
1870 # return. But one need to start somewhere.
1870 part = op.reply.newpart('reply:changegroup')
1871 part = op.reply.newpart('reply:changegroup')
1871 part.addparam(
1872 part.addparam(
1872 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1873 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1873 part.addparam('return', '%i' % ret, mandatory=False)
1874 part.addparam('return', '%i' % ret, mandatory=False)
1874 try:
1875 try:
1875 real_part.validate()
1876 real_part.validate()
1876 except error.Abort as e:
1877 except error.Abort as e:
1877 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1878 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1878 (util.hidepassword(raw_url), str(e)))
1879 (util.hidepassword(raw_url), str(e)))
1879 assert not inpart.read()
1880 assert not inpart.read()
1880
1881
1881 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1882 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1882 def handlereplychangegroup(op, inpart):
1883 def handlereplychangegroup(op, inpart):
1883 ret = int(inpart.params['return'])
1884 ret = int(inpart.params['return'])
1884 replyto = int(inpart.params['in-reply-to'])
1885 replyto = int(inpart.params['in-reply-to'])
1885 op.records.add('changegroup', {'return': ret}, replyto)
1886 op.records.add('changegroup', {'return': ret}, replyto)
1886
1887
1887 @parthandler('check:bookmarks')
1888 @parthandler('check:bookmarks')
1888 def handlecheckbookmarks(op, inpart):
1889 def handlecheckbookmarks(op, inpart):
1889 """check location of bookmarks
1890 """check location of bookmarks
1890
1891
1891 This part is to be used to detect push race regarding bookmark, it
1892 This part is to be used to detect push race regarding bookmark, it
1892 contains binary encoded (bookmark, node) tuple. If the local state does
1893 contains binary encoded (bookmark, node) tuple. If the local state does
1893 not marks the one in the part, a PushRaced exception is raised
1894 not marks the one in the part, a PushRaced exception is raised
1894 """
1895 """
1895 bookdata = bookmarks.binarydecode(inpart)
1896 bookdata = bookmarks.binarydecode(inpart)
1896
1897
1897 msgstandard = ('repository changed while pushing - please try again '
1898 msgstandard = ('repository changed while pushing - please try again '
1898 '(bookmark "%s" move from %s to %s)')
1899 '(bookmark "%s" move from %s to %s)')
1899 msgmissing = ('repository changed while pushing - please try again '
1900 msgmissing = ('repository changed while pushing - please try again '
1900 '(bookmark "%s" is missing, expected %s)')
1901 '(bookmark "%s" is missing, expected %s)')
1901 msgexist = ('repository changed while pushing - please try again '
1902 msgexist = ('repository changed while pushing - please try again '
1902 '(bookmark "%s" set on %s, expected missing)')
1903 '(bookmark "%s" set on %s, expected missing)')
1903 for book, node in bookdata:
1904 for book, node in bookdata:
1904 currentnode = op.repo._bookmarks.get(book)
1905 currentnode = op.repo._bookmarks.get(book)
1905 if currentnode != node:
1906 if currentnode != node:
1906 if node is None:
1907 if node is None:
1907 finalmsg = msgexist % (book, nodemod.short(currentnode))
1908 finalmsg = msgexist % (book, nodemod.short(currentnode))
1908 elif currentnode is None:
1909 elif currentnode is None:
1909 finalmsg = msgmissing % (book, nodemod.short(node))
1910 finalmsg = msgmissing % (book, nodemod.short(node))
1910 else:
1911 else:
1911 finalmsg = msgstandard % (book, nodemod.short(node),
1912 finalmsg = msgstandard % (book, nodemod.short(node),
1912 nodemod.short(currentnode))
1913 nodemod.short(currentnode))
1913 raise error.PushRaced(finalmsg)
1914 raise error.PushRaced(finalmsg)
1914
1915
1915 @parthandler('check:heads')
1916 @parthandler('check:heads')
1916 def handlecheckheads(op, inpart):
1917 def handlecheckheads(op, inpart):
1917 """check that head of the repo did not change
1918 """check that head of the repo did not change
1918
1919
1919 This is used to detect a push race when using unbundle.
1920 This is used to detect a push race when using unbundle.
1920 This replaces the "heads" argument of unbundle."""
1921 This replaces the "heads" argument of unbundle."""
1921 h = inpart.read(20)
1922 h = inpart.read(20)
1922 heads = []
1923 heads = []
1923 while len(h) == 20:
1924 while len(h) == 20:
1924 heads.append(h)
1925 heads.append(h)
1925 h = inpart.read(20)
1926 h = inpart.read(20)
1926 assert not h
1927 assert not h
1927 # Trigger a transaction so that we are guaranteed to have the lock now.
1928 # Trigger a transaction so that we are guaranteed to have the lock now.
1928 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1929 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1929 op.gettransaction()
1930 op.gettransaction()
1930 if sorted(heads) != sorted(op.repo.heads()):
1931 if sorted(heads) != sorted(op.repo.heads()):
1931 raise error.PushRaced('repository changed while pushing - '
1932 raise error.PushRaced('repository changed while pushing - '
1932 'please try again')
1933 'please try again')
1933
1934
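The head list consumed by the 'check:heads' handler above is a plain concatenation of 20-byte binary nodes with no length prefix or terminator. As a standalone sketch of that parsing loop (`readheads` is a hypothetical helper, not part of bundle2.py), working against any file-like object:

```python
import hashlib
import io

def readheads(fh):
    """Read a concatenation of 20-byte binary nodes until EOF.

    Mirrors the loop in the 'check:heads' handler: keep reading
    fixed-width records and assert no trailing partial record remains.
    """
    heads = []
    h = fh.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fh.read(20)
    assert not h, 'payload size must be a multiple of 20 bytes'
    return heads

# Build a fake payload of three nodes (sha1 digests are exactly 20 bytes).
nodes = [hashlib.sha1(b'%d' % i).digest() for i in range(3)]
payload = b''.join(nodes)
assert readheads(io.BytesIO(payload)) == nodes
```

The same loop appears verbatim in 'check:updated-heads' below; only what is done with the resulting list differs.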
1934 @parthandler('check:updated-heads')
1935 @parthandler('check:updated-heads')
1935 def handlecheckupdatedheads(op, inpart):
1936 def handlecheckupdatedheads(op, inpart):
1936 """check for race on the heads touched by a push
1937 """check for race on the heads touched by a push
1937
1938
1938 This is similar to 'check:heads' but focus on the heads actually updated
1939 This is similar to 'check:heads' but focus on the heads actually updated
1939 during the push. If other activities happen on unrelated heads, it is
1940 during the push. If other activities happen on unrelated heads, it is
1940 ignored.
1941 ignored.
1941
1942
1942 This allow server with high traffic to avoid push contention as long as
1943 This allow server with high traffic to avoid push contention as long as
1943 unrelated parts of the graph are involved."""
1944 unrelated parts of the graph are involved."""
1944 h = inpart.read(20)
1945 h = inpart.read(20)
1945 heads = []
1946 heads = []
1946 while len(h) == 20:
1947 while len(h) == 20:
1947 heads.append(h)
1948 heads.append(h)
1948 h = inpart.read(20)
1949 h = inpart.read(20)
1949 assert not h
1950 assert not h
1950 # trigger a transaction so that we are guaranteed to have the lock now.
1951 # trigger a transaction so that we are guaranteed to have the lock now.
1951 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1952 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1952 op.gettransaction()
1953 op.gettransaction()
1953
1954
1954 currentheads = set()
1955 currentheads = set()
1955 for ls in op.repo.branchmap().itervalues():
1956 for ls in op.repo.branchmap().itervalues():
1956 currentheads.update(ls)
1957 currentheads.update(ls)
1957
1958
1958 for h in heads:
1959 for h in heads:
1959 if h not in currentheads:
1960 if h not in currentheads:
1960 raise error.PushRaced('repository changed while pushing - '
1961 raise error.PushRaced('repository changed while pushing - '
1961 'please try again')
1962 'please try again')
1962
1963
1963 @parthandler('check:phases')
1964 @parthandler('check:phases')
1964 def handlecheckphases(op, inpart):
1965 def handlecheckphases(op, inpart):
1965 """check that phase boundaries of the repository did not change
1966 """check that phase boundaries of the repository did not change
1966
1967
1967 This is used to detect a push race.
1968 This is used to detect a push race.
1968 """
1969 """
1969 phasetonodes = phases.binarydecode(inpart)
1970 phasetonodes = phases.binarydecode(inpart)
1970 unfi = op.repo.unfiltered()
1971 unfi = op.repo.unfiltered()
1971 cl = unfi.changelog
1972 cl = unfi.changelog
1972 phasecache = unfi._phasecache
1973 phasecache = unfi._phasecache
1973 msg = ('repository changed while pushing - please try again '
1974 msg = ('repository changed while pushing - please try again '
1974 '(%s is %s expected %s)')
1975 '(%s is %s expected %s)')
1975 for expectedphase, nodes in enumerate(phasetonodes):
1976 for expectedphase, nodes in enumerate(phasetonodes):
1976 for n in nodes:
1977 for n in nodes:
1977 actualphase = phasecache.phase(unfi, cl.rev(n))
1978 actualphase = phasecache.phase(unfi, cl.rev(n))
1978 if actualphase != expectedphase:
1979 if actualphase != expectedphase:
1979 finalmsg = msg % (nodemod.short(n),
1980 finalmsg = msg % (nodemod.short(n),
1980 phases.phasenames[actualphase],
1981 phases.phasenames[actualphase],
1981 phases.phasenames[expectedphase])
1982 phases.phasenames[expectedphase])
1982 raise error.PushRaced(finalmsg)
1983 raise error.PushRaced(finalmsg)
1983
1984
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'],
                              **pycompat.strkwargs(kwargs))

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid='%d' % inpart.id,
                                  **pycompat.strkwargs(kwargs))

@parthandler('bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied as
    is to the unbundling repository. Make sure a 'check:bookmarks' part is
    issued earlier to check for push races in such an update. This behavior
    is suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(inpart)

    pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
    bookmarksmode = op.modes.get('bookmarks', 'apply')

    if bookmarksmode == 'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs['pushkeycompat'] = '1'
                hookargs['namespace'] = 'bookmarks'
                hookargs['key'] = book
                hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
                hookargs['new'] = nodemod.hex(node if node is not None else '')
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook('prepushkey', throw=True,
                             **pycompat.strkwargs(hookargs))

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:
            def runhook():
                for hookargs in allhooks:
                    op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
            op.repo._afterlock(runhook)

    elif bookmarksmode == 'records':
        for book, node in changes:
            record = {'bookmark': book, 'node': node}
            op.records.add('bookmarks', record)
    else:
        raise error.ProgrammingError('unknown bookmark mode: %s'
                                     % bookmarksmode)

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

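The 'hgtagsfnodes' payload above is simply back-to-back (node, fnode) pairs of fixed width, with any incomplete trailing pair discarded. A minimal standalone sketch of that framing (`readfnodepairs` is a hypothetical helper, not Mercurial API):

```python
import hashlib
import io

def readfnodepairs(fh):
    """Yield (node, fnode) 20-byte pairs, stopping at a short read.

    Incomplete trailing data is silently dropped, matching the
    'hgtagsfnodes' handler's behavior.
    """
    while True:
        node = fh.read(20)
        fnode = fh.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        yield node, fnode

pairs = [(hashlib.sha1(b'node').digest(), hashlib.sha1(b'fnode').digest())]
payload = b''.join(n + f for n, f in pairs)
assert list(readfnodepairs(io.BytesIO(payload))) == pairs
```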
rbcstruct = struct.Struct('>III')

@parthandler('cache:rev-branch-cache')
def handlerbc(op, inpart):
    """receive a rev-branch-cache payload and update the local cache

    The payload is a series of data related to each branch

    1) branch name length
    2) number of open heads
    3) number of closed heads
    4) open heads nodes
    5) closed heads nodes
    """
    total = 0
    rawheader = inpart.read(rbcstruct.size)
    cache = op.repo.revbranchcache()
    cl = op.repo.unfiltered().changelog
    while rawheader:
        header = rbcstruct.unpack(rawheader)
        total += header[1] + header[2]
        utf8branch = inpart.read(header[0])
        branch = encoding.tolocal(utf8branch)
        for x in xrange(header[1]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, False)
        for x in xrange(header[2]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, True)
        rawheader = inpart.read(rbcstruct.size)
    cache.write()

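The five-field record layout listed in the handlerbc docstring can be exercised round-trip with the same `>III` big-endian header. The encoder/decoder pair below is a hypothetical sketch (not part of Mercurial), assuming 20-byte sha1-sized nodes:

```python
import io
import struct

# Header: branch name length, number of open heads, number of closed heads.
rbcstruct = struct.Struct('>III')

def encodebranch(branch, openheads, closedheads):
    """Pack one branch record in the rev-branch-cache part layout."""
    utf8branch = branch.encode('utf-8')
    return (rbcstruct.pack(len(utf8branch), len(openheads), len(closedheads))
            + utf8branch + b''.join(openheads) + b''.join(closedheads))

def decodebranches(fh):
    """Unpack branch records until the stream is exhausted."""
    result = {}
    rawheader = fh.read(rbcstruct.size)
    while rawheader:
        namelen, nopen, nclosed = rbcstruct.unpack(rawheader)
        branch = fh.read(namelen).decode('utf-8')
        openheads = [fh.read(20) for _ in range(nopen)]
        closedheads = [fh.read(20) for _ in range(nclosed)]
        result[branch] = (openheads, closedheads)
        rawheader = fh.read(rbcstruct.size)
    return result

payload = encodebranch('default', [b'\x01' * 20], [b'\x02' * 20])
assert decodebranches(io.BytesIO(payload)) == {
    'default': ([b'\x01' * 20], [b'\x02' * 20])}
```

Because records are length-prefixed, any number of them can be concatenated into one part and decoded by the same loop, which is what the handler relies on.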
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)

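The USERVAR_ prefixing done by bundle2getvars is a pure string transform, so it is easy to check in isolation. A hypothetical standalone version of the loop above (not Mercurial API):

```python
def pushvarstohookargs(advisoryparams):
    """Uppercase each pushed variable name and prefix it with USERVAR_.

    Mirrors the 'pushvars' handler: values are kept as-is and only the
    key is transformed, so hooks can tell variables pushed via --pushvar
    apart from the arguments Mercurial itself sets.
    """
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

assert pushvarstohookargs([('debug', '1'), ('Reason', 'hotfix')]) == {
    'USERVAR_DEBUG': '1', 'USERVAR_REASON': 'hotfix'}
```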
@parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
def handlestreamv2bundle(op, part):

    requirements = urlreq.unquote(part.params['requirements']).split(',')
    filecount = int(part.params['filecount'])
    bytecount = int(part.params['bytecount'])

    repo = op.repo
    if len(repo):
        msg = _('cannot apply stream clone to non-empty repository')
        raise error.Abort(msg)

    repo.ui.debug('applying stream bundle\n')
    streamclone.applybundlev2(repo, part, filecount, bytecount,
                              requirements)