##// END OF EJS Templates
bundle2: remove unnecessary try finally...
Durham Goode -
r34261:cc7b37c9 default
parent child Browse files
Show More
@@ -1,1939 +1,1935 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 phases,
161 phases,
162 pushkey,
162 pushkey,
163 pycompat,
163 pycompat,
164 tags,
164 tags,
165 url,
165 url,
166 util,
166 util,
167 )
167 )
168
168
169 urlerr = util.urlerr
169 urlerr = util.urlerr
170 urlreq = util.urlreq
170 urlreq = util.urlreq
171
171
172 _pack = struct.pack
172 _pack = struct.pack
173 _unpack = struct.unpack
173 _unpack = struct.unpack
174
174
175 _fstreamparamsize = '>i'
175 _fstreamparamsize = '>i'
176 _fpartheadersize = '>i'
176 _fpartheadersize = '>i'
177 _fparttypesize = '>B'
177 _fparttypesize = '>B'
178 _fpartid = '>I'
178 _fpartid = '>I'
179 _fpayloadsize = '>i'
179 _fpayloadsize = '>i'
180 _fpartparamcount = '>BB'
180 _fpartparamcount = '>BB'
181
181
182 _fphasesentry = '>i20s'
182 _fphasesentry = '>i20s'
183
183
184 preferedchunksize = 4096
184 preferedchunksize = 4096
185
185
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187
187
188 def outdebug(ui, message):
188 def outdebug(ui, message):
189 """debug regarding output stream (bundling)"""
189 """debug regarding output stream (bundling)"""
190 if ui.configbool('devel', 'bundle2.debug'):
190 if ui.configbool('devel', 'bundle2.debug'):
191 ui.debug('bundle2-output: %s\n' % message)
191 ui.debug('bundle2-output: %s\n' % message)
192
192
193 def indebug(ui, message):
193 def indebug(ui, message):
194 """debug on input stream (unbundling)"""
194 """debug on input stream (unbundling)"""
195 if ui.configbool('devel', 'bundle2.debug'):
195 if ui.configbool('devel', 'bundle2.debug'):
196 ui.debug('bundle2-input: %s\n' % message)
196 ui.debug('bundle2-input: %s\n' % message)
197
197
198 def validateparttype(parttype):
198 def validateparttype(parttype):
199 """raise ValueError if a parttype contains invalid character"""
199 """raise ValueError if a parttype contains invalid character"""
200 if _parttypeforbidden.search(parttype):
200 if _parttypeforbidden.search(parttype):
201 raise ValueError(parttype)
201 raise ValueError(parttype)
202
202
203 def _makefpartparamsizes(nbparams):
203 def _makefpartparamsizes(nbparams):
204 """return a struct format to read part parameter sizes
204 """return a struct format to read part parameter sizes
205
205
206 The number parameters is variable so we need to build that format
206 The number parameters is variable so we need to build that format
207 dynamically.
207 dynamically.
208 """
208 """
209 return '>'+('BB'*nbparams)
209 return '>'+('BB'*nbparams)
210
210
211 parthandlermapping = {}
211 parthandlermapping = {}
212
212
213 def parthandler(parttype, params=()):
213 def parthandler(parttype, params=()):
214 """decorator that register a function as a bundle2 part handler
214 """decorator that register a function as a bundle2 part handler
215
215
216 eg::
216 eg::
217
217
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 def myparttypehandler(...):
219 def myparttypehandler(...):
220 '''process a part of type "my part".'''
220 '''process a part of type "my part".'''
221 ...
221 ...
222 """
222 """
223 validateparttype(parttype)
223 validateparttype(parttype)
224 def _decorator(func):
224 def _decorator(func):
225 lparttype = parttype.lower() # enforce lower case matching.
225 lparttype = parttype.lower() # enforce lower case matching.
226 assert lparttype not in parthandlermapping
226 assert lparttype not in parthandlermapping
227 parthandlermapping[lparttype] = func
227 parthandlermapping[lparttype] = func
228 func.params = frozenset(params)
228 func.params = frozenset(params)
229 return func
229 return func
230 return _decorator
230 return _decorator
231
231
232 class unbundlerecords(object):
232 class unbundlerecords(object):
233 """keep record of what happens during and unbundle
233 """keep record of what happens during and unbundle
234
234
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 category of record and obj is an arbitrary object.
236 category of record and obj is an arbitrary object.
237
237
238 `records['cat']` will return all entries of this category 'cat'.
238 `records['cat']` will return all entries of this category 'cat'.
239
239
240 Iterating on the object itself will yield `('category', obj)` tuples
240 Iterating on the object itself will yield `('category', obj)` tuples
241 for all entries.
241 for all entries.
242
242
243 All iterations happens in chronological order.
243 All iterations happens in chronological order.
244 """
244 """
245
245
246 def __init__(self):
246 def __init__(self):
247 self._categories = {}
247 self._categories = {}
248 self._sequences = []
248 self._sequences = []
249 self._replies = {}
249 self._replies = {}
250
250
251 def add(self, category, entry, inreplyto=None):
251 def add(self, category, entry, inreplyto=None):
252 """add a new record of a given category.
252 """add a new record of a given category.
253
253
254 The entry can then be retrieved in the list returned by
254 The entry can then be retrieved in the list returned by
255 self['category']."""
255 self['category']."""
256 self._categories.setdefault(category, []).append(entry)
256 self._categories.setdefault(category, []).append(entry)
257 self._sequences.append((category, entry))
257 self._sequences.append((category, entry))
258 if inreplyto is not None:
258 if inreplyto is not None:
259 self.getreplies(inreplyto).add(category, entry)
259 self.getreplies(inreplyto).add(category, entry)
260
260
261 def getreplies(self, partid):
261 def getreplies(self, partid):
262 """get the records that are replies to a specific part"""
262 """get the records that are replies to a specific part"""
263 return self._replies.setdefault(partid, unbundlerecords())
263 return self._replies.setdefault(partid, unbundlerecords())
264
264
265 def __getitem__(self, cat):
265 def __getitem__(self, cat):
266 return tuple(self._categories.get(cat, ()))
266 return tuple(self._categories.get(cat, ()))
267
267
268 def __iter__(self):
268 def __iter__(self):
269 return iter(self._sequences)
269 return iter(self._sequences)
270
270
271 def __len__(self):
271 def __len__(self):
272 return len(self._sequences)
272 return len(self._sequences)
273
273
274 def __nonzero__(self):
274 def __nonzero__(self):
275 return bool(self._sequences)
275 return bool(self._sequences)
276
276
277 __bool__ = __nonzero__
277 __bool__ = __nonzero__
278
278
279 class bundleoperation(object):
279 class bundleoperation(object):
280 """an object that represents a single bundling process
280 """an object that represents a single bundling process
281
281
282 Its purpose is to carry unbundle-related objects and states.
282 Its purpose is to carry unbundle-related objects and states.
283
283
284 A new object should be created at the beginning of each bundle processing.
284 A new object should be created at the beginning of each bundle processing.
285 The object is to be returned by the processing function.
285 The object is to be returned by the processing function.
286
286
287 The object has very little content now it will ultimately contain:
287 The object has very little content now it will ultimately contain:
288 * an access to the repo the bundle is applied to,
288 * an access to the repo the bundle is applied to,
289 * a ui object,
289 * a ui object,
290 * a way to retrieve a transaction to add changes to the repo,
290 * a way to retrieve a transaction to add changes to the repo,
291 * a way to record the result of processing each part,
291 * a way to record the result of processing each part,
292 * a way to construct a bundle response when applicable.
292 * a way to construct a bundle response when applicable.
293 """
293 """
294
294
295 def __init__(self, repo, transactiongetter, captureoutput=True):
295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 self.repo = repo
296 self.repo = repo
297 self.ui = repo.ui
297 self.ui = repo.ui
298 self.records = unbundlerecords()
298 self.records = unbundlerecords()
299 self.reply = None
299 self.reply = None
300 self.captureoutput = captureoutput
300 self.captureoutput = captureoutput
301 self.hookargs = {}
301 self.hookargs = {}
302 self._gettransaction = transactiongetter
302 self._gettransaction = transactiongetter
303
303
304 def gettransaction(self):
304 def gettransaction(self):
305 transaction = self._gettransaction()
305 transaction = self._gettransaction()
306
306
307 if self.hookargs:
307 if self.hookargs:
308 # the ones added to the transaction supercede those added
308 # the ones added to the transaction supercede those added
309 # to the operation.
309 # to the operation.
310 self.hookargs.update(transaction.hookargs)
310 self.hookargs.update(transaction.hookargs)
311 transaction.hookargs = self.hookargs
311 transaction.hookargs = self.hookargs
312
312
313 # mark the hookargs as flushed. further attempts to add to
313 # mark the hookargs as flushed. further attempts to add to
314 # hookargs will result in an abort.
314 # hookargs will result in an abort.
315 self.hookargs = None
315 self.hookargs = None
316
316
317 return transaction
317 return transaction
318
318
319 def addhookargs(self, hookargs):
319 def addhookargs(self, hookargs):
320 if self.hookargs is None:
320 if self.hookargs is None:
321 raise error.ProgrammingError('attempted to add hookargs to '
321 raise error.ProgrammingError('attempted to add hookargs to '
322 'operation after transaction started')
322 'operation after transaction started')
323 self.hookargs.update(hookargs)
323 self.hookargs.update(hookargs)
324
324
325 class TransactionUnavailable(RuntimeError):
325 class TransactionUnavailable(RuntimeError):
326 pass
326 pass
327
327
328 def _notransaction():
328 def _notransaction():
329 """default method to get a transaction while processing a bundle
329 """default method to get a transaction while processing a bundle
330
330
331 Raise an exception to highlight the fact that no transaction was expected
331 Raise an exception to highlight the fact that no transaction was expected
332 to be created"""
332 to be created"""
333 raise TransactionUnavailable()
333 raise TransactionUnavailable()
334
334
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 # transform me into unbundler.apply() as soon as the freeze is lifted
336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 if isinstance(unbundler, unbundle20):
337 if isinstance(unbundler, unbundle20):
338 tr.hookargs['bundle2'] = '1'
338 tr.hookargs['bundle2'] = '1'
339 if source is not None and 'source' not in tr.hookargs:
339 if source is not None and 'source' not in tr.hookargs:
340 tr.hookargs['source'] = source
340 tr.hookargs['source'] = source
341 if url is not None and 'url' not in tr.hookargs:
341 if url is not None and 'url' not in tr.hookargs:
342 tr.hookargs['url'] = url
342 tr.hookargs['url'] = url
343 return processbundle(repo, unbundler, lambda: tr)
343 return processbundle(repo, unbundler, lambda: tr)
344 else:
344 else:
345 # the transactiongetter won't be used, but we might as well set it
345 # the transactiongetter won't be used, but we might as well set it
346 op = bundleoperation(repo, lambda: tr)
346 op = bundleoperation(repo, lambda: tr)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 return op
348 return op
349
349
350 class partiterator(object):
350 class partiterator(object):
351 def __init__(self, repo, op, unbundler):
351 def __init__(self, repo, op, unbundler):
352 self.repo = repo
352 self.repo = repo
353 self.op = op
353 self.op = op
354 self.unbundler = unbundler
354 self.unbundler = unbundler
355 self.iterator = None
355 self.iterator = None
356 self.count = 0
356 self.count = 0
357 self.current = None
357 self.current = None
358
358
359 def __enter__(self):
359 def __enter__(self):
360 def func():
360 def func():
361 itr = enumerate(self.unbundler.iterparts())
361 itr = enumerate(self.unbundler.iterparts())
362 for count, p in itr:
362 for count, p in itr:
363 self.count = count
363 self.count = count
364 self.current = p
364 self.current = p
365 yield p
365 yield p
366 p.seek(0, 2)
366 p.seek(0, 2)
367 self.current = None
367 self.current = None
368 self.iterator = func()
368 self.iterator = func()
369 return self.iterator
369 return self.iterator
370
370
371 def __exit__(self, type, exc, tb):
371 def __exit__(self, type, exc, tb):
372 if not self.iterator:
372 if not self.iterator:
373 return
373 return
374
374
375 if exc:
375 if exc:
376 # If exiting or interrupted, do not attempt to seek the stream in
376 # If exiting or interrupted, do not attempt to seek the stream in
377 # the finally block below. This makes abort faster.
377 # the finally block below. This makes abort faster.
378 if (self.current and
378 if (self.current and
379 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
379 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
380 # consume the part content to not corrupt the stream.
380 # consume the part content to not corrupt the stream.
381 self.current.seek(0, 2)
381 self.current.seek(0, 2)
382
382
383 # Any exceptions seeking to the end of the bundle at this point are
383 # Any exceptions seeking to the end of the bundle at this point are
384 # almost certainly related to the underlying stream being bad.
384 # almost certainly related to the underlying stream being bad.
385 # And, chances are that the exception we're handling is related to
385 # And, chances are that the exception we're handling is related to
386 # getting in that bad state. So, we swallow the seeking error and
386 # getting in that bad state. So, we swallow the seeking error and
387 # re-raise the original error.
387 # re-raise the original error.
388 seekerror = False
388 seekerror = False
389 try:
389 try:
390 for part in self.iterator:
390 for part in self.iterator:
391 # consume the bundle content
391 # consume the bundle content
392 part.seek(0, 2)
392 part.seek(0, 2)
393 except Exception:
393 except Exception:
394 seekerror = True
394 seekerror = True
395
395
396 # Small hack to let caller code distinguish exceptions from bundle2
396 # Small hack to let caller code distinguish exceptions from bundle2
397 # processing from processing the old format. This is mostly needed
397 # processing from processing the old format. This is mostly needed
398 # to handle different return codes to unbundle according to the type
398 # to handle different return codes to unbundle according to the type
399 # of bundle. We should probably clean up or drop this return code
399 # of bundle. We should probably clean up or drop this return code
400 # craziness in a future version.
400 # craziness in a future version.
401 exc.duringunbundle2 = True
401 exc.duringunbundle2 = True
402 salvaged = []
402 salvaged = []
403 replycaps = None
403 replycaps = None
404 if self.op.reply is not None:
404 if self.op.reply is not None:
405 salvaged = self.op.reply.salvageoutput()
405 salvaged = self.op.reply.salvageoutput()
406 replycaps = self.op.reply.capabilities
406 replycaps = self.op.reply.capabilities
407 exc._replycaps = replycaps
407 exc._replycaps = replycaps
408 exc._bundle2salvagedoutput = salvaged
408 exc._bundle2salvagedoutput = salvaged
409
409
410 # Re-raising from a variable loses the original stack. So only use
410 # Re-raising from a variable loses the original stack. So only use
411 # that form if we need to.
411 # that form if we need to.
412 if seekerror:
412 if seekerror:
413 raise exc
413 raise exc
414
414
415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 self.count)
416 self.count)
417
417
418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 """This function process a bundle, apply effect to/from a repo
419 """This function process a bundle, apply effect to/from a repo
420
420
421 It iterates over each part then searches for and uses the proper handling
421 It iterates over each part then searches for and uses the proper handling
422 code to process the part. Parts are processed in order.
422 code to process the part. Parts are processed in order.
423
423
424 Unknown Mandatory part will abort the process.
424 Unknown Mandatory part will abort the process.
425
425
426 It is temporarily possible to provide a prebuilt bundleoperation to the
426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 function. This is used to ensure output is properly propagated in case of
427 function. This is used to ensure output is properly propagated in case of
428 an error during the unbundling. This output capturing part will likely be
428 an error during the unbundling. This output capturing part will likely be
429 reworked and this ability will probably go away in the process.
429 reworked and this ability will probably go away in the process.
430 """
430 """
431 if op is None:
431 if op is None:
432 if transactiongetter is None:
432 if transactiongetter is None:
433 transactiongetter = _notransaction
433 transactiongetter = _notransaction
434 op = bundleoperation(repo, transactiongetter)
434 op = bundleoperation(repo, transactiongetter)
435 # todo:
435 # todo:
436 # - replace this is a init function soon.
436 # - replace this is a init function soon.
437 # - exception catching
437 # - exception catching
438 unbundler.params
438 unbundler.params
439 if repo.ui.debugflag:
439 if repo.ui.debugflag:
440 msg = ['bundle2-input-bundle:']
440 msg = ['bundle2-input-bundle:']
441 if unbundler.params:
441 if unbundler.params:
442 msg.append(' %i params' % len(unbundler.params))
442 msg.append(' %i params' % len(unbundler.params))
443 if op._gettransaction is None or op._gettransaction is _notransaction:
443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 msg.append(' no-transaction')
444 msg.append(' no-transaction')
445 else:
445 else:
446 msg.append(' with-transaction')
446 msg.append(' with-transaction')
447 msg.append('\n')
447 msg.append('\n')
448 repo.ui.debug(''.join(msg))
448 repo.ui.debug(''.join(msg))
449
449
450 with partiterator(repo, op, unbundler) as parts:
450 with partiterator(repo, op, unbundler) as parts:
451 for part in parts:
451 for part in parts:
452 _processpart(op, part)
452 _processpart(op, part)
453
453
454 return op
454 return op
455
455
456 def _processchangegroup(op, cg, tr, source, url, **kwargs):
456 def _processchangegroup(op, cg, tr, source, url, **kwargs):
457 ret = cg.apply(op.repo, tr, source, url, **kwargs)
457 ret = cg.apply(op.repo, tr, source, url, **kwargs)
458 op.records.add('changegroup', {
458 op.records.add('changegroup', {
459 'return': ret,
459 'return': ret,
460 })
460 })
461 return ret
461 return ret
462
462
463 def _gethandler(op, part):
463 def _gethandler(op, part):
464 status = 'unknown' # used by debug output
464 status = 'unknown' # used by debug output
465 try:
465 try:
466 handler = parthandlermapping.get(part.type)
466 handler = parthandlermapping.get(part.type)
467 if handler is None:
467 if handler is None:
468 status = 'unsupported-type'
468 status = 'unsupported-type'
469 raise error.BundleUnknownFeatureError(parttype=part.type)
469 raise error.BundleUnknownFeatureError(parttype=part.type)
470 indebug(op.ui, 'found a handler for part %r' % part.type)
470 indebug(op.ui, 'found a handler for part %r' % part.type)
471 unknownparams = part.mandatorykeys - handler.params
471 unknownparams = part.mandatorykeys - handler.params
472 if unknownparams:
472 if unknownparams:
473 unknownparams = list(unknownparams)
473 unknownparams = list(unknownparams)
474 unknownparams.sort()
474 unknownparams.sort()
475 status = 'unsupported-params (%s)' % unknownparams
475 status = 'unsupported-params (%s)' % unknownparams
476 raise error.BundleUnknownFeatureError(parttype=part.type,
476 raise error.BundleUnknownFeatureError(parttype=part.type,
477 params=unknownparams)
477 params=unknownparams)
478 status = 'supported'
478 status = 'supported'
479 except error.BundleUnknownFeatureError as exc:
479 except error.BundleUnknownFeatureError as exc:
480 if part.mandatory: # mandatory parts
480 if part.mandatory: # mandatory parts
481 raise
481 raise
482 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
482 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
483 return # skip to part processing
483 return # skip to part processing
484 finally:
484 finally:
485 if op.ui.debugflag:
485 if op.ui.debugflag:
486 msg = ['bundle2-input-part: "%s"' % part.type]
486 msg = ['bundle2-input-part: "%s"' % part.type]
487 if not part.mandatory:
487 if not part.mandatory:
488 msg.append(' (advisory)')
488 msg.append(' (advisory)')
489 nbmp = len(part.mandatorykeys)
489 nbmp = len(part.mandatorykeys)
490 nbap = len(part.params) - nbmp
490 nbap = len(part.params) - nbmp
491 if nbmp or nbap:
491 if nbmp or nbap:
492 msg.append(' (params:')
492 msg.append(' (params:')
493 if nbmp:
493 if nbmp:
494 msg.append(' %i mandatory' % nbmp)
494 msg.append(' %i mandatory' % nbmp)
495 if nbap:
495 if nbap:
496 msg.append(' %i advisory' % nbmp)
496 msg.append(' %i advisory' % nbmp)
497 msg.append(')')
497 msg.append(')')
498 msg.append(' %s\n' % status)
498 msg.append(' %s\n' % status)
499 op.ui.debug(''.join(msg))
499 op.ui.debug(''.join(msg))
500
500
501 return handler
501 return handler
502
502
503 def _processpart(op, part):
503 def _processpart(op, part):
504 """process a single part from a bundle
504 """process a single part from a bundle
505
505
506 The part is guaranteed to have been fully consumed when the function exits
506 The part is guaranteed to have been fully consumed when the function exits
507 (even if an exception is raised)."""
507 (even if an exception is raised)."""
508 try:
508 handler = _gethandler(op, part)
509 handler = _gethandler(op, part)
509 if handler is None:
510 if handler is None:
510 return
511 return
512
511
513 # handler is called outside the above try block so that we don't
512 # handler is called outside the above try block so that we don't
514 # risk catching KeyErrors from anything other than the
513 # risk catching KeyErrors from anything other than the
515 # parthandlermapping lookup (any KeyError raised by handler()
514 # parthandlermapping lookup (any KeyError raised by handler()
516 # itself represents a defect of a different variety).
515 # itself represents a defect of a different variety).
517 output = None
516 output = None
518 if op.captureoutput and op.reply is not None:
517 if op.captureoutput and op.reply is not None:
519 op.ui.pushbuffer(error=True, subproc=True)
518 op.ui.pushbuffer(error=True, subproc=True)
520 output = ''
519 output = ''
521 try:
520 try:
522 handler(op, part)
521 handler(op, part)
523 finally:
524 if output is not None:
525 output = op.ui.popbuffer()
526 if output:
527 outpart = op.reply.newpart('output', data=output,
528 mandatory=False)
529 outpart.addparam(
530 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
531 finally:
522 finally:
532 pass
523 if output is not None:
533
524 output = op.ui.popbuffer()
525 if output:
526 outpart = op.reply.newpart('output', data=output,
527 mandatory=False)
528 outpart.addparam(
529 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
534
530
535 def decodecaps(blob):
531 def decodecaps(blob):
536 """decode a bundle2 caps bytes blob into a dictionary
532 """decode a bundle2 caps bytes blob into a dictionary
537
533
538 The blob is a list of capabilities (one per line)
534 The blob is a list of capabilities (one per line)
539 Capabilities may have values using a line of the form::
535 Capabilities may have values using a line of the form::
540
536
541 capability=value1,value2,value3
537 capability=value1,value2,value3
542
538
543 The values are always a list."""
539 The values are always a list."""
544 caps = {}
540 caps = {}
545 for line in blob.splitlines():
541 for line in blob.splitlines():
546 if not line:
542 if not line:
547 continue
543 continue
548 if '=' not in line:
544 if '=' not in line:
549 key, vals = line, ()
545 key, vals = line, ()
550 else:
546 else:
551 key, vals = line.split('=', 1)
547 key, vals = line.split('=', 1)
552 vals = vals.split(',')
548 vals = vals.split(',')
553 key = urlreq.unquote(key)
549 key = urlreq.unquote(key)
554 vals = [urlreq.unquote(v) for v in vals]
550 vals = [urlreq.unquote(v) for v in vals]
555 caps[key] = vals
551 caps[key] = vals
556 return caps
552 return caps
557
553
558 def encodecaps(caps):
554 def encodecaps(caps):
559 """encode a bundle2 caps dictionary into a bytes blob"""
555 """encode a bundle2 caps dictionary into a bytes blob"""
560 chunks = []
556 chunks = []
561 for ca in sorted(caps):
557 for ca in sorted(caps):
562 vals = caps[ca]
558 vals = caps[ca]
563 ca = urlreq.quote(ca)
559 ca = urlreq.quote(ca)
564 vals = [urlreq.quote(v) for v in vals]
560 vals = [urlreq.quote(v) for v in vals]
565 if vals:
561 if vals:
566 ca = "%s=%s" % (ca, ','.join(vals))
562 ca = "%s=%s" % (ca, ','.join(vals))
567 chunks.append(ca)
563 chunks.append(ca)
568 return '\n'.join(chunks)
564 return '\n'.join(chunks)
569
565
570 bundletypes = {
566 bundletypes = {
571 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
567 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
572 # since the unification ssh accepts a header but there
568 # since the unification ssh accepts a header but there
573 # is no capability signaling it.
569 # is no capability signaling it.
574 "HG20": (), # special-cased below
570 "HG20": (), # special-cased below
575 "HG10UN": ("HG10UN", 'UN'),
571 "HG10UN": ("HG10UN", 'UN'),
576 "HG10BZ": ("HG10", 'BZ'),
572 "HG10BZ": ("HG10", 'BZ'),
577 "HG10GZ": ("HG10GZ", 'GZ'),
573 "HG10GZ": ("HG10GZ", 'GZ'),
578 }
574 }
579
575
580 # hgweb uses this list to communicate its preferred type
576 # hgweb uses this list to communicate its preferred type
581 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
577 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
582
578
583 class bundle20(object):
579 class bundle20(object):
584 """represent an outgoing bundle2 container
580 """represent an outgoing bundle2 container
585
581
586 Use the `addparam` method to add stream level parameter. and `newpart` to
582 Use the `addparam` method to add stream level parameter. and `newpart` to
587 populate it. Then call `getchunks` to retrieve all the binary chunks of
583 populate it. Then call `getchunks` to retrieve all the binary chunks of
588 data that compose the bundle2 container."""
584 data that compose the bundle2 container."""
589
585
590 _magicstring = 'HG20'
586 _magicstring = 'HG20'
591
587
592 def __init__(self, ui, capabilities=()):
588 def __init__(self, ui, capabilities=()):
593 self.ui = ui
589 self.ui = ui
594 self._params = []
590 self._params = []
595 self._parts = []
591 self._parts = []
596 self.capabilities = dict(capabilities)
592 self.capabilities = dict(capabilities)
597 self._compengine = util.compengines.forbundletype('UN')
593 self._compengine = util.compengines.forbundletype('UN')
598 self._compopts = None
594 self._compopts = None
599
595
600 def setcompression(self, alg, compopts=None):
596 def setcompression(self, alg, compopts=None):
601 """setup core part compression to <alg>"""
597 """setup core part compression to <alg>"""
602 if alg in (None, 'UN'):
598 if alg in (None, 'UN'):
603 return
599 return
604 assert not any(n.lower() == 'compression' for n, v in self._params)
600 assert not any(n.lower() == 'compression' for n, v in self._params)
605 self.addparam('Compression', alg)
601 self.addparam('Compression', alg)
606 self._compengine = util.compengines.forbundletype(alg)
602 self._compengine = util.compengines.forbundletype(alg)
607 self._compopts = compopts
603 self._compopts = compopts
608
604
609 @property
605 @property
610 def nbparts(self):
606 def nbparts(self):
611 """total number of parts added to the bundler"""
607 """total number of parts added to the bundler"""
612 return len(self._parts)
608 return len(self._parts)
613
609
614 # methods used to defines the bundle2 content
610 # methods used to defines the bundle2 content
615 def addparam(self, name, value=None):
611 def addparam(self, name, value=None):
616 """add a stream level parameter"""
612 """add a stream level parameter"""
617 if not name:
613 if not name:
618 raise ValueError('empty parameter name')
614 raise ValueError('empty parameter name')
619 if name[0] not in pycompat.bytestr(string.ascii_letters):
615 if name[0] not in pycompat.bytestr(string.ascii_letters):
620 raise ValueError('non letter first character: %r' % name)
616 raise ValueError('non letter first character: %r' % name)
621 self._params.append((name, value))
617 self._params.append((name, value))
622
618
623 def addpart(self, part):
619 def addpart(self, part):
624 """add a new part to the bundle2 container
620 """add a new part to the bundle2 container
625
621
626 Parts contains the actual applicative payload."""
622 Parts contains the actual applicative payload."""
627 assert part.id is None
623 assert part.id is None
628 part.id = len(self._parts) # very cheap counter
624 part.id = len(self._parts) # very cheap counter
629 self._parts.append(part)
625 self._parts.append(part)
630
626
631 def newpart(self, typeid, *args, **kwargs):
627 def newpart(self, typeid, *args, **kwargs):
632 """create a new part and add it to the containers
628 """create a new part and add it to the containers
633
629
634 As the part is directly added to the containers. For now, this means
630 As the part is directly added to the containers. For now, this means
635 that any failure to properly initialize the part after calling
631 that any failure to properly initialize the part after calling
636 ``newpart`` should result in a failure of the whole bundling process.
632 ``newpart`` should result in a failure of the whole bundling process.
637
633
638 You can still fall back to manually create and add if you need better
634 You can still fall back to manually create and add if you need better
639 control."""
635 control."""
640 part = bundlepart(typeid, *args, **kwargs)
636 part = bundlepart(typeid, *args, **kwargs)
641 self.addpart(part)
637 self.addpart(part)
642 return part
638 return part
643
639
644 # methods used to generate the bundle2 stream
640 # methods used to generate the bundle2 stream
645 def getchunks(self):
641 def getchunks(self):
646 if self.ui.debugflag:
642 if self.ui.debugflag:
647 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
643 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
648 if self._params:
644 if self._params:
649 msg.append(' (%i params)' % len(self._params))
645 msg.append(' (%i params)' % len(self._params))
650 msg.append(' %i parts total\n' % len(self._parts))
646 msg.append(' %i parts total\n' % len(self._parts))
651 self.ui.debug(''.join(msg))
647 self.ui.debug(''.join(msg))
652 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
648 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
653 yield self._magicstring
649 yield self._magicstring
654 param = self._paramchunk()
650 param = self._paramchunk()
655 outdebug(self.ui, 'bundle parameter: %s' % param)
651 outdebug(self.ui, 'bundle parameter: %s' % param)
656 yield _pack(_fstreamparamsize, len(param))
652 yield _pack(_fstreamparamsize, len(param))
657 if param:
653 if param:
658 yield param
654 yield param
659 for chunk in self._compengine.compressstream(self._getcorechunk(),
655 for chunk in self._compengine.compressstream(self._getcorechunk(),
660 self._compopts):
656 self._compopts):
661 yield chunk
657 yield chunk
662
658
663 def _paramchunk(self):
659 def _paramchunk(self):
664 """return a encoded version of all stream parameters"""
660 """return a encoded version of all stream parameters"""
665 blocks = []
661 blocks = []
666 for par, value in self._params:
662 for par, value in self._params:
667 par = urlreq.quote(par)
663 par = urlreq.quote(par)
668 if value is not None:
664 if value is not None:
669 value = urlreq.quote(value)
665 value = urlreq.quote(value)
670 par = '%s=%s' % (par, value)
666 par = '%s=%s' % (par, value)
671 blocks.append(par)
667 blocks.append(par)
672 return ' '.join(blocks)
668 return ' '.join(blocks)
673
669
674 def _getcorechunk(self):
670 def _getcorechunk(self):
675 """yield chunk for the core part of the bundle
671 """yield chunk for the core part of the bundle
676
672
677 (all but headers and parameters)"""
673 (all but headers and parameters)"""
678 outdebug(self.ui, 'start of parts')
674 outdebug(self.ui, 'start of parts')
679 for part in self._parts:
675 for part in self._parts:
680 outdebug(self.ui, 'bundle part: "%s"' % part.type)
676 outdebug(self.ui, 'bundle part: "%s"' % part.type)
681 for chunk in part.getchunks(ui=self.ui):
677 for chunk in part.getchunks(ui=self.ui):
682 yield chunk
678 yield chunk
683 outdebug(self.ui, 'end of bundle')
679 outdebug(self.ui, 'end of bundle')
684 yield _pack(_fpartheadersize, 0)
680 yield _pack(_fpartheadersize, 0)
685
681
686
682
687 def salvageoutput(self):
683 def salvageoutput(self):
688 """return a list with a copy of all output parts in the bundle
684 """return a list with a copy of all output parts in the bundle
689
685
690 This is meant to be used during error handling to make sure we preserve
686 This is meant to be used during error handling to make sure we preserve
691 server output"""
687 server output"""
692 salvaged = []
688 salvaged = []
693 for part in self._parts:
689 for part in self._parts:
694 if part.type.startswith('output'):
690 if part.type.startswith('output'):
695 salvaged.append(part.copy())
691 salvaged.append(part.copy())
696 return salvaged
692 return salvaged
697
693
698
694
699 class unpackermixin(object):
695 class unpackermixin(object):
700 """A mixin to extract bytes and struct data from a stream"""
696 """A mixin to extract bytes and struct data from a stream"""
701
697
702 def __init__(self, fp):
698 def __init__(self, fp):
703 self._fp = fp
699 self._fp = fp
704
700
705 def _unpack(self, format):
701 def _unpack(self, format):
706 """unpack this struct format from the stream
702 """unpack this struct format from the stream
707
703
708 This method is meant for internal usage by the bundle2 protocol only.
704 This method is meant for internal usage by the bundle2 protocol only.
709 They directly manipulate the low level stream including bundle2 level
705 They directly manipulate the low level stream including bundle2 level
710 instruction.
706 instruction.
711
707
712 Do not use it to implement higher-level logic or methods."""
708 Do not use it to implement higher-level logic or methods."""
713 data = self._readexact(struct.calcsize(format))
709 data = self._readexact(struct.calcsize(format))
714 return _unpack(format, data)
710 return _unpack(format, data)
715
711
716 def _readexact(self, size):
712 def _readexact(self, size):
717 """read exactly <size> bytes from the stream
713 """read exactly <size> bytes from the stream
718
714
719 This method is meant for internal usage by the bundle2 protocol only.
715 This method is meant for internal usage by the bundle2 protocol only.
720 They directly manipulate the low level stream including bundle2 level
716 They directly manipulate the low level stream including bundle2 level
721 instruction.
717 instruction.
722
718
723 Do not use it to implement higher-level logic or methods."""
719 Do not use it to implement higher-level logic or methods."""
724 return changegroup.readexactly(self._fp, size)
720 return changegroup.readexactly(self._fp, size)
725
721
726 def getunbundler(ui, fp, magicstring=None):
722 def getunbundler(ui, fp, magicstring=None):
727 """return a valid unbundler object for a given magicstring"""
723 """return a valid unbundler object for a given magicstring"""
728 if magicstring is None:
724 if magicstring is None:
729 magicstring = changegroup.readexactly(fp, 4)
725 magicstring = changegroup.readexactly(fp, 4)
730 magic, version = magicstring[0:2], magicstring[2:4]
726 magic, version = magicstring[0:2], magicstring[2:4]
731 if magic != 'HG':
727 if magic != 'HG':
732 ui.debug(
728 ui.debug(
733 "error: invalid magic: %r (version %r), should be 'HG'\n"
729 "error: invalid magic: %r (version %r), should be 'HG'\n"
734 % (magic, version))
730 % (magic, version))
735 raise error.Abort(_('not a Mercurial bundle'))
731 raise error.Abort(_('not a Mercurial bundle'))
736 unbundlerclass = formatmap.get(version)
732 unbundlerclass = formatmap.get(version)
737 if unbundlerclass is None:
733 if unbundlerclass is None:
738 raise error.Abort(_('unknown bundle version %s') % version)
734 raise error.Abort(_('unknown bundle version %s') % version)
739 unbundler = unbundlerclass(ui, fp)
735 unbundler = unbundlerclass(ui, fp)
740 indebug(ui, 'start processing of %s stream' % magicstring)
736 indebug(ui, 'start processing of %s stream' % magicstring)
741 return unbundler
737 return unbundler
742
738
743 class unbundle20(unpackermixin):
739 class unbundle20(unpackermixin):
744 """interpret a bundle2 stream
740 """interpret a bundle2 stream
745
741
746 This class is fed with a binary stream and yields parts through its
742 This class is fed with a binary stream and yields parts through its
747 `iterparts` methods."""
743 `iterparts` methods."""
748
744
749 _magicstring = 'HG20'
745 _magicstring = 'HG20'
750
746
751 def __init__(self, ui, fp):
747 def __init__(self, ui, fp):
752 """If header is specified, we do not read it out of the stream."""
748 """If header is specified, we do not read it out of the stream."""
753 self.ui = ui
749 self.ui = ui
754 self._compengine = util.compengines.forbundletype('UN')
750 self._compengine = util.compengines.forbundletype('UN')
755 self._compressed = None
751 self._compressed = None
756 super(unbundle20, self).__init__(fp)
752 super(unbundle20, self).__init__(fp)
757
753
758 @util.propertycache
754 @util.propertycache
759 def params(self):
755 def params(self):
760 """dictionary of stream level parameters"""
756 """dictionary of stream level parameters"""
761 indebug(self.ui, 'reading bundle2 stream parameters')
757 indebug(self.ui, 'reading bundle2 stream parameters')
762 params = {}
758 params = {}
763 paramssize = self._unpack(_fstreamparamsize)[0]
759 paramssize = self._unpack(_fstreamparamsize)[0]
764 if paramssize < 0:
760 if paramssize < 0:
765 raise error.BundleValueError('negative bundle param size: %i'
761 raise error.BundleValueError('negative bundle param size: %i'
766 % paramssize)
762 % paramssize)
767 if paramssize:
763 if paramssize:
768 params = self._readexact(paramssize)
764 params = self._readexact(paramssize)
769 params = self._processallparams(params)
765 params = self._processallparams(params)
770 return params
766 return params
771
767
772 def _processallparams(self, paramsblock):
768 def _processallparams(self, paramsblock):
773 """"""
769 """"""
774 params = util.sortdict()
770 params = util.sortdict()
775 for p in paramsblock.split(' '):
771 for p in paramsblock.split(' '):
776 p = p.split('=', 1)
772 p = p.split('=', 1)
777 p = [urlreq.unquote(i) for i in p]
773 p = [urlreq.unquote(i) for i in p]
778 if len(p) < 2:
774 if len(p) < 2:
779 p.append(None)
775 p.append(None)
780 self._processparam(*p)
776 self._processparam(*p)
781 params[p[0]] = p[1]
777 params[p[0]] = p[1]
782 return params
778 return params
783
779
784
780
785 def _processparam(self, name, value):
781 def _processparam(self, name, value):
786 """process a parameter, applying its effect if needed
782 """process a parameter, applying its effect if needed
787
783
788 Parameter starting with a lower case letter are advisory and will be
784 Parameter starting with a lower case letter are advisory and will be
789 ignored when unknown. Those starting with an upper case letter are
785 ignored when unknown. Those starting with an upper case letter are
790 mandatory and will this function will raise a KeyError when unknown.
786 mandatory and will this function will raise a KeyError when unknown.
791
787
792 Note: no option are currently supported. Any input will be either
788 Note: no option are currently supported. Any input will be either
793 ignored or failing.
789 ignored or failing.
794 """
790 """
795 if not name:
791 if not name:
796 raise ValueError('empty parameter name')
792 raise ValueError('empty parameter name')
797 if name[0] not in pycompat.bytestr(string.ascii_letters):
793 if name[0] not in pycompat.bytestr(string.ascii_letters):
798 raise ValueError('non letter first character: %r' % name)
794 raise ValueError('non letter first character: %r' % name)
799 try:
795 try:
800 handler = b2streamparamsmap[name.lower()]
796 handler = b2streamparamsmap[name.lower()]
801 except KeyError:
797 except KeyError:
802 if name[0].islower():
798 if name[0].islower():
803 indebug(self.ui, "ignoring unknown parameter %r" % name)
799 indebug(self.ui, "ignoring unknown parameter %r" % name)
804 else:
800 else:
805 raise error.BundleUnknownFeatureError(params=(name,))
801 raise error.BundleUnknownFeatureError(params=(name,))
806 else:
802 else:
807 handler(self, name, value)
803 handler(self, name, value)
808
804
809 def _forwardchunks(self):
805 def _forwardchunks(self):
810 """utility to transfer a bundle2 as binary
806 """utility to transfer a bundle2 as binary
811
807
812 This is made necessary by the fact the 'getbundle' command over 'ssh'
808 This is made necessary by the fact the 'getbundle' command over 'ssh'
813 have no way to know then the reply end, relying on the bundle to be
809 have no way to know then the reply end, relying on the bundle to be
814 interpreted to know its end. This is terrible and we are sorry, but we
810 interpreted to know its end. This is terrible and we are sorry, but we
815 needed to move forward to get general delta enabled.
811 needed to move forward to get general delta enabled.
816 """
812 """
817 yield self._magicstring
813 yield self._magicstring
818 assert 'params' not in vars(self)
814 assert 'params' not in vars(self)
819 paramssize = self._unpack(_fstreamparamsize)[0]
815 paramssize = self._unpack(_fstreamparamsize)[0]
820 if paramssize < 0:
816 if paramssize < 0:
821 raise error.BundleValueError('negative bundle param size: %i'
817 raise error.BundleValueError('negative bundle param size: %i'
822 % paramssize)
818 % paramssize)
823 yield _pack(_fstreamparamsize, paramssize)
819 yield _pack(_fstreamparamsize, paramssize)
824 if paramssize:
820 if paramssize:
825 params = self._readexact(paramssize)
821 params = self._readexact(paramssize)
826 self._processallparams(params)
822 self._processallparams(params)
827 yield params
823 yield params
828 assert self._compengine.bundletype == 'UN'
824 assert self._compengine.bundletype == 'UN'
829 # From there, payload might need to be decompressed
825 # From there, payload might need to be decompressed
830 self._fp = self._compengine.decompressorreader(self._fp)
826 self._fp = self._compengine.decompressorreader(self._fp)
831 emptycount = 0
827 emptycount = 0
832 while emptycount < 2:
828 while emptycount < 2:
833 # so we can brainlessly loop
829 # so we can brainlessly loop
834 assert _fpartheadersize == _fpayloadsize
830 assert _fpartheadersize == _fpayloadsize
835 size = self._unpack(_fpartheadersize)[0]
831 size = self._unpack(_fpartheadersize)[0]
836 yield _pack(_fpartheadersize, size)
832 yield _pack(_fpartheadersize, size)
837 if size:
833 if size:
838 emptycount = 0
834 emptycount = 0
839 else:
835 else:
840 emptycount += 1
836 emptycount += 1
841 continue
837 continue
842 if size == flaginterrupt:
838 if size == flaginterrupt:
843 continue
839 continue
844 elif size < 0:
840 elif size < 0:
845 raise error.BundleValueError('negative chunk size: %i')
841 raise error.BundleValueError('negative chunk size: %i')
846 yield self._readexact(size)
842 yield self._readexact(size)
847
843
848
844
849 def iterparts(self):
845 def iterparts(self):
850 """yield all parts contained in the stream"""
846 """yield all parts contained in the stream"""
851 # make sure param have been loaded
847 # make sure param have been loaded
852 self.params
848 self.params
853 # From there, payload need to be decompressed
849 # From there, payload need to be decompressed
854 self._fp = self._compengine.decompressorreader(self._fp)
850 self._fp = self._compengine.decompressorreader(self._fp)
855 indebug(self.ui, 'start extraction of bundle2 parts')
851 indebug(self.ui, 'start extraction of bundle2 parts')
856 headerblock = self._readpartheader()
852 headerblock = self._readpartheader()
857 while headerblock is not None:
853 while headerblock is not None:
858 part = unbundlepart(self.ui, headerblock, self._fp)
854 part = unbundlepart(self.ui, headerblock, self._fp)
859 yield part
855 yield part
860 # Seek to the end of the part to force it's consumption so the next
856 # Seek to the end of the part to force it's consumption so the next
861 # part can be read. But then seek back to the beginning so the
857 # part can be read. But then seek back to the beginning so the
862 # code consuming this generator has a part that starts at 0.
858 # code consuming this generator has a part that starts at 0.
863 part.seek(0, 2)
859 part.seek(0, 2)
864 part.seek(0)
860 part.seek(0)
865 headerblock = self._readpartheader()
861 headerblock = self._readpartheader()
866 indebug(self.ui, 'end of bundle2 stream')
862 indebug(self.ui, 'end of bundle2 stream')
867
863
868 def _readpartheader(self):
864 def _readpartheader(self):
869 """reads a part header size and return the bytes blob
865 """reads a part header size and return the bytes blob
870
866
871 returns None if empty"""
867 returns None if empty"""
872 headersize = self._unpack(_fpartheadersize)[0]
868 headersize = self._unpack(_fpartheadersize)[0]
873 if headersize < 0:
869 if headersize < 0:
874 raise error.BundleValueError('negative part header size: %i'
870 raise error.BundleValueError('negative part header size: %i'
875 % headersize)
871 % headersize)
876 indebug(self.ui, 'part header size: %i' % headersize)
872 indebug(self.ui, 'part header size: %i' % headersize)
877 if headersize:
873 if headersize:
878 return self._readexact(headersize)
874 return self._readexact(headersize)
879 return None
875 return None
880
876
881 def compressed(self):
877 def compressed(self):
882 self.params # load params
878 self.params # load params
883 return self._compressed
879 return self._compressed
884
880
885 def close(self):
881 def close(self):
886 """close underlying file"""
882 """close underlying file"""
887 if util.safehasattr(self._fp, 'close'):
883 if util.safehasattr(self._fp, 'close'):
888 return self._fp.close()
884 return self._fp.close()
889
885
890 formatmap = {'20': unbundle20}
886 formatmap = {'20': unbundle20}
891
887
892 b2streamparamsmap = {}
888 b2streamparamsmap = {}
893
889
894 def b2streamparamhandler(name):
890 def b2streamparamhandler(name):
895 """register a handler for a stream level parameter"""
891 """register a handler for a stream level parameter"""
896 def decorator(func):
892 def decorator(func):
897 assert name not in formatmap
893 assert name not in formatmap
898 b2streamparamsmap[name] = func
894 b2streamparamsmap[name] = func
899 return func
895 return func
900 return decorator
896 return decorator
901
897
902 @b2streamparamhandler('compression')
898 @b2streamparamhandler('compression')
903 def processcompression(unbundler, param, value):
899 def processcompression(unbundler, param, value):
904 """read compression parameter and install payload decompression"""
900 """read compression parameter and install payload decompression"""
905 if value not in util.compengines.supportedbundletypes:
901 if value not in util.compengines.supportedbundletypes:
906 raise error.BundleUnknownFeatureError(params=(param,),
902 raise error.BundleUnknownFeatureError(params=(param,),
907 values=(value,))
903 values=(value,))
908 unbundler._compengine = util.compengines.forbundletype(value)
904 unbundler._compengine = util.compengines.forbundletype(value)
909 if value is not None:
905 if value is not None:
910 unbundler._compressed = True
906 unbundler._compressed = True
911
907
912 class bundlepart(object):
908 class bundlepart(object):
913 """A bundle2 part contains application level payload
909 """A bundle2 part contains application level payload
914
910
915 The part `type` is used to route the part to the application level
911 The part `type` is used to route the part to the application level
916 handler.
912 handler.
917
913
918 The part payload is contained in ``part.data``. It could be raw bytes or a
914 The part payload is contained in ``part.data``. It could be raw bytes or a
919 generator of byte chunks.
915 generator of byte chunks.
920
916
921 You can add parameters to the part using the ``addparam`` method.
917 You can add parameters to the part using the ``addparam`` method.
922 Parameters can be either mandatory (default) or advisory. Remote side
918 Parameters can be either mandatory (default) or advisory. Remote side
923 should be able to safely ignore the advisory ones.
919 should be able to safely ignore the advisory ones.
924
920
925 Both data and parameters cannot be modified after the generation has begun.
921 Both data and parameters cannot be modified after the generation has begun.
926 """
922 """
927
923
928 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
924 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
929 data='', mandatory=True):
925 data='', mandatory=True):
930 validateparttype(parttype)
926 validateparttype(parttype)
931 self.id = None
927 self.id = None
932 self.type = parttype
928 self.type = parttype
933 self._data = data
929 self._data = data
934 self._mandatoryparams = list(mandatoryparams)
930 self._mandatoryparams = list(mandatoryparams)
935 self._advisoryparams = list(advisoryparams)
931 self._advisoryparams = list(advisoryparams)
936 # checking for duplicated entries
932 # checking for duplicated entries
937 self._seenparams = set()
933 self._seenparams = set()
938 for pname, __ in self._mandatoryparams + self._advisoryparams:
934 for pname, __ in self._mandatoryparams + self._advisoryparams:
939 if pname in self._seenparams:
935 if pname in self._seenparams:
940 raise error.ProgrammingError('duplicated params: %s' % pname)
936 raise error.ProgrammingError('duplicated params: %s' % pname)
941 self._seenparams.add(pname)
937 self._seenparams.add(pname)
942 # status of the part's generation:
938 # status of the part's generation:
943 # - None: not started,
939 # - None: not started,
944 # - False: currently generated,
940 # - False: currently generated,
945 # - True: generation done.
941 # - True: generation done.
946 self._generated = None
942 self._generated = None
947 self.mandatory = mandatory
943 self.mandatory = mandatory
948
944
949 def __repr__(self):
945 def __repr__(self):
950 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
946 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
951 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
947 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
952 % (cls, id(self), self.id, self.type, self.mandatory))
948 % (cls, id(self), self.id, self.type, self.mandatory))
953
949
954 def copy(self):
950 def copy(self):
955 """return a copy of the part
951 """return a copy of the part
956
952
957 The new part have the very same content but no partid assigned yet.
953 The new part have the very same content but no partid assigned yet.
958 Parts with generated data cannot be copied."""
954 Parts with generated data cannot be copied."""
959 assert not util.safehasattr(self.data, 'next')
955 assert not util.safehasattr(self.data, 'next')
960 return self.__class__(self.type, self._mandatoryparams,
956 return self.__class__(self.type, self._mandatoryparams,
961 self._advisoryparams, self._data, self.mandatory)
957 self._advisoryparams, self._data, self.mandatory)
962
958
963 # methods used to defines the part content
959 # methods used to defines the part content
964 @property
960 @property
965 def data(self):
961 def data(self):
966 return self._data
962 return self._data
967
963
968 @data.setter
964 @data.setter
969 def data(self, data):
965 def data(self, data):
970 if self._generated is not None:
966 if self._generated is not None:
971 raise error.ReadOnlyPartError('part is being generated')
967 raise error.ReadOnlyPartError('part is being generated')
972 self._data = data
968 self._data = data
973
969
974 @property
970 @property
975 def mandatoryparams(self):
971 def mandatoryparams(self):
976 # make it an immutable tuple to force people through ``addparam``
972 # make it an immutable tuple to force people through ``addparam``
977 return tuple(self._mandatoryparams)
973 return tuple(self._mandatoryparams)
978
974
979 @property
975 @property
980 def advisoryparams(self):
976 def advisoryparams(self):
981 # make it an immutable tuple to force people through ``addparam``
977 # make it an immutable tuple to force people through ``addparam``
982 return tuple(self._advisoryparams)
978 return tuple(self._advisoryparams)
983
979
984 def addparam(self, name, value='', mandatory=True):
980 def addparam(self, name, value='', mandatory=True):
985 """add a parameter to the part
981 """add a parameter to the part
986
982
987 If 'mandatory' is set to True, the remote handler must claim support
983 If 'mandatory' is set to True, the remote handler must claim support
988 for this parameter or the unbundling will be aborted.
984 for this parameter or the unbundling will be aborted.
989
985
990 The 'name' and 'value' cannot exceed 255 bytes each.
986 The 'name' and 'value' cannot exceed 255 bytes each.
991 """
987 """
992 if self._generated is not None:
988 if self._generated is not None:
993 raise error.ReadOnlyPartError('part is being generated')
989 raise error.ReadOnlyPartError('part is being generated')
994 if name in self._seenparams:
990 if name in self._seenparams:
995 raise ValueError('duplicated params: %s' % name)
991 raise ValueError('duplicated params: %s' % name)
996 self._seenparams.add(name)
992 self._seenparams.add(name)
997 params = self._advisoryparams
993 params = self._advisoryparams
998 if mandatory:
994 if mandatory:
999 params = self._mandatoryparams
995 params = self._mandatoryparams
1000 params.append((name, value))
996 params.append((name, value))
1001
997
1002 # methods used to generates the bundle2 stream
998 # methods used to generates the bundle2 stream
1003 def getchunks(self, ui):
999 def getchunks(self, ui):
1004 if self._generated is not None:
1000 if self._generated is not None:
1005 raise error.ProgrammingError('part can only be consumed once')
1001 raise error.ProgrammingError('part can only be consumed once')
1006 self._generated = False
1002 self._generated = False
1007
1003
1008 if ui.debugflag:
1004 if ui.debugflag:
1009 msg = ['bundle2-output-part: "%s"' % self.type]
1005 msg = ['bundle2-output-part: "%s"' % self.type]
1010 if not self.mandatory:
1006 if not self.mandatory:
1011 msg.append(' (advisory)')
1007 msg.append(' (advisory)')
1012 nbmp = len(self.mandatoryparams)
1008 nbmp = len(self.mandatoryparams)
1013 nbap = len(self.advisoryparams)
1009 nbap = len(self.advisoryparams)
1014 if nbmp or nbap:
1010 if nbmp or nbap:
1015 msg.append(' (params:')
1011 msg.append(' (params:')
1016 if nbmp:
1012 if nbmp:
1017 msg.append(' %i mandatory' % nbmp)
1013 msg.append(' %i mandatory' % nbmp)
1018 if nbap:
1014 if nbap:
1019 msg.append(' %i advisory' % nbmp)
1015 msg.append(' %i advisory' % nbmp)
1020 msg.append(')')
1016 msg.append(')')
1021 if not self.data:
1017 if not self.data:
1022 msg.append(' empty payload')
1018 msg.append(' empty payload')
1023 elif util.safehasattr(self.data, 'next'):
1019 elif util.safehasattr(self.data, 'next'):
1024 msg.append(' streamed payload')
1020 msg.append(' streamed payload')
1025 else:
1021 else:
1026 msg.append(' %i bytes payload' % len(self.data))
1022 msg.append(' %i bytes payload' % len(self.data))
1027 msg.append('\n')
1023 msg.append('\n')
1028 ui.debug(''.join(msg))
1024 ui.debug(''.join(msg))
1029
1025
1030 #### header
1026 #### header
1031 if self.mandatory:
1027 if self.mandatory:
1032 parttype = self.type.upper()
1028 parttype = self.type.upper()
1033 else:
1029 else:
1034 parttype = self.type.lower()
1030 parttype = self.type.lower()
1035 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1031 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1036 ## parttype
1032 ## parttype
1037 header = [_pack(_fparttypesize, len(parttype)),
1033 header = [_pack(_fparttypesize, len(parttype)),
1038 parttype, _pack(_fpartid, self.id),
1034 parttype, _pack(_fpartid, self.id),
1039 ]
1035 ]
1040 ## parameters
1036 ## parameters
1041 # count
1037 # count
1042 manpar = self.mandatoryparams
1038 manpar = self.mandatoryparams
1043 advpar = self.advisoryparams
1039 advpar = self.advisoryparams
1044 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1040 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1045 # size
1041 # size
1046 parsizes = []
1042 parsizes = []
1047 for key, value in manpar:
1043 for key, value in manpar:
1048 parsizes.append(len(key))
1044 parsizes.append(len(key))
1049 parsizes.append(len(value))
1045 parsizes.append(len(value))
1050 for key, value in advpar:
1046 for key, value in advpar:
1051 parsizes.append(len(key))
1047 parsizes.append(len(key))
1052 parsizes.append(len(value))
1048 parsizes.append(len(value))
1053 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1049 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1054 header.append(paramsizes)
1050 header.append(paramsizes)
1055 # key, value
1051 # key, value
1056 for key, value in manpar:
1052 for key, value in manpar:
1057 header.append(key)
1053 header.append(key)
1058 header.append(value)
1054 header.append(value)
1059 for key, value in advpar:
1055 for key, value in advpar:
1060 header.append(key)
1056 header.append(key)
1061 header.append(value)
1057 header.append(value)
1062 ## finalize header
1058 ## finalize header
1063 try:
1059 try:
1064 headerchunk = ''.join(header)
1060 headerchunk = ''.join(header)
1065 except TypeError:
1061 except TypeError:
1066 raise TypeError(r'Found a non-bytes trying to '
1062 raise TypeError(r'Found a non-bytes trying to '
1067 r'build bundle part header: %r' % header)
1063 r'build bundle part header: %r' % header)
1068 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1064 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1069 yield _pack(_fpartheadersize, len(headerchunk))
1065 yield _pack(_fpartheadersize, len(headerchunk))
1070 yield headerchunk
1066 yield headerchunk
1071 ## payload
1067 ## payload
1072 try:
1068 try:
1073 for chunk in self._payloadchunks():
1069 for chunk in self._payloadchunks():
1074 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1070 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1075 yield _pack(_fpayloadsize, len(chunk))
1071 yield _pack(_fpayloadsize, len(chunk))
1076 yield chunk
1072 yield chunk
1077 except GeneratorExit:
1073 except GeneratorExit:
1078 # GeneratorExit means that nobody is listening for our
1074 # GeneratorExit means that nobody is listening for our
1079 # results anyway, so just bail quickly rather than trying
1075 # results anyway, so just bail quickly rather than trying
1080 # to produce an error part.
1076 # to produce an error part.
1081 ui.debug('bundle2-generatorexit\n')
1077 ui.debug('bundle2-generatorexit\n')
1082 raise
1078 raise
1083 except BaseException as exc:
1079 except BaseException as exc:
1084 bexc = util.forcebytestr(exc)
1080 bexc = util.forcebytestr(exc)
1085 # backup exception data for later
1081 # backup exception data for later
1086 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1082 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1087 % bexc)
1083 % bexc)
1088 tb = sys.exc_info()[2]
1084 tb = sys.exc_info()[2]
1089 msg = 'unexpected error: %s' % bexc
1085 msg = 'unexpected error: %s' % bexc
1090 interpart = bundlepart('error:abort', [('message', msg)],
1086 interpart = bundlepart('error:abort', [('message', msg)],
1091 mandatory=False)
1087 mandatory=False)
1092 interpart.id = 0
1088 interpart.id = 0
1093 yield _pack(_fpayloadsize, -1)
1089 yield _pack(_fpayloadsize, -1)
1094 for chunk in interpart.getchunks(ui=ui):
1090 for chunk in interpart.getchunks(ui=ui):
1095 yield chunk
1091 yield chunk
1096 outdebug(ui, 'closing payload chunk')
1092 outdebug(ui, 'closing payload chunk')
1097 # abort current part payload
1093 # abort current part payload
1098 yield _pack(_fpayloadsize, 0)
1094 yield _pack(_fpayloadsize, 0)
1099 pycompat.raisewithtb(exc, tb)
1095 pycompat.raisewithtb(exc, tb)
1100 # end of payload
1096 # end of payload
1101 outdebug(ui, 'closing payload chunk')
1097 outdebug(ui, 'closing payload chunk')
1102 yield _pack(_fpayloadsize, 0)
1098 yield _pack(_fpayloadsize, 0)
1103 self._generated = True
1099 self._generated = True
1104
1100
1105 def _payloadchunks(self):
1101 def _payloadchunks(self):
1106 """yield chunks of a the part payload
1102 """yield chunks of a the part payload
1107
1103
1108 Exists to handle the different methods to provide data to a part."""
1104 Exists to handle the different methods to provide data to a part."""
1109 # we only support fixed size data now.
1105 # we only support fixed size data now.
1110 # This will be improved in the future.
1106 # This will be improved in the future.
1111 if (util.safehasattr(self.data, 'next')
1107 if (util.safehasattr(self.data, 'next')
1112 or util.safehasattr(self.data, '__next__')):
1108 or util.safehasattr(self.data, '__next__')):
1113 buff = util.chunkbuffer(self.data)
1109 buff = util.chunkbuffer(self.data)
1114 chunk = buff.read(preferedchunksize)
1110 chunk = buff.read(preferedchunksize)
1115 while chunk:
1111 while chunk:
1116 yield chunk
1112 yield chunk
1117 chunk = buff.read(preferedchunksize)
1113 chunk = buff.read(preferedchunksize)
1118 elif len(self.data):
1114 elif len(self.data):
1119 yield self.data
1115 yield self.data
1120
1116
1121
1117
1122 flaginterrupt = -1
1118 flaginterrupt = -1
1123
1119
1124 class interrupthandler(unpackermixin):
1120 class interrupthandler(unpackermixin):
1125 """read one part and process it with restricted capability
1121 """read one part and process it with restricted capability
1126
1122
1127 This allows to transmit exception raised on the producer size during part
1123 This allows to transmit exception raised on the producer size during part
1128 iteration while the consumer is reading a part.
1124 iteration while the consumer is reading a part.
1129
1125
1130 Part processed in this manner only have access to a ui object,"""
1126 Part processed in this manner only have access to a ui object,"""
1131
1127
1132 def __init__(self, ui, fp):
1128 def __init__(self, ui, fp):
1133 super(interrupthandler, self).__init__(fp)
1129 super(interrupthandler, self).__init__(fp)
1134 self.ui = ui
1130 self.ui = ui
1135
1131
1136 def _readpartheader(self):
1132 def _readpartheader(self):
1137 """reads a part header size and return the bytes blob
1133 """reads a part header size and return the bytes blob
1138
1134
1139 returns None if empty"""
1135 returns None if empty"""
1140 headersize = self._unpack(_fpartheadersize)[0]
1136 headersize = self._unpack(_fpartheadersize)[0]
1141 if headersize < 0:
1137 if headersize < 0:
1142 raise error.BundleValueError('negative part header size: %i'
1138 raise error.BundleValueError('negative part header size: %i'
1143 % headersize)
1139 % headersize)
1144 indebug(self.ui, 'part header size: %i\n' % headersize)
1140 indebug(self.ui, 'part header size: %i\n' % headersize)
1145 if headersize:
1141 if headersize:
1146 return self._readexact(headersize)
1142 return self._readexact(headersize)
1147 return None
1143 return None
1148
1144
1149 def __call__(self):
1145 def __call__(self):
1150
1146
1151 self.ui.debug('bundle2-input-stream-interrupt:'
1147 self.ui.debug('bundle2-input-stream-interrupt:'
1152 ' opening out of band context\n')
1148 ' opening out of band context\n')
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1149 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1154 headerblock = self._readpartheader()
1150 headerblock = self._readpartheader()
1155 if headerblock is None:
1151 if headerblock is None:
1156 indebug(self.ui, 'no part found during interruption.')
1152 indebug(self.ui, 'no part found during interruption.')
1157 return
1153 return
1158 part = unbundlepart(self.ui, headerblock, self._fp)
1154 part = unbundlepart(self.ui, headerblock, self._fp)
1159 op = interruptoperation(self.ui)
1155 op = interruptoperation(self.ui)
1160 hardabort = False
1156 hardabort = False
1161 try:
1157 try:
1162 _processpart(op, part)
1158 _processpart(op, part)
1163 except (SystemExit, KeyboardInterrupt):
1159 except (SystemExit, KeyboardInterrupt):
1164 hardabort = True
1160 hardabort = True
1165 raise
1161 raise
1166 finally:
1162 finally:
1167 if not hardabort:
1163 if not hardabort:
1168 part.seek(0, 2)
1164 part.seek(0, 2)
1169 self.ui.debug('bundle2-input-stream-interrupt:'
1165 self.ui.debug('bundle2-input-stream-interrupt:'
1170 ' closing out of band context\n')
1166 ' closing out of band context\n')
1171
1167
1172 class interruptoperation(object):
1168 class interruptoperation(object):
1173 """A limited operation to be use by part handler during interruption
1169 """A limited operation to be use by part handler during interruption
1174
1170
1175 It only have access to an ui object.
1171 It only have access to an ui object.
1176 """
1172 """
1177
1173
1178 def __init__(self, ui):
1174 def __init__(self, ui):
1179 self.ui = ui
1175 self.ui = ui
1180 self.reply = None
1176 self.reply = None
1181 self.captureoutput = False
1177 self.captureoutput = False
1182
1178
1183 @property
1179 @property
1184 def repo(self):
1180 def repo(self):
1185 raise error.ProgrammingError('no repo access from stream interruption')
1181 raise error.ProgrammingError('no repo access from stream interruption')
1186
1182
1187 def gettransaction(self):
1183 def gettransaction(self):
1188 raise TransactionUnavailable('no repo access from stream interruption')
1184 raise TransactionUnavailable('no repo access from stream interruption')
1189
1185
1190 class unbundlepart(unpackermixin):
1186 class unbundlepart(unpackermixin):
1191 """a bundle part read from a bundle"""
1187 """a bundle part read from a bundle"""
1192
1188
1193 def __init__(self, ui, header, fp):
1189 def __init__(self, ui, header, fp):
1194 super(unbundlepart, self).__init__(fp)
1190 super(unbundlepart, self).__init__(fp)
1195 self._seekable = (util.safehasattr(fp, 'seek') and
1191 self._seekable = (util.safehasattr(fp, 'seek') and
1196 util.safehasattr(fp, 'tell'))
1192 util.safehasattr(fp, 'tell'))
1197 self.ui = ui
1193 self.ui = ui
1198 # unbundle state attr
1194 # unbundle state attr
1199 self._headerdata = header
1195 self._headerdata = header
1200 self._headeroffset = 0
1196 self._headeroffset = 0
1201 self._initialized = False
1197 self._initialized = False
1202 self.consumed = False
1198 self.consumed = False
1203 # part data
1199 # part data
1204 self.id = None
1200 self.id = None
1205 self.type = None
1201 self.type = None
1206 self.mandatoryparams = None
1202 self.mandatoryparams = None
1207 self.advisoryparams = None
1203 self.advisoryparams = None
1208 self.params = None
1204 self.params = None
1209 self.mandatorykeys = ()
1205 self.mandatorykeys = ()
1210 self._payloadstream = None
1206 self._payloadstream = None
1211 self._readheader()
1207 self._readheader()
1212 self._mandatory = None
1208 self._mandatory = None
1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1209 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1214 self._pos = 0
1210 self._pos = 0
1215
1211
1216 def _fromheader(self, size):
1212 def _fromheader(self, size):
1217 """return the next <size> byte from the header"""
1213 """return the next <size> byte from the header"""
1218 offset = self._headeroffset
1214 offset = self._headeroffset
1219 data = self._headerdata[offset:(offset + size)]
1215 data = self._headerdata[offset:(offset + size)]
1220 self._headeroffset = offset + size
1216 self._headeroffset = offset + size
1221 return data
1217 return data
1222
1218
1223 def _unpackheader(self, format):
1219 def _unpackheader(self, format):
1224 """read given format from header
1220 """read given format from header
1225
1221
1226 This automatically compute the size of the format to read."""
1222 This automatically compute the size of the format to read."""
1227 data = self._fromheader(struct.calcsize(format))
1223 data = self._fromheader(struct.calcsize(format))
1228 return _unpack(format, data)
1224 return _unpack(format, data)
1229
1225
1230 def _initparams(self, mandatoryparams, advisoryparams):
1226 def _initparams(self, mandatoryparams, advisoryparams):
1231 """internal function to setup all logic related parameters"""
1227 """internal function to setup all logic related parameters"""
1232 # make it read only to prevent people touching it by mistake.
1228 # make it read only to prevent people touching it by mistake.
1233 self.mandatoryparams = tuple(mandatoryparams)
1229 self.mandatoryparams = tuple(mandatoryparams)
1234 self.advisoryparams = tuple(advisoryparams)
1230 self.advisoryparams = tuple(advisoryparams)
1235 # user friendly UI
1231 # user friendly UI
1236 self.params = util.sortdict(self.mandatoryparams)
1232 self.params = util.sortdict(self.mandatoryparams)
1237 self.params.update(self.advisoryparams)
1233 self.params.update(self.advisoryparams)
1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1234 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1239
1235
1240 def _payloadchunks(self, chunknum=0):
1236 def _payloadchunks(self, chunknum=0):
1241 '''seek to specified chunk and start yielding data'''
1237 '''seek to specified chunk and start yielding data'''
1242 if len(self._chunkindex) == 0:
1238 if len(self._chunkindex) == 0:
1243 assert chunknum == 0, 'Must start with chunk 0'
1239 assert chunknum == 0, 'Must start with chunk 0'
1244 self._chunkindex.append((0, self._tellfp()))
1240 self._chunkindex.append((0, self._tellfp()))
1245 else:
1241 else:
1246 assert chunknum < len(self._chunkindex), \
1242 assert chunknum < len(self._chunkindex), \
1247 'Unknown chunk %d' % chunknum
1243 'Unknown chunk %d' % chunknum
1248 self._seekfp(self._chunkindex[chunknum][1])
1244 self._seekfp(self._chunkindex[chunknum][1])
1249
1245
1250 pos = self._chunkindex[chunknum][0]
1246 pos = self._chunkindex[chunknum][0]
1251 payloadsize = self._unpack(_fpayloadsize)[0]
1247 payloadsize = self._unpack(_fpayloadsize)[0]
1252 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1248 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1253 while payloadsize:
1249 while payloadsize:
1254 if payloadsize == flaginterrupt:
1250 if payloadsize == flaginterrupt:
1255 # interruption detection, the handler will now read a
1251 # interruption detection, the handler will now read a
1256 # single part and process it.
1252 # single part and process it.
1257 interrupthandler(self.ui, self._fp)()
1253 interrupthandler(self.ui, self._fp)()
1258 elif payloadsize < 0:
1254 elif payloadsize < 0:
1259 msg = 'negative payload chunk size: %i' % payloadsize
1255 msg = 'negative payload chunk size: %i' % payloadsize
1260 raise error.BundleValueError(msg)
1256 raise error.BundleValueError(msg)
1261 else:
1257 else:
1262 result = self._readexact(payloadsize)
1258 result = self._readexact(payloadsize)
1263 chunknum += 1
1259 chunknum += 1
1264 pos += payloadsize
1260 pos += payloadsize
1265 if chunknum == len(self._chunkindex):
1261 if chunknum == len(self._chunkindex):
1266 self._chunkindex.append((pos, self._tellfp()))
1262 self._chunkindex.append((pos, self._tellfp()))
1267 yield result
1263 yield result
1268 payloadsize = self._unpack(_fpayloadsize)[0]
1264 payloadsize = self._unpack(_fpayloadsize)[0]
1269 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1265 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1270
1266
1271 def _findchunk(self, pos):
1267 def _findchunk(self, pos):
1272 '''for a given payload position, return a chunk number and offset'''
1268 '''for a given payload position, return a chunk number and offset'''
1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1269 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1274 if ppos == pos:
1270 if ppos == pos:
1275 return chunk, 0
1271 return chunk, 0
1276 elif ppos > pos:
1272 elif ppos > pos:
1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1273 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1278 raise ValueError('Unknown chunk')
1274 raise ValueError('Unknown chunk')
1279
1275
1280 def _readheader(self):
1276 def _readheader(self):
1281 """read the header and setup the object"""
1277 """read the header and setup the object"""
1282 typesize = self._unpackheader(_fparttypesize)[0]
1278 typesize = self._unpackheader(_fparttypesize)[0]
1283 self.type = self._fromheader(typesize)
1279 self.type = self._fromheader(typesize)
1284 indebug(self.ui, 'part type: "%s"' % self.type)
1280 indebug(self.ui, 'part type: "%s"' % self.type)
1285 self.id = self._unpackheader(_fpartid)[0]
1281 self.id = self._unpackheader(_fpartid)[0]
1286 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1282 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1287 # extract mandatory bit from type
1283 # extract mandatory bit from type
1288 self.mandatory = (self.type != self.type.lower())
1284 self.mandatory = (self.type != self.type.lower())
1289 self.type = self.type.lower()
1285 self.type = self.type.lower()
1290 ## reading parameters
1286 ## reading parameters
1291 # param count
1287 # param count
1292 mancount, advcount = self._unpackheader(_fpartparamcount)
1288 mancount, advcount = self._unpackheader(_fpartparamcount)
1293 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1289 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1294 # param size
1290 # param size
1295 fparamsizes = _makefpartparamsizes(mancount + advcount)
1291 fparamsizes = _makefpartparamsizes(mancount + advcount)
1296 paramsizes = self._unpackheader(fparamsizes)
1292 paramsizes = self._unpackheader(fparamsizes)
1297 # make it a list of couple again
1293 # make it a list of couple again
1298 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1294 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1299 # split mandatory from advisory
1295 # split mandatory from advisory
1300 mansizes = paramsizes[:mancount]
1296 mansizes = paramsizes[:mancount]
1301 advsizes = paramsizes[mancount:]
1297 advsizes = paramsizes[mancount:]
1302 # retrieve param value
1298 # retrieve param value
1303 manparams = []
1299 manparams = []
1304 for key, value in mansizes:
1300 for key, value in mansizes:
1305 manparams.append((self._fromheader(key), self._fromheader(value)))
1301 manparams.append((self._fromheader(key), self._fromheader(value)))
1306 advparams = []
1302 advparams = []
1307 for key, value in advsizes:
1303 for key, value in advsizes:
1308 advparams.append((self._fromheader(key), self._fromheader(value)))
1304 advparams.append((self._fromheader(key), self._fromheader(value)))
1309 self._initparams(manparams, advparams)
1305 self._initparams(manparams, advparams)
1310 ## part payload
1306 ## part payload
1311 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1307 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1312 # we read the data, tell it
1308 # we read the data, tell it
1313 self._initialized = True
1309 self._initialized = True
1314
1310
1315 def read(self, size=None):
1311 def read(self, size=None):
1316 """read payload data"""
1312 """read payload data"""
1317 if not self._initialized:
1313 if not self._initialized:
1318 self._readheader()
1314 self._readheader()
1319 if size is None:
1315 if size is None:
1320 data = self._payloadstream.read()
1316 data = self._payloadstream.read()
1321 else:
1317 else:
1322 data = self._payloadstream.read(size)
1318 data = self._payloadstream.read(size)
1323 self._pos += len(data)
1319 self._pos += len(data)
1324 if size is None or len(data) < size:
1320 if size is None or len(data) < size:
1325 if not self.consumed and self._pos:
1321 if not self.consumed and self._pos:
1326 self.ui.debug('bundle2-input-part: total payload size %i\n'
1322 self.ui.debug('bundle2-input-part: total payload size %i\n'
1327 % self._pos)
1323 % self._pos)
1328 self.consumed = True
1324 self.consumed = True
1329 return data
1325 return data
1330
1326
1331 def tell(self):
1327 def tell(self):
1332 return self._pos
1328 return self._pos
1333
1329
1334 def seek(self, offset, whence=0):
1330 def seek(self, offset, whence=0):
1335 if whence == 0:
1331 if whence == 0:
1336 newpos = offset
1332 newpos = offset
1337 elif whence == 1:
1333 elif whence == 1:
1338 newpos = self._pos + offset
1334 newpos = self._pos + offset
1339 elif whence == 2:
1335 elif whence == 2:
1340 if not self.consumed:
1336 if not self.consumed:
1341 self.read()
1337 self.read()
1342 newpos = self._chunkindex[-1][0] - offset
1338 newpos = self._chunkindex[-1][0] - offset
1343 else:
1339 else:
1344 raise ValueError('Unknown whence value: %r' % (whence,))
1340 raise ValueError('Unknown whence value: %r' % (whence,))
1345
1341
1346 if newpos > self._chunkindex[-1][0] and not self.consumed:
1342 if newpos > self._chunkindex[-1][0] and not self.consumed:
1347 self.read()
1343 self.read()
1348 if not 0 <= newpos <= self._chunkindex[-1][0]:
1344 if not 0 <= newpos <= self._chunkindex[-1][0]:
1349 raise ValueError('Offset out of range')
1345 raise ValueError('Offset out of range')
1350
1346
1351 if self._pos != newpos:
1347 if self._pos != newpos:
1352 chunk, internaloffset = self._findchunk(newpos)
1348 chunk, internaloffset = self._findchunk(newpos)
1353 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1349 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1354 adjust = self.read(internaloffset)
1350 adjust = self.read(internaloffset)
1355 if len(adjust) != internaloffset:
1351 if len(adjust) != internaloffset:
1356 raise error.Abort(_('Seek failed\n'))
1352 raise error.Abort(_('Seek failed\n'))
1357 self._pos = newpos
1353 self._pos = newpos
1358
1354
1359 def _seekfp(self, offset, whence=0):
1355 def _seekfp(self, offset, whence=0):
1360 """move the underlying file pointer
1356 """move the underlying file pointer
1361
1357
1362 This method is meant for internal usage by the bundle2 protocol only.
1358 This method is meant for internal usage by the bundle2 protocol only.
1363 They directly manipulate the low level stream including bundle2 level
1359 They directly manipulate the low level stream including bundle2 level
1364 instruction.
1360 instruction.
1365
1361
1366 Do not use it to implement higher-level logic or methods."""
1362 Do not use it to implement higher-level logic or methods."""
1367 if self._seekable:
1363 if self._seekable:
1368 return self._fp.seek(offset, whence)
1364 return self._fp.seek(offset, whence)
1369 else:
1365 else:
1370 raise NotImplementedError(_('File pointer is not seekable'))
1366 raise NotImplementedError(_('File pointer is not seekable'))
1371
1367
1372 def _tellfp(self):
1368 def _tellfp(self):
1373 """return the file offset, or None if file is not seekable
1369 """return the file offset, or None if file is not seekable
1374
1370
1375 This method is meant for internal usage by the bundle2 protocol only.
1371 This method is meant for internal usage by the bundle2 protocol only.
1376 They directly manipulate the low level stream including bundle2 level
1372 They directly manipulate the low level stream including bundle2 level
1377 instruction.
1373 instruction.
1378
1374
1379 Do not use it to implement higher-level logic or methods."""
1375 Do not use it to implement higher-level logic or methods."""
1380 if self._seekable:
1376 if self._seekable:
1381 try:
1377 try:
1382 return self._fp.tell()
1378 return self._fp.tell()
1383 except IOError as e:
1379 except IOError as e:
1384 if e.errno == errno.ESPIPE:
1380 if e.errno == errno.ESPIPE:
1385 self._seekable = False
1381 self._seekable = False
1386 else:
1382 else:
1387 raise
1383 raise
1388 return None
1384 return None
1389
1385
1390 # These are only the static capabilities.
1386 # These are only the static capabilities.
1391 # Check the 'getrepocaps' function for the rest.
1387 # Check the 'getrepocaps' function for the rest.
1392 capabilities = {'HG20': (),
1388 capabilities = {'HG20': (),
1393 'error': ('abort', 'unsupportedcontent', 'pushraced',
1389 'error': ('abort', 'unsupportedcontent', 'pushraced',
1394 'pushkey'),
1390 'pushkey'),
1395 'listkeys': (),
1391 'listkeys': (),
1396 'pushkey': (),
1392 'pushkey': (),
1397 'digests': tuple(sorted(util.DIGESTS.keys())),
1393 'digests': tuple(sorted(util.DIGESTS.keys())),
1398 'remote-changegroup': ('http', 'https'),
1394 'remote-changegroup': ('http', 'https'),
1399 'hgtagsfnodes': (),
1395 'hgtagsfnodes': (),
1400 }
1396 }
1401
1397
1402 def getrepocaps(repo, allowpushback=False):
1398 def getrepocaps(repo, allowpushback=False):
1403 """return the bundle2 capabilities for a given repo
1399 """return the bundle2 capabilities for a given repo
1404
1400
1405 Exists to allow extensions (like evolution) to mutate the capabilities.
1401 Exists to allow extensions (like evolution) to mutate the capabilities.
1406 """
1402 """
1407 caps = capabilities.copy()
1403 caps = capabilities.copy()
1408 caps['changegroup'] = tuple(sorted(
1404 caps['changegroup'] = tuple(sorted(
1409 changegroup.supportedincomingversions(repo)))
1405 changegroup.supportedincomingversions(repo)))
1410 if obsolete.isenabled(repo, obsolete.exchangeopt):
1406 if obsolete.isenabled(repo, obsolete.exchangeopt):
1411 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1407 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1412 caps['obsmarkers'] = supportedformat
1408 caps['obsmarkers'] = supportedformat
1413 if allowpushback:
1409 if allowpushback:
1414 caps['pushback'] = ()
1410 caps['pushback'] = ()
1415 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1411 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1416 if cpmode == 'check-related':
1412 if cpmode == 'check-related':
1417 caps['checkheads'] = ('related',)
1413 caps['checkheads'] = ('related',)
1418 return caps
1414 return caps
1419
1415
1420 def bundle2caps(remote):
1416 def bundle2caps(remote):
1421 """return the bundle capabilities of a peer as dict"""
1417 """return the bundle capabilities of a peer as dict"""
1422 raw = remote.capable('bundle2')
1418 raw = remote.capable('bundle2')
1423 if not raw and raw != '':
1419 if not raw and raw != '':
1424 return {}
1420 return {}
1425 capsblob = urlreq.unquote(remote.capable('bundle2'))
1421 capsblob = urlreq.unquote(remote.capable('bundle2'))
1426 return decodecaps(capsblob)
1422 return decodecaps(capsblob)
1427
1423
1428 def obsmarkersversion(caps):
1424 def obsmarkersversion(caps):
1429 """extract the list of supported obsmarkers versions from a bundle2caps dict
1425 """extract the list of supported obsmarkers versions from a bundle2caps dict
1430 """
1426 """
1431 obscaps = caps.get('obsmarkers', ())
1427 obscaps = caps.get('obsmarkers', ())
1432 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1428 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1433
1429
1434 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1430 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1435 vfs=None, compression=None, compopts=None):
1431 vfs=None, compression=None, compopts=None):
1436 if bundletype.startswith('HG10'):
1432 if bundletype.startswith('HG10'):
1437 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1433 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1438 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1434 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1439 compression=compression, compopts=compopts)
1435 compression=compression, compopts=compopts)
1440 elif not bundletype.startswith('HG20'):
1436 elif not bundletype.startswith('HG20'):
1441 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1437 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1442
1438
1443 caps = {}
1439 caps = {}
1444 if 'obsolescence' in opts:
1440 if 'obsolescence' in opts:
1445 caps['obsmarkers'] = ('V1',)
1441 caps['obsmarkers'] = ('V1',)
1446 bundle = bundle20(ui, caps)
1442 bundle = bundle20(ui, caps)
1447 bundle.setcompression(compression, compopts)
1443 bundle.setcompression(compression, compopts)
1448 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1444 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1449 chunkiter = bundle.getchunks()
1445 chunkiter = bundle.getchunks()
1450
1446
1451 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1447 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1452
1448
1453 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1449 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1454 # We should eventually reconcile this logic with the one behind
1450 # We should eventually reconcile this logic with the one behind
1455 # 'exchange.getbundle2partsgenerator'.
1451 # 'exchange.getbundle2partsgenerator'.
1456 #
1452 #
1457 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1453 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1458 # different right now. So we keep them separated for now for the sake of
1454 # different right now. So we keep them separated for now for the sake of
1459 # simplicity.
1455 # simplicity.
1460
1456
1461 # we always want a changegroup in such bundle
1457 # we always want a changegroup in such bundle
1462 cgversion = opts.get('cg.version')
1458 cgversion = opts.get('cg.version')
1463 if cgversion is None:
1459 if cgversion is None:
1464 cgversion = changegroup.safeversion(repo)
1460 cgversion = changegroup.safeversion(repo)
1465 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1461 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1466 part = bundler.newpart('changegroup', data=cg.getchunks())
1462 part = bundler.newpart('changegroup', data=cg.getchunks())
1467 part.addparam('version', cg.version)
1463 part.addparam('version', cg.version)
1468 if 'clcount' in cg.extras:
1464 if 'clcount' in cg.extras:
1469 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1465 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1470 mandatory=False)
1466 mandatory=False)
1471 if opts.get('phases') and repo.revs('%ln and secret()',
1467 if opts.get('phases') and repo.revs('%ln and secret()',
1472 outgoing.missingheads):
1468 outgoing.missingheads):
1473 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1469 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1474
1470
1475 addparttagsfnodescache(repo, bundler, outgoing)
1471 addparttagsfnodescache(repo, bundler, outgoing)
1476
1472
1477 if opts.get('obsolescence', False):
1473 if opts.get('obsolescence', False):
1478 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1474 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1479 buildobsmarkerspart(bundler, obsmarkers)
1475 buildobsmarkerspart(bundler, obsmarkers)
1480
1476
1481 if opts.get('phases', False):
1477 if opts.get('phases', False):
1482 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1478 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1483 phasedata = []
1479 phasedata = []
1484 for phase in phases.allphases:
1480 for phase in phases.allphases:
1485 for head in headsbyphase[phase]:
1481 for head in headsbyphase[phase]:
1486 phasedata.append(_pack(_fphasesentry, phase, head))
1482 phasedata.append(_pack(_fphasesentry, phase, head))
1487 bundler.newpart('phase-heads', data=''.join(phasedata))
1483 bundler.newpart('phase-heads', data=''.join(phasedata))
1488
1484
1489 def addparttagsfnodescache(repo, bundler, outgoing):
1485 def addparttagsfnodescache(repo, bundler, outgoing):
1490 # we include the tags fnode cache for the bundle changeset
1486 # we include the tags fnode cache for the bundle changeset
1491 # (as an optional parts)
1487 # (as an optional parts)
1492 cache = tags.hgtagsfnodescache(repo.unfiltered())
1488 cache = tags.hgtagsfnodescache(repo.unfiltered())
1493 chunks = []
1489 chunks = []
1494
1490
1495 # .hgtags fnodes are only relevant for head changesets. While we could
1491 # .hgtags fnodes are only relevant for head changesets. While we could
1496 # transfer values for all known nodes, there will likely be little to
1492 # transfer values for all known nodes, there will likely be little to
1497 # no benefit.
1493 # no benefit.
1498 #
1494 #
1499 # We don't bother using a generator to produce output data because
1495 # We don't bother using a generator to produce output data because
1500 # a) we only have 40 bytes per head and even esoteric numbers of heads
1496 # a) we only have 40 bytes per head and even esoteric numbers of heads
1501 # consume little memory (1M heads is 40MB) b) we don't want to send the
1497 # consume little memory (1M heads is 40MB) b) we don't want to send the
1502 # part if we don't have entries and knowing if we have entries requires
1498 # part if we don't have entries and knowing if we have entries requires
1503 # cache lookups.
1499 # cache lookups.
1504 for node in outgoing.missingheads:
1500 for node in outgoing.missingheads:
1505 # Don't compute missing, as this may slow down serving.
1501 # Don't compute missing, as this may slow down serving.
1506 fnode = cache.getfnode(node, computemissing=False)
1502 fnode = cache.getfnode(node, computemissing=False)
1507 if fnode is not None:
1503 if fnode is not None:
1508 chunks.extend([node, fnode])
1504 chunks.extend([node, fnode])
1509
1505
1510 if chunks:
1506 if chunks:
1511 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1507 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1512
1508
1513 def buildobsmarkerspart(bundler, markers):
1509 def buildobsmarkerspart(bundler, markers):
1514 """add an obsmarker part to the bundler with <markers>
1510 """add an obsmarker part to the bundler with <markers>
1515
1511
1516 No part is created if markers is empty.
1512 No part is created if markers is empty.
1517 Raises ValueError if the bundler doesn't support any known obsmarker format.
1513 Raises ValueError if the bundler doesn't support any known obsmarker format.
1518 """
1514 """
1519 if not markers:
1515 if not markers:
1520 return None
1516 return None
1521
1517
1522 remoteversions = obsmarkersversion(bundler.capabilities)
1518 remoteversions = obsmarkersversion(bundler.capabilities)
1523 version = obsolete.commonversion(remoteversions)
1519 version = obsolete.commonversion(remoteversions)
1524 if version is None:
1520 if version is None:
1525 raise ValueError('bundler does not support common obsmarker format')
1521 raise ValueError('bundler does not support common obsmarker format')
1526 stream = obsolete.encodemarkers(markers, True, version=version)
1522 stream = obsolete.encodemarkers(markers, True, version=version)
1527 return bundler.newpart('obsmarkers', data=stream)
1523 return bundler.newpart('obsmarkers', data=stream)
1528
1524
1529 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1525 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1530 compopts=None):
1526 compopts=None):
1531 """Write a bundle file and return its filename.
1527 """Write a bundle file and return its filename.
1532
1528
1533 Existing files will not be overwritten.
1529 Existing files will not be overwritten.
1534 If no filename is specified, a temporary file is created.
1530 If no filename is specified, a temporary file is created.
1535 bz2 compression can be turned off.
1531 bz2 compression can be turned off.
1536 The bundle file will be deleted in case of errors.
1532 The bundle file will be deleted in case of errors.
1537 """
1533 """
1538
1534
1539 if bundletype == "HG20":
1535 if bundletype == "HG20":
1540 bundle = bundle20(ui)
1536 bundle = bundle20(ui)
1541 bundle.setcompression(compression, compopts)
1537 bundle.setcompression(compression, compopts)
1542 part = bundle.newpart('changegroup', data=cg.getchunks())
1538 part = bundle.newpart('changegroup', data=cg.getchunks())
1543 part.addparam('version', cg.version)
1539 part.addparam('version', cg.version)
1544 if 'clcount' in cg.extras:
1540 if 'clcount' in cg.extras:
1545 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1541 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1546 mandatory=False)
1542 mandatory=False)
1547 chunkiter = bundle.getchunks()
1543 chunkiter = bundle.getchunks()
1548 else:
1544 else:
1549 # compression argument is only for the bundle2 case
1545 # compression argument is only for the bundle2 case
1550 assert compression is None
1546 assert compression is None
1551 if cg.version != '01':
1547 if cg.version != '01':
1552 raise error.Abort(_('old bundle types only supports v1 '
1548 raise error.Abort(_('old bundle types only supports v1 '
1553 'changegroups'))
1549 'changegroups'))
1554 header, comp = bundletypes[bundletype]
1550 header, comp = bundletypes[bundletype]
1555 if comp not in util.compengines.supportedbundletypes:
1551 if comp not in util.compengines.supportedbundletypes:
1556 raise error.Abort(_('unknown stream compression type: %s')
1552 raise error.Abort(_('unknown stream compression type: %s')
1557 % comp)
1553 % comp)
1558 compengine = util.compengines.forbundletype(comp)
1554 compengine = util.compengines.forbundletype(comp)
1559 def chunkiter():
1555 def chunkiter():
1560 yield header
1556 yield header
1561 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1557 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1562 yield chunk
1558 yield chunk
1563 chunkiter = chunkiter()
1559 chunkiter = chunkiter()
1564
1560
1565 # parse the changegroup data, otherwise we will block
1561 # parse the changegroup data, otherwise we will block
1566 # in case of sshrepo because we don't know the end of the stream
1562 # in case of sshrepo because we don't know the end of the stream
1567 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1563 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1568
1564
1569 def combinechangegroupresults(op):
1565 def combinechangegroupresults(op):
1570 """logic to combine 0 or more addchangegroup results into one"""
1566 """logic to combine 0 or more addchangegroup results into one"""
1571 results = [r.get('return', 0)
1567 results = [r.get('return', 0)
1572 for r in op.records['changegroup']]
1568 for r in op.records['changegroup']]
1573 changedheads = 0
1569 changedheads = 0
1574 result = 1
1570 result = 1
1575 for ret in results:
1571 for ret in results:
1576 # If any changegroup result is 0, return 0
1572 # If any changegroup result is 0, return 0
1577 if ret == 0:
1573 if ret == 0:
1578 result = 0
1574 result = 0
1579 break
1575 break
1580 if ret < -1:
1576 if ret < -1:
1581 changedheads += ret + 1
1577 changedheads += ret + 1
1582 elif ret > 1:
1578 elif ret > 1:
1583 changedheads += ret - 1
1579 changedheads += ret - 1
1584 if changedheads > 0:
1580 if changedheads > 0:
1585 result = 1 + changedheads
1581 result = 1 + changedheads
1586 elif changedheads < 0:
1582 elif changedheads < 0:
1587 result = -1 + changedheads
1583 result = -1 + changedheads
1588 return result
1584 return result
1589
1585
1590 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1586 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1591 'targetphase'))
1587 'targetphase'))
1592 def handlechangegroup(op, inpart):
1588 def handlechangegroup(op, inpart):
1593 """apply a changegroup part on the repo
1589 """apply a changegroup part on the repo
1594
1590
1595 This is a very early implementation that will massive rework before being
1591 This is a very early implementation that will massive rework before being
1596 inflicted to any end-user.
1592 inflicted to any end-user.
1597 """
1593 """
1598 tr = op.gettransaction()
1594 tr = op.gettransaction()
1599 unpackerversion = inpart.params.get('version', '01')
1595 unpackerversion = inpart.params.get('version', '01')
1600 # We should raise an appropriate exception here
1596 # We should raise an appropriate exception here
1601 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1597 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1602 # the source and url passed here are overwritten by the one contained in
1598 # the source and url passed here are overwritten by the one contained in
1603 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1599 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1604 nbchangesets = None
1600 nbchangesets = None
1605 if 'nbchanges' in inpart.params:
1601 if 'nbchanges' in inpart.params:
1606 nbchangesets = int(inpart.params.get('nbchanges'))
1602 nbchangesets = int(inpart.params.get('nbchanges'))
1607 if ('treemanifest' in inpart.params and
1603 if ('treemanifest' in inpart.params and
1608 'treemanifest' not in op.repo.requirements):
1604 'treemanifest' not in op.repo.requirements):
1609 if len(op.repo.changelog) != 0:
1605 if len(op.repo.changelog) != 0:
1610 raise error.Abort(_(
1606 raise error.Abort(_(
1611 "bundle contains tree manifests, but local repo is "
1607 "bundle contains tree manifests, but local repo is "
1612 "non-empty and does not use tree manifests"))
1608 "non-empty and does not use tree manifests"))
1613 op.repo.requirements.add('treemanifest')
1609 op.repo.requirements.add('treemanifest')
1614 op.repo._applyopenerreqs()
1610 op.repo._applyopenerreqs()
1615 op.repo._writerequirements()
1611 op.repo._writerequirements()
1616 extrakwargs = {}
1612 extrakwargs = {}
1617 targetphase = inpart.params.get('targetphase')
1613 targetphase = inpart.params.get('targetphase')
1618 if targetphase is not None:
1614 if targetphase is not None:
1619 extrakwargs['targetphase'] = int(targetphase)
1615 extrakwargs['targetphase'] = int(targetphase)
1620 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1616 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1621 expectedtotal=nbchangesets, **extrakwargs)
1617 expectedtotal=nbchangesets, **extrakwargs)
1622 if op.reply is not None:
1618 if op.reply is not None:
1623 # This is definitely not the final form of this
1619 # This is definitely not the final form of this
1624 # return. But one need to start somewhere.
1620 # return. But one need to start somewhere.
1625 part = op.reply.newpart('reply:changegroup', mandatory=False)
1621 part = op.reply.newpart('reply:changegroup', mandatory=False)
1626 part.addparam(
1622 part.addparam(
1627 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1623 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1628 part.addparam('return', '%i' % ret, mandatory=False)
1624 part.addparam('return', '%i' % ret, mandatory=False)
1629 assert not inpart.read()
1625 assert not inpart.read()
1630
1626
1631 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1627 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1632 ['digest:%s' % k for k in util.DIGESTS.keys()])
1628 ['digest:%s' % k for k in util.DIGESTS.keys()])
1633 @parthandler('remote-changegroup', _remotechangegroupparams)
1629 @parthandler('remote-changegroup', _remotechangegroupparams)
1634 def handleremotechangegroup(op, inpart):
1630 def handleremotechangegroup(op, inpart):
1635 """apply a bundle10 on the repo, given an url and validation information
1631 """apply a bundle10 on the repo, given an url and validation information
1636
1632
1637 All the information about the remote bundle to import are given as
1633 All the information about the remote bundle to import are given as
1638 parameters. The parameters include:
1634 parameters. The parameters include:
1639 - url: the url to the bundle10.
1635 - url: the url to the bundle10.
1640 - size: the bundle10 file size. It is used to validate what was
1636 - size: the bundle10 file size. It is used to validate what was
1641 retrieved by the client matches the server knowledge about the bundle.
1637 retrieved by the client matches the server knowledge about the bundle.
1642 - digests: a space separated list of the digest types provided as
1638 - digests: a space separated list of the digest types provided as
1643 parameters.
1639 parameters.
1644 - digest:<digest-type>: the hexadecimal representation of the digest with
1640 - digest:<digest-type>: the hexadecimal representation of the digest with
1645 that name. Like the size, it is used to validate what was retrieved by
1641 that name. Like the size, it is used to validate what was retrieved by
1646 the client matches what the server knows about the bundle.
1642 the client matches what the server knows about the bundle.
1647
1643
1648 When multiple digest types are given, all of them are checked.
1644 When multiple digest types are given, all of them are checked.
1649 """
1645 """
1650 try:
1646 try:
1651 raw_url = inpart.params['url']
1647 raw_url = inpart.params['url']
1652 except KeyError:
1648 except KeyError:
1653 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1649 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1654 parsed_url = util.url(raw_url)
1650 parsed_url = util.url(raw_url)
1655 if parsed_url.scheme not in capabilities['remote-changegroup']:
1651 if parsed_url.scheme not in capabilities['remote-changegroup']:
1656 raise error.Abort(_('remote-changegroup does not support %s urls') %
1652 raise error.Abort(_('remote-changegroup does not support %s urls') %
1657 parsed_url.scheme)
1653 parsed_url.scheme)
1658
1654
1659 try:
1655 try:
1660 size = int(inpart.params['size'])
1656 size = int(inpart.params['size'])
1661 except ValueError:
1657 except ValueError:
1662 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1658 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1663 % 'size')
1659 % 'size')
1664 except KeyError:
1660 except KeyError:
1665 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1661 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1666
1662
1667 digests = {}
1663 digests = {}
1668 for typ in inpart.params.get('digests', '').split():
1664 for typ in inpart.params.get('digests', '').split():
1669 param = 'digest:%s' % typ
1665 param = 'digest:%s' % typ
1670 try:
1666 try:
1671 value = inpart.params[param]
1667 value = inpart.params[param]
1672 except KeyError:
1668 except KeyError:
1673 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1669 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1674 param)
1670 param)
1675 digests[typ] = value
1671 digests[typ] = value
1676
1672
1677 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1673 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1678
1674
1679 tr = op.gettransaction()
1675 tr = op.gettransaction()
1680 from . import exchange
1676 from . import exchange
1681 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1677 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1682 if not isinstance(cg, changegroup.cg1unpacker):
1678 if not isinstance(cg, changegroup.cg1unpacker):
1683 raise error.Abort(_('%s: not a bundle version 1.0') %
1679 raise error.Abort(_('%s: not a bundle version 1.0') %
1684 util.hidepassword(raw_url))
1680 util.hidepassword(raw_url))
1685 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1681 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1686 if op.reply is not None:
1682 if op.reply is not None:
1687 # This is definitely not the final form of this
1683 # This is definitely not the final form of this
1688 # return. But one need to start somewhere.
1684 # return. But one need to start somewhere.
1689 part = op.reply.newpart('reply:changegroup')
1685 part = op.reply.newpart('reply:changegroup')
1690 part.addparam(
1686 part.addparam(
1691 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1687 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1692 part.addparam('return', '%i' % ret, mandatory=False)
1688 part.addparam('return', '%i' % ret, mandatory=False)
1693 try:
1689 try:
1694 real_part.validate()
1690 real_part.validate()
1695 except error.Abort as e:
1691 except error.Abort as e:
1696 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1692 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1697 (util.hidepassword(raw_url), str(e)))
1693 (util.hidepassword(raw_url), str(e)))
1698 assert not inpart.read()
1694 assert not inpart.read()
1699
1695
1700 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1696 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1701 def handlereplychangegroup(op, inpart):
1697 def handlereplychangegroup(op, inpart):
1702 ret = int(inpart.params['return'])
1698 ret = int(inpart.params['return'])
1703 replyto = int(inpart.params['in-reply-to'])
1699 replyto = int(inpart.params['in-reply-to'])
1704 op.records.add('changegroup', {'return': ret}, replyto)
1700 op.records.add('changegroup', {'return': ret}, replyto)
1705
1701
1706 @parthandler('check:heads')
1702 @parthandler('check:heads')
1707 def handlecheckheads(op, inpart):
1703 def handlecheckheads(op, inpart):
1708 """check that head of the repo did not change
1704 """check that head of the repo did not change
1709
1705
1710 This is used to detect a push race when using unbundle.
1706 This is used to detect a push race when using unbundle.
1711 This replaces the "heads" argument of unbundle."""
1707 This replaces the "heads" argument of unbundle."""
1712 h = inpart.read(20)
1708 h = inpart.read(20)
1713 heads = []
1709 heads = []
1714 while len(h) == 20:
1710 while len(h) == 20:
1715 heads.append(h)
1711 heads.append(h)
1716 h = inpart.read(20)
1712 h = inpart.read(20)
1717 assert not h
1713 assert not h
1718 # Trigger a transaction so that we are guaranteed to have the lock now.
1714 # Trigger a transaction so that we are guaranteed to have the lock now.
1719 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1715 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1720 op.gettransaction()
1716 op.gettransaction()
1721 if sorted(heads) != sorted(op.repo.heads()):
1717 if sorted(heads) != sorted(op.repo.heads()):
1722 raise error.PushRaced('repository changed while pushing - '
1718 raise error.PushRaced('repository changed while pushing - '
1723 'please try again')
1719 'please try again')
1724
1720
1725 @parthandler('check:updated-heads')
1721 @parthandler('check:updated-heads')
1726 def handlecheckupdatedheads(op, inpart):
1722 def handlecheckupdatedheads(op, inpart):
1727 """check for race on the heads touched by a push
1723 """check for race on the heads touched by a push
1728
1724
1729 This is similar to 'check:heads' but focus on the heads actually updated
1725 This is similar to 'check:heads' but focus on the heads actually updated
1730 during the push. If other activities happen on unrelated heads, it is
1726 during the push. If other activities happen on unrelated heads, it is
1731 ignored.
1727 ignored.
1732
1728
1733 This allow server with high traffic to avoid push contention as long as
1729 This allow server with high traffic to avoid push contention as long as
1734 unrelated parts of the graph are involved."""
1730 unrelated parts of the graph are involved."""
1735 h = inpart.read(20)
1731 h = inpart.read(20)
1736 heads = []
1732 heads = []
1737 while len(h) == 20:
1733 while len(h) == 20:
1738 heads.append(h)
1734 heads.append(h)
1739 h = inpart.read(20)
1735 h = inpart.read(20)
1740 assert not h
1736 assert not h
1741 # trigger a transaction so that we are guaranteed to have the lock now.
1737 # trigger a transaction so that we are guaranteed to have the lock now.
1742 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1738 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1743 op.gettransaction()
1739 op.gettransaction()
1744
1740
1745 currentheads = set()
1741 currentheads = set()
1746 for ls in op.repo.branchmap().itervalues():
1742 for ls in op.repo.branchmap().itervalues():
1747 currentheads.update(ls)
1743 currentheads.update(ls)
1748
1744
1749 for h in heads:
1745 for h in heads:
1750 if h not in currentheads:
1746 if h not in currentheads:
1751 raise error.PushRaced('repository changed while pushing - '
1747 raise error.PushRaced('repository changed while pushing - '
1752 'please try again')
1748 'please try again')
1753
1749
1754 @parthandler('output')
1750 @parthandler('output')
1755 def handleoutput(op, inpart):
1751 def handleoutput(op, inpart):
1756 """forward output captured on the server to the client"""
1752 """forward output captured on the server to the client"""
1757 for line in inpart.read().splitlines():
1753 for line in inpart.read().splitlines():
1758 op.ui.status(_('remote: %s\n') % line)
1754 op.ui.status(_('remote: %s\n') % line)
1759
1755
1760 @parthandler('replycaps')
1756 @parthandler('replycaps')
1761 def handlereplycaps(op, inpart):
1757 def handlereplycaps(op, inpart):
1762 """Notify that a reply bundle should be created
1758 """Notify that a reply bundle should be created
1763
1759
1764 The payload contains the capabilities information for the reply"""
1760 The payload contains the capabilities information for the reply"""
1765 caps = decodecaps(inpart.read())
1761 caps = decodecaps(inpart.read())
1766 if op.reply is None:
1762 if op.reply is None:
1767 op.reply = bundle20(op.ui, caps)
1763 op.reply = bundle20(op.ui, caps)
1768
1764
1769 class AbortFromPart(error.Abort):
1765 class AbortFromPart(error.Abort):
1770 """Sub-class of Abort that denotes an error from a bundle2 part."""
1766 """Sub-class of Abort that denotes an error from a bundle2 part."""
1771
1767
1772 @parthandler('error:abort', ('message', 'hint'))
1768 @parthandler('error:abort', ('message', 'hint'))
1773 def handleerrorabort(op, inpart):
1769 def handleerrorabort(op, inpart):
1774 """Used to transmit abort error over the wire"""
1770 """Used to transmit abort error over the wire"""
1775 raise AbortFromPart(inpart.params['message'],
1771 raise AbortFromPart(inpart.params['message'],
1776 hint=inpart.params.get('hint'))
1772 hint=inpart.params.get('hint'))
1777
1773
1778 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1774 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1779 'in-reply-to'))
1775 'in-reply-to'))
1780 def handleerrorpushkey(op, inpart):
1776 def handleerrorpushkey(op, inpart):
1781 """Used to transmit failure of a mandatory pushkey over the wire"""
1777 """Used to transmit failure of a mandatory pushkey over the wire"""
1782 kwargs = {}
1778 kwargs = {}
1783 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1779 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1784 value = inpart.params.get(name)
1780 value = inpart.params.get(name)
1785 if value is not None:
1781 if value is not None:
1786 kwargs[name] = value
1782 kwargs[name] = value
1787 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1783 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1788
1784
1789 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1785 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1790 def handleerrorunsupportedcontent(op, inpart):
1786 def handleerrorunsupportedcontent(op, inpart):
1791 """Used to transmit unknown content error over the wire"""
1787 """Used to transmit unknown content error over the wire"""
1792 kwargs = {}
1788 kwargs = {}
1793 parttype = inpart.params.get('parttype')
1789 parttype = inpart.params.get('parttype')
1794 if parttype is not None:
1790 if parttype is not None:
1795 kwargs['parttype'] = parttype
1791 kwargs['parttype'] = parttype
1796 params = inpart.params.get('params')
1792 params = inpart.params.get('params')
1797 if params is not None:
1793 if params is not None:
1798 kwargs['params'] = params.split('\0')
1794 kwargs['params'] = params.split('\0')
1799
1795
1800 raise error.BundleUnknownFeatureError(**kwargs)
1796 raise error.BundleUnknownFeatureError(**kwargs)
1801
1797
1802 @parthandler('error:pushraced', ('message',))
1798 @parthandler('error:pushraced', ('message',))
1803 def handleerrorpushraced(op, inpart):
1799 def handleerrorpushraced(op, inpart):
1804 """Used to transmit push race error over the wire"""
1800 """Used to transmit push race error over the wire"""
1805 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1801 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1806
1802
1807 @parthandler('listkeys', ('namespace',))
1803 @parthandler('listkeys', ('namespace',))
1808 def handlelistkeys(op, inpart):
1804 def handlelistkeys(op, inpart):
1809 """retrieve pushkey namespace content stored in a bundle2"""
1805 """retrieve pushkey namespace content stored in a bundle2"""
1810 namespace = inpart.params['namespace']
1806 namespace = inpart.params['namespace']
1811 r = pushkey.decodekeys(inpart.read())
1807 r = pushkey.decodekeys(inpart.read())
1812 op.records.add('listkeys', (namespace, r))
1808 op.records.add('listkeys', (namespace, r))
1813
1809
1814 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1810 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1815 def handlepushkey(op, inpart):
1811 def handlepushkey(op, inpart):
1816 """process a pushkey request"""
1812 """process a pushkey request"""
1817 dec = pushkey.decode
1813 dec = pushkey.decode
1818 namespace = dec(inpart.params['namespace'])
1814 namespace = dec(inpart.params['namespace'])
1819 key = dec(inpart.params['key'])
1815 key = dec(inpart.params['key'])
1820 old = dec(inpart.params['old'])
1816 old = dec(inpart.params['old'])
1821 new = dec(inpart.params['new'])
1817 new = dec(inpart.params['new'])
1822 # Grab the transaction to ensure that we have the lock before performing the
1818 # Grab the transaction to ensure that we have the lock before performing the
1823 # pushkey.
1819 # pushkey.
1824 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1820 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1825 op.gettransaction()
1821 op.gettransaction()
1826 ret = op.repo.pushkey(namespace, key, old, new)
1822 ret = op.repo.pushkey(namespace, key, old, new)
1827 record = {'namespace': namespace,
1823 record = {'namespace': namespace,
1828 'key': key,
1824 'key': key,
1829 'old': old,
1825 'old': old,
1830 'new': new}
1826 'new': new}
1831 op.records.add('pushkey', record)
1827 op.records.add('pushkey', record)
1832 if op.reply is not None:
1828 if op.reply is not None:
1833 rpart = op.reply.newpart('reply:pushkey')
1829 rpart = op.reply.newpart('reply:pushkey')
1834 rpart.addparam(
1830 rpart.addparam(
1835 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1831 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1836 rpart.addparam('return', '%i' % ret, mandatory=False)
1832 rpart.addparam('return', '%i' % ret, mandatory=False)
1837 if inpart.mandatory and not ret:
1833 if inpart.mandatory and not ret:
1838 kwargs = {}
1834 kwargs = {}
1839 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1835 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1840 if key in inpart.params:
1836 if key in inpart.params:
1841 kwargs[key] = inpart.params[key]
1837 kwargs[key] = inpart.params[key]
1842 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1838 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1843
1839
1844 def _readphaseheads(inpart):
1840 def _readphaseheads(inpart):
1845 headsbyphase = [[] for i in phases.allphases]
1841 headsbyphase = [[] for i in phases.allphases]
1846 entrysize = struct.calcsize(_fphasesentry)
1842 entrysize = struct.calcsize(_fphasesentry)
1847 while True:
1843 while True:
1848 entry = inpart.read(entrysize)
1844 entry = inpart.read(entrysize)
1849 if len(entry) < entrysize:
1845 if len(entry) < entrysize:
1850 if entry:
1846 if entry:
1851 raise error.Abort(_('bad phase-heads bundle part'))
1847 raise error.Abort(_('bad phase-heads bundle part'))
1852 break
1848 break
1853 phase, node = struct.unpack(_fphasesentry, entry)
1849 phase, node = struct.unpack(_fphasesentry, entry)
1854 headsbyphase[phase].append(node)
1850 headsbyphase[phase].append(node)
1855 return headsbyphase
1851 return headsbyphase
1856
1852
1857 @parthandler('phase-heads')
1853 @parthandler('phase-heads')
1858 def handlephases(op, inpart):
1854 def handlephases(op, inpart):
1859 """apply phases from bundle part to repo"""
1855 """apply phases from bundle part to repo"""
1860 headsbyphase = _readphaseheads(inpart)
1856 headsbyphase = _readphaseheads(inpart)
1861 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1857 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1862 op.records.add('phase-heads', {})
1858 op.records.add('phase-heads', {})
1863
1859
1864 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1860 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1865 def handlepushkeyreply(op, inpart):
1861 def handlepushkeyreply(op, inpart):
1866 """retrieve the result of a pushkey request"""
1862 """retrieve the result of a pushkey request"""
1867 ret = int(inpart.params['return'])
1863 ret = int(inpart.params['return'])
1868 partid = int(inpart.params['in-reply-to'])
1864 partid = int(inpart.params['in-reply-to'])
1869 op.records.add('pushkey', {'return': ret}, partid)
1865 op.records.add('pushkey', {'return': ret}, partid)
1870
1866
1871 @parthandler('obsmarkers')
1867 @parthandler('obsmarkers')
1872 def handleobsmarker(op, inpart):
1868 def handleobsmarker(op, inpart):
1873 """add a stream of obsmarkers to the repo"""
1869 """add a stream of obsmarkers to the repo"""
1874 tr = op.gettransaction()
1870 tr = op.gettransaction()
1875 markerdata = inpart.read()
1871 markerdata = inpart.read()
1876 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1872 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1877 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1873 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1878 % len(markerdata))
1874 % len(markerdata))
1879 # The mergemarkers call will crash if marker creation is not enabled.
1875 # The mergemarkers call will crash if marker creation is not enabled.
1880 # we want to avoid this if the part is advisory.
1876 # we want to avoid this if the part is advisory.
1881 if not inpart.mandatory and op.repo.obsstore.readonly:
1877 if not inpart.mandatory and op.repo.obsstore.readonly:
1882 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1878 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1883 return
1879 return
1884 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1880 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1885 op.repo.invalidatevolatilesets()
1881 op.repo.invalidatevolatilesets()
1886 if new:
1882 if new:
1887 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1883 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1888 op.records.add('obsmarkers', {'new': new})
1884 op.records.add('obsmarkers', {'new': new})
1889 if op.reply is not None:
1885 if op.reply is not None:
1890 rpart = op.reply.newpart('reply:obsmarkers')
1886 rpart = op.reply.newpart('reply:obsmarkers')
1891 rpart.addparam(
1887 rpart.addparam(
1892 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1888 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1893 rpart.addparam('new', '%i' % new, mandatory=False)
1889 rpart.addparam('new', '%i' % new, mandatory=False)
1894
1890
1895
1891
1896 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1892 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1897 def handleobsmarkerreply(op, inpart):
1893 def handleobsmarkerreply(op, inpart):
1898 """retrieve the result of a pushkey request"""
1894 """retrieve the result of a pushkey request"""
1899 ret = int(inpart.params['new'])
1895 ret = int(inpart.params['new'])
1900 partid = int(inpart.params['in-reply-to'])
1896 partid = int(inpart.params['in-reply-to'])
1901 op.records.add('obsmarkers', {'new': ret}, partid)
1897 op.records.add('obsmarkers', {'new': ret}, partid)
1902
1898
1903 @parthandler('hgtagsfnodes')
1899 @parthandler('hgtagsfnodes')
1904 def handlehgtagsfnodes(op, inpart):
1900 def handlehgtagsfnodes(op, inpart):
1905 """Applies .hgtags fnodes cache entries to the local repo.
1901 """Applies .hgtags fnodes cache entries to the local repo.
1906
1902
1907 Payload is pairs of 20 byte changeset nodes and filenodes.
1903 Payload is pairs of 20 byte changeset nodes and filenodes.
1908 """
1904 """
1909 # Grab the transaction so we ensure that we have the lock at this point.
1905 # Grab the transaction so we ensure that we have the lock at this point.
1910 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1906 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1911 op.gettransaction()
1907 op.gettransaction()
1912 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1908 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1913
1909
1914 count = 0
1910 count = 0
1915 while True:
1911 while True:
1916 node = inpart.read(20)
1912 node = inpart.read(20)
1917 fnode = inpart.read(20)
1913 fnode = inpart.read(20)
1918 if len(node) < 20 or len(fnode) < 20:
1914 if len(node) < 20 or len(fnode) < 20:
1919 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1915 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1920 break
1916 break
1921 cache.setfnode(node, fnode)
1917 cache.setfnode(node, fnode)
1922 count += 1
1918 count += 1
1923
1919
1924 cache.write()
1920 cache.write()
1925 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1921 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1926
1922
1927 @parthandler('pushvars')
1923 @parthandler('pushvars')
1928 def bundle2getvars(op, part):
1924 def bundle2getvars(op, part):
1929 '''unbundle a bundle2 containing shellvars on the server'''
1925 '''unbundle a bundle2 containing shellvars on the server'''
1930 # An option to disable unbundling on server-side for security reasons
1926 # An option to disable unbundling on server-side for security reasons
1931 if op.ui.configbool('push', 'pushvars.server'):
1927 if op.ui.configbool('push', 'pushvars.server'):
1932 hookargs = {}
1928 hookargs = {}
1933 for key, value in part.advisoryparams:
1929 for key, value in part.advisoryparams:
1934 key = key.upper()
1930 key = key.upper()
1935 # We want pushed variables to have USERVAR_ prepended so we know
1931 # We want pushed variables to have USERVAR_ prepended so we know
1936 # they came from the --pushvar flag.
1932 # they came from the --pushvar flag.
1937 key = "USERVAR_" + key
1933 key = "USERVAR_" + key
1938 hookargs[key] = value
1934 hookargs[key] = value
1939 op.addhookargs(hookargs)
1935 op.addhookargs(hookargs)
General Comments 0
You need to be logged in to leave comments. Login now