##// END OF EJS Templates
bundle: extract _processchangegroup() method...
Martin von Zweigbergk -
r33038:f0efd2bf default
parent child Browse files
Show More
@@ -1,1839 +1,1839
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 phases,
161 phases,
162 pushkey,
162 pushkey,
163 pycompat,
163 pycompat,
164 tags,
164 tags,
165 url,
165 url,
166 util,
166 util,
167 )
167 )
168
168
169 urlerr = util.urlerr
169 urlerr = util.urlerr
170 urlreq = util.urlreq
170 urlreq = util.urlreq
171
171
172 _pack = struct.pack
172 _pack = struct.pack
173 _unpack = struct.unpack
173 _unpack = struct.unpack
174
174
175 _fstreamparamsize = '>i'
175 _fstreamparamsize = '>i'
176 _fpartheadersize = '>i'
176 _fpartheadersize = '>i'
177 _fparttypesize = '>B'
177 _fparttypesize = '>B'
178 _fpartid = '>I'
178 _fpartid = '>I'
179 _fpayloadsize = '>i'
179 _fpayloadsize = '>i'
180 _fpartparamcount = '>BB'
180 _fpartparamcount = '>BB'
181
181
182 _fphasesentry = '>i20s'
182 _fphasesentry = '>i20s'
183
183
184 preferedchunksize = 4096
184 preferedchunksize = 4096
185
185
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187
187
188 def outdebug(ui, message):
188 def outdebug(ui, message):
189 """debug regarding output stream (bundling)"""
189 """debug regarding output stream (bundling)"""
190 if ui.configbool('devel', 'bundle2.debug', False):
190 if ui.configbool('devel', 'bundle2.debug', False):
191 ui.debug('bundle2-output: %s\n' % message)
191 ui.debug('bundle2-output: %s\n' % message)
192
192
193 def indebug(ui, message):
193 def indebug(ui, message):
194 """debug on input stream (unbundling)"""
194 """debug on input stream (unbundling)"""
195 if ui.configbool('devel', 'bundle2.debug', False):
195 if ui.configbool('devel', 'bundle2.debug', False):
196 ui.debug('bundle2-input: %s\n' % message)
196 ui.debug('bundle2-input: %s\n' % message)
197
197
198 def validateparttype(parttype):
198 def validateparttype(parttype):
199 """raise ValueError if a parttype contains invalid character"""
199 """raise ValueError if a parttype contains invalid character"""
200 if _parttypeforbidden.search(parttype):
200 if _parttypeforbidden.search(parttype):
201 raise ValueError(parttype)
201 raise ValueError(parttype)
202
202
203 def _makefpartparamsizes(nbparams):
203 def _makefpartparamsizes(nbparams):
204 """return a struct format to read part parameter sizes
204 """return a struct format to read part parameter sizes
205
205
206 The number parameters is variable so we need to build that format
206 The number parameters is variable so we need to build that format
207 dynamically.
207 dynamically.
208 """
208 """
209 return '>'+('BB'*nbparams)
209 return '>'+('BB'*nbparams)
210
210
211 parthandlermapping = {}
211 parthandlermapping = {}
212
212
213 def parthandler(parttype, params=()):
213 def parthandler(parttype, params=()):
214 """decorator that register a function as a bundle2 part handler
214 """decorator that register a function as a bundle2 part handler
215
215
216 eg::
216 eg::
217
217
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 def myparttypehandler(...):
219 def myparttypehandler(...):
220 '''process a part of type "my part".'''
220 '''process a part of type "my part".'''
221 ...
221 ...
222 """
222 """
223 validateparttype(parttype)
223 validateparttype(parttype)
224 def _decorator(func):
224 def _decorator(func):
225 lparttype = parttype.lower() # enforce lower case matching.
225 lparttype = parttype.lower() # enforce lower case matching.
226 assert lparttype not in parthandlermapping
226 assert lparttype not in parthandlermapping
227 parthandlermapping[lparttype] = func
227 parthandlermapping[lparttype] = func
228 func.params = frozenset(params)
228 func.params = frozenset(params)
229 return func
229 return func
230 return _decorator
230 return _decorator
231
231
232 class unbundlerecords(object):
232 class unbundlerecords(object):
233 """keep record of what happens during and unbundle
233 """keep record of what happens during and unbundle
234
234
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 category of record and obj is an arbitrary object.
236 category of record and obj is an arbitrary object.
237
237
238 `records['cat']` will return all entries of this category 'cat'.
238 `records['cat']` will return all entries of this category 'cat'.
239
239
240 Iterating on the object itself will yield `('category', obj)` tuples
240 Iterating on the object itself will yield `('category', obj)` tuples
241 for all entries.
241 for all entries.
242
242
243 All iterations happens in chronological order.
243 All iterations happens in chronological order.
244 """
244 """
245
245
246 def __init__(self):
246 def __init__(self):
247 self._categories = {}
247 self._categories = {}
248 self._sequences = []
248 self._sequences = []
249 self._replies = {}
249 self._replies = {}
250
250
251 def add(self, category, entry, inreplyto=None):
251 def add(self, category, entry, inreplyto=None):
252 """add a new record of a given category.
252 """add a new record of a given category.
253
253
254 The entry can then be retrieved in the list returned by
254 The entry can then be retrieved in the list returned by
255 self['category']."""
255 self['category']."""
256 self._categories.setdefault(category, []).append(entry)
256 self._categories.setdefault(category, []).append(entry)
257 self._sequences.append((category, entry))
257 self._sequences.append((category, entry))
258 if inreplyto is not None:
258 if inreplyto is not None:
259 self.getreplies(inreplyto).add(category, entry)
259 self.getreplies(inreplyto).add(category, entry)
260
260
261 def getreplies(self, partid):
261 def getreplies(self, partid):
262 """get the records that are replies to a specific part"""
262 """get the records that are replies to a specific part"""
263 return self._replies.setdefault(partid, unbundlerecords())
263 return self._replies.setdefault(partid, unbundlerecords())
264
264
265 def __getitem__(self, cat):
265 def __getitem__(self, cat):
266 return tuple(self._categories.get(cat, ()))
266 return tuple(self._categories.get(cat, ()))
267
267
268 def __iter__(self):
268 def __iter__(self):
269 return iter(self._sequences)
269 return iter(self._sequences)
270
270
271 def __len__(self):
271 def __len__(self):
272 return len(self._sequences)
272 return len(self._sequences)
273
273
274 def __nonzero__(self):
274 def __nonzero__(self):
275 return bool(self._sequences)
275 return bool(self._sequences)
276
276
277 __bool__ = __nonzero__
277 __bool__ = __nonzero__
278
278
279 class bundleoperation(object):
279 class bundleoperation(object):
280 """an object that represents a single bundling process
280 """an object that represents a single bundling process
281
281
282 Its purpose is to carry unbundle-related objects and states.
282 Its purpose is to carry unbundle-related objects and states.
283
283
284 A new object should be created at the beginning of each bundle processing.
284 A new object should be created at the beginning of each bundle processing.
285 The object is to be returned by the processing function.
285 The object is to be returned by the processing function.
286
286
287 The object has very little content now it will ultimately contain:
287 The object has very little content now it will ultimately contain:
288 * an access to the repo the bundle is applied to,
288 * an access to the repo the bundle is applied to,
289 * a ui object,
289 * a ui object,
290 * a way to retrieve a transaction to add changes to the repo,
290 * a way to retrieve a transaction to add changes to the repo,
291 * a way to record the result of processing each part,
291 * a way to record the result of processing each part,
292 * a way to construct a bundle response when applicable.
292 * a way to construct a bundle response when applicable.
293 """
293 """
294
294
295 def __init__(self, repo, transactiongetter, captureoutput=True):
295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 self.repo = repo
296 self.repo = repo
297 self.ui = repo.ui
297 self.ui = repo.ui
298 self.records = unbundlerecords()
298 self.records = unbundlerecords()
299 self.gettransaction = transactiongetter
299 self.gettransaction = transactiongetter
300 self.reply = None
300 self.reply = None
301 self.captureoutput = captureoutput
301 self.captureoutput = captureoutput
302
302
303 class TransactionUnavailable(RuntimeError):
303 class TransactionUnavailable(RuntimeError):
304 pass
304 pass
305
305
306 def _notransaction():
306 def _notransaction():
307 """default method to get a transaction while processing a bundle
307 """default method to get a transaction while processing a bundle
308
308
309 Raise an exception to highlight the fact that no transaction was expected
309 Raise an exception to highlight the fact that no transaction was expected
310 to be created"""
310 to be created"""
311 raise TransactionUnavailable()
311 raise TransactionUnavailable()
312
312
313 def applybundle(repo, unbundler, tr, source=None, url=None):
313 def applybundle(repo, unbundler, tr, source=None, url=None):
314 # transform me into unbundler.apply() as soon as the freeze is lifted
314 # transform me into unbundler.apply() as soon as the freeze is lifted
315 tr.hookargs['bundle2'] = '1'
315 tr.hookargs['bundle2'] = '1'
316 if source is not None and 'source' not in tr.hookargs:
316 if source is not None and 'source' not in tr.hookargs:
317 tr.hookargs['source'] = source
317 tr.hookargs['source'] = source
318 if url is not None and 'url' not in tr.hookargs:
318 if url is not None and 'url' not in tr.hookargs:
319 tr.hookargs['url'] = url
319 tr.hookargs['url'] = url
320 return processbundle(repo, unbundler, lambda: tr)
320 return processbundle(repo, unbundler, lambda: tr)
321
321
322 def processbundle(repo, unbundler, transactiongetter=None, op=None):
322 def processbundle(repo, unbundler, transactiongetter=None, op=None):
323 """This function process a bundle, apply effect to/from a repo
323 """This function process a bundle, apply effect to/from a repo
324
324
325 It iterates over each part then searches for and uses the proper handling
325 It iterates over each part then searches for and uses the proper handling
326 code to process the part. Parts are processed in order.
326 code to process the part. Parts are processed in order.
327
327
328 Unknown Mandatory part will abort the process.
328 Unknown Mandatory part will abort the process.
329
329
330 It is temporarily possible to provide a prebuilt bundleoperation to the
330 It is temporarily possible to provide a prebuilt bundleoperation to the
331 function. This is used to ensure output is properly propagated in case of
331 function. This is used to ensure output is properly propagated in case of
332 an error during the unbundling. This output capturing part will likely be
332 an error during the unbundling. This output capturing part will likely be
333 reworked and this ability will probably go away in the process.
333 reworked and this ability will probably go away in the process.
334 """
334 """
335 if op is None:
335 if op is None:
336 if transactiongetter is None:
336 if transactiongetter is None:
337 transactiongetter = _notransaction
337 transactiongetter = _notransaction
338 op = bundleoperation(repo, transactiongetter)
338 op = bundleoperation(repo, transactiongetter)
339 # todo:
339 # todo:
340 # - replace this is a init function soon.
340 # - replace this is a init function soon.
341 # - exception catching
341 # - exception catching
342 unbundler.params
342 unbundler.params
343 if repo.ui.debugflag:
343 if repo.ui.debugflag:
344 msg = ['bundle2-input-bundle:']
344 msg = ['bundle2-input-bundle:']
345 if unbundler.params:
345 if unbundler.params:
346 msg.append(' %i params')
346 msg.append(' %i params')
347 if op.gettransaction is None or op.gettransaction is _notransaction:
347 if op.gettransaction is None or op.gettransaction is _notransaction:
348 msg.append(' no-transaction')
348 msg.append(' no-transaction')
349 else:
349 else:
350 msg.append(' with-transaction')
350 msg.append(' with-transaction')
351 msg.append('\n')
351 msg.append('\n')
352 repo.ui.debug(''.join(msg))
352 repo.ui.debug(''.join(msg))
353 iterparts = enumerate(unbundler.iterparts())
353 iterparts = enumerate(unbundler.iterparts())
354 part = None
354 part = None
355 nbpart = 0
355 nbpart = 0
356 try:
356 try:
357 for nbpart, part in iterparts:
357 for nbpart, part in iterparts:
358 _processpart(op, part)
358 _processpart(op, part)
359 except Exception as exc:
359 except Exception as exc:
360 # Any exceptions seeking to the end of the bundle at this point are
360 # Any exceptions seeking to the end of the bundle at this point are
361 # almost certainly related to the underlying stream being bad.
361 # almost certainly related to the underlying stream being bad.
362 # And, chances are that the exception we're handling is related to
362 # And, chances are that the exception we're handling is related to
363 # getting in that bad state. So, we swallow the seeking error and
363 # getting in that bad state. So, we swallow the seeking error and
364 # re-raise the original error.
364 # re-raise the original error.
365 seekerror = False
365 seekerror = False
366 try:
366 try:
367 for nbpart, part in iterparts:
367 for nbpart, part in iterparts:
368 # consume the bundle content
368 # consume the bundle content
369 part.seek(0, 2)
369 part.seek(0, 2)
370 except Exception:
370 except Exception:
371 seekerror = True
371 seekerror = True
372
372
373 # Small hack to let caller code distinguish exceptions from bundle2
373 # Small hack to let caller code distinguish exceptions from bundle2
374 # processing from processing the old format. This is mostly
374 # processing from processing the old format. This is mostly
375 # needed to handle different return codes to unbundle according to the
375 # needed to handle different return codes to unbundle according to the
376 # type of bundle. We should probably clean up or drop this return code
376 # type of bundle. We should probably clean up or drop this return code
377 # craziness in a future version.
377 # craziness in a future version.
378 exc.duringunbundle2 = True
378 exc.duringunbundle2 = True
379 salvaged = []
379 salvaged = []
380 replycaps = None
380 replycaps = None
381 if op.reply is not None:
381 if op.reply is not None:
382 salvaged = op.reply.salvageoutput()
382 salvaged = op.reply.salvageoutput()
383 replycaps = op.reply.capabilities
383 replycaps = op.reply.capabilities
384 exc._replycaps = replycaps
384 exc._replycaps = replycaps
385 exc._bundle2salvagedoutput = salvaged
385 exc._bundle2salvagedoutput = salvaged
386
386
387 # Re-raising from a variable loses the original stack. So only use
387 # Re-raising from a variable loses the original stack. So only use
388 # that form if we need to.
388 # that form if we need to.
389 if seekerror:
389 if seekerror:
390 raise exc
390 raise exc
391 else:
391 else:
392 raise
392 raise
393 finally:
393 finally:
394 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
394 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
395
395
396 return op
396 return op
397
397
398 def _processchangegroup(op, cg, tr, source, url, **kwargs):
399 ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
400 op.records.add('changegroup', {
401 'return': ret,
402 'addednodes': addednodes,
403 })
404 return ret
405
398 def _processpart(op, part):
406 def _processpart(op, part):
399 """process a single part from a bundle
407 """process a single part from a bundle
400
408
401 The part is guaranteed to have been fully consumed when the function exits
409 The part is guaranteed to have been fully consumed when the function exits
402 (even if an exception is raised)."""
410 (even if an exception is raised)."""
403 status = 'unknown' # used by debug output
411 status = 'unknown' # used by debug output
404 hardabort = False
412 hardabort = False
405 try:
413 try:
406 try:
414 try:
407 handler = parthandlermapping.get(part.type)
415 handler = parthandlermapping.get(part.type)
408 if handler is None:
416 if handler is None:
409 status = 'unsupported-type'
417 status = 'unsupported-type'
410 raise error.BundleUnknownFeatureError(parttype=part.type)
418 raise error.BundleUnknownFeatureError(parttype=part.type)
411 indebug(op.ui, 'found a handler for part %r' % part.type)
419 indebug(op.ui, 'found a handler for part %r' % part.type)
412 unknownparams = part.mandatorykeys - handler.params
420 unknownparams = part.mandatorykeys - handler.params
413 if unknownparams:
421 if unknownparams:
414 unknownparams = list(unknownparams)
422 unknownparams = list(unknownparams)
415 unknownparams.sort()
423 unknownparams.sort()
416 status = 'unsupported-params (%s)' % unknownparams
424 status = 'unsupported-params (%s)' % unknownparams
417 raise error.BundleUnknownFeatureError(parttype=part.type,
425 raise error.BundleUnknownFeatureError(parttype=part.type,
418 params=unknownparams)
426 params=unknownparams)
419 status = 'supported'
427 status = 'supported'
420 except error.BundleUnknownFeatureError as exc:
428 except error.BundleUnknownFeatureError as exc:
421 if part.mandatory: # mandatory parts
429 if part.mandatory: # mandatory parts
422 raise
430 raise
423 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
431 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
424 return # skip to part processing
432 return # skip to part processing
425 finally:
433 finally:
426 if op.ui.debugflag:
434 if op.ui.debugflag:
427 msg = ['bundle2-input-part: "%s"' % part.type]
435 msg = ['bundle2-input-part: "%s"' % part.type]
428 if not part.mandatory:
436 if not part.mandatory:
429 msg.append(' (advisory)')
437 msg.append(' (advisory)')
430 nbmp = len(part.mandatorykeys)
438 nbmp = len(part.mandatorykeys)
431 nbap = len(part.params) - nbmp
439 nbap = len(part.params) - nbmp
432 if nbmp or nbap:
440 if nbmp or nbap:
433 msg.append(' (params:')
441 msg.append(' (params:')
434 if nbmp:
442 if nbmp:
435 msg.append(' %i mandatory' % nbmp)
443 msg.append(' %i mandatory' % nbmp)
436 if nbap:
444 if nbap:
437 msg.append(' %i advisory' % nbmp)
445 msg.append(' %i advisory' % nbmp)
438 msg.append(')')
446 msg.append(')')
439 msg.append(' %s\n' % status)
447 msg.append(' %s\n' % status)
440 op.ui.debug(''.join(msg))
448 op.ui.debug(''.join(msg))
441
449
442 # handler is called outside the above try block so that we don't
450 # handler is called outside the above try block so that we don't
443 # risk catching KeyErrors from anything other than the
451 # risk catching KeyErrors from anything other than the
444 # parthandlermapping lookup (any KeyError raised by handler()
452 # parthandlermapping lookup (any KeyError raised by handler()
445 # itself represents a defect of a different variety).
453 # itself represents a defect of a different variety).
446 output = None
454 output = None
447 if op.captureoutput and op.reply is not None:
455 if op.captureoutput and op.reply is not None:
448 op.ui.pushbuffer(error=True, subproc=True)
456 op.ui.pushbuffer(error=True, subproc=True)
449 output = ''
457 output = ''
450 try:
458 try:
451 handler(op, part)
459 handler(op, part)
452 finally:
460 finally:
453 if output is not None:
461 if output is not None:
454 output = op.ui.popbuffer()
462 output = op.ui.popbuffer()
455 if output:
463 if output:
456 outpart = op.reply.newpart('output', data=output,
464 outpart = op.reply.newpart('output', data=output,
457 mandatory=False)
465 mandatory=False)
458 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
466 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
459 # If exiting or interrupted, do not attempt to seek the stream in the
467 # If exiting or interrupted, do not attempt to seek the stream in the
460 # finally block below. This makes abort faster.
468 # finally block below. This makes abort faster.
461 except (SystemExit, KeyboardInterrupt):
469 except (SystemExit, KeyboardInterrupt):
462 hardabort = True
470 hardabort = True
463 raise
471 raise
464 finally:
472 finally:
465 # consume the part content to not corrupt the stream.
473 # consume the part content to not corrupt the stream.
466 if not hardabort:
474 if not hardabort:
467 part.seek(0, 2)
475 part.seek(0, 2)
468
476
469
477
470 def decodecaps(blob):
478 def decodecaps(blob):
471 """decode a bundle2 caps bytes blob into a dictionary
479 """decode a bundle2 caps bytes blob into a dictionary
472
480
473 The blob is a list of capabilities (one per line)
481 The blob is a list of capabilities (one per line)
474 Capabilities may have values using a line of the form::
482 Capabilities may have values using a line of the form::
475
483
476 capability=value1,value2,value3
484 capability=value1,value2,value3
477
485
478 The values are always a list."""
486 The values are always a list."""
479 caps = {}
487 caps = {}
480 for line in blob.splitlines():
488 for line in blob.splitlines():
481 if not line:
489 if not line:
482 continue
490 continue
483 if '=' not in line:
491 if '=' not in line:
484 key, vals = line, ()
492 key, vals = line, ()
485 else:
493 else:
486 key, vals = line.split('=', 1)
494 key, vals = line.split('=', 1)
487 vals = vals.split(',')
495 vals = vals.split(',')
488 key = urlreq.unquote(key)
496 key = urlreq.unquote(key)
489 vals = [urlreq.unquote(v) for v in vals]
497 vals = [urlreq.unquote(v) for v in vals]
490 caps[key] = vals
498 caps[key] = vals
491 return caps
499 return caps
492
500
493 def encodecaps(caps):
501 def encodecaps(caps):
494 """encode a bundle2 caps dictionary into a bytes blob"""
502 """encode a bundle2 caps dictionary into a bytes blob"""
495 chunks = []
503 chunks = []
496 for ca in sorted(caps):
504 for ca in sorted(caps):
497 vals = caps[ca]
505 vals = caps[ca]
498 ca = urlreq.quote(ca)
506 ca = urlreq.quote(ca)
499 vals = [urlreq.quote(v) for v in vals]
507 vals = [urlreq.quote(v) for v in vals]
500 if vals:
508 if vals:
501 ca = "%s=%s" % (ca, ','.join(vals))
509 ca = "%s=%s" % (ca, ','.join(vals))
502 chunks.append(ca)
510 chunks.append(ca)
503 return '\n'.join(chunks)
511 return '\n'.join(chunks)
504
512
505 bundletypes = {
513 bundletypes = {
506 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
514 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
507 # since the unification ssh accepts a header but there
515 # since the unification ssh accepts a header but there
508 # is no capability signaling it.
516 # is no capability signaling it.
509 "HG20": (), # special-cased below
517 "HG20": (), # special-cased below
510 "HG10UN": ("HG10UN", 'UN'),
518 "HG10UN": ("HG10UN", 'UN'),
511 "HG10BZ": ("HG10", 'BZ'),
519 "HG10BZ": ("HG10", 'BZ'),
512 "HG10GZ": ("HG10GZ", 'GZ'),
520 "HG10GZ": ("HG10GZ", 'GZ'),
513 }
521 }
514
522
515 # hgweb uses this list to communicate its preferred type
523 # hgweb uses this list to communicate its preferred type
516 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
524 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
517
525
518 class bundle20(object):
526 class bundle20(object):
519 """represent an outgoing bundle2 container
527 """represent an outgoing bundle2 container
520
528
521 Use the `addparam` method to add stream level parameter. and `newpart` to
529 Use the `addparam` method to add stream level parameter. and `newpart` to
522 populate it. Then call `getchunks` to retrieve all the binary chunks of
530 populate it. Then call `getchunks` to retrieve all the binary chunks of
523 data that compose the bundle2 container."""
531 data that compose the bundle2 container."""
524
532
525 _magicstring = 'HG20'
533 _magicstring = 'HG20'
526
534
527 def __init__(self, ui, capabilities=()):
535 def __init__(self, ui, capabilities=()):
528 self.ui = ui
536 self.ui = ui
529 self._params = []
537 self._params = []
530 self._parts = []
538 self._parts = []
531 self.capabilities = dict(capabilities)
539 self.capabilities = dict(capabilities)
532 self._compengine = util.compengines.forbundletype('UN')
540 self._compengine = util.compengines.forbundletype('UN')
533 self._compopts = None
541 self._compopts = None
534
542
535 def setcompression(self, alg, compopts=None):
543 def setcompression(self, alg, compopts=None):
536 """setup core part compression to <alg>"""
544 """setup core part compression to <alg>"""
537 if alg in (None, 'UN'):
545 if alg in (None, 'UN'):
538 return
546 return
539 assert not any(n.lower() == 'compression' for n, v in self._params)
547 assert not any(n.lower() == 'compression' for n, v in self._params)
540 self.addparam('Compression', alg)
548 self.addparam('Compression', alg)
541 self._compengine = util.compengines.forbundletype(alg)
549 self._compengine = util.compengines.forbundletype(alg)
542 self._compopts = compopts
550 self._compopts = compopts
543
551
544 @property
552 @property
545 def nbparts(self):
553 def nbparts(self):
546 """total number of parts added to the bundler"""
554 """total number of parts added to the bundler"""
547 return len(self._parts)
555 return len(self._parts)
548
556
549 # methods used to defines the bundle2 content
557 # methods used to defines the bundle2 content
550 def addparam(self, name, value=None):
558 def addparam(self, name, value=None):
551 """add a stream level parameter"""
559 """add a stream level parameter"""
552 if not name:
560 if not name:
553 raise ValueError('empty parameter name')
561 raise ValueError('empty parameter name')
554 if name[0] not in string.letters:
562 if name[0] not in string.letters:
555 raise ValueError('non letter first character: %r' % name)
563 raise ValueError('non letter first character: %r' % name)
556 self._params.append((name, value))
564 self._params.append((name, value))
557
565
558 def addpart(self, part):
566 def addpart(self, part):
559 """add a new part to the bundle2 container
567 """add a new part to the bundle2 container
560
568
561 Parts contains the actual applicative payload."""
569 Parts contains the actual applicative payload."""
562 assert part.id is None
570 assert part.id is None
563 part.id = len(self._parts) # very cheap counter
571 part.id = len(self._parts) # very cheap counter
564 self._parts.append(part)
572 self._parts.append(part)
565
573
566 def newpart(self, typeid, *args, **kwargs):
574 def newpart(self, typeid, *args, **kwargs):
567 """create a new part and add it to the containers
575 """create a new part and add it to the containers
568
576
569 As the part is directly added to the containers. For now, this means
577 As the part is directly added to the containers. For now, this means
570 that any failure to properly initialize the part after calling
578 that any failure to properly initialize the part after calling
571 ``newpart`` should result in a failure of the whole bundling process.
579 ``newpart`` should result in a failure of the whole bundling process.
572
580
573 You can still fall back to manually create and add if you need better
581 You can still fall back to manually create and add if you need better
574 control."""
582 control."""
575 part = bundlepart(typeid, *args, **kwargs)
583 part = bundlepart(typeid, *args, **kwargs)
576 self.addpart(part)
584 self.addpart(part)
577 return part
585 return part
578
586
579 # methods used to generate the bundle2 stream
587 # methods used to generate the bundle2 stream
580 def getchunks(self):
588 def getchunks(self):
581 if self.ui.debugflag:
589 if self.ui.debugflag:
582 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
590 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
583 if self._params:
591 if self._params:
584 msg.append(' (%i params)' % len(self._params))
592 msg.append(' (%i params)' % len(self._params))
585 msg.append(' %i parts total\n' % len(self._parts))
593 msg.append(' %i parts total\n' % len(self._parts))
586 self.ui.debug(''.join(msg))
594 self.ui.debug(''.join(msg))
587 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
595 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
588 yield self._magicstring
596 yield self._magicstring
589 param = self._paramchunk()
597 param = self._paramchunk()
590 outdebug(self.ui, 'bundle parameter: %s' % param)
598 outdebug(self.ui, 'bundle parameter: %s' % param)
591 yield _pack(_fstreamparamsize, len(param))
599 yield _pack(_fstreamparamsize, len(param))
592 if param:
600 if param:
593 yield param
601 yield param
594 for chunk in self._compengine.compressstream(self._getcorechunk(),
602 for chunk in self._compengine.compressstream(self._getcorechunk(),
595 self._compopts):
603 self._compopts):
596 yield chunk
604 yield chunk
597
605
598 def _paramchunk(self):
606 def _paramchunk(self):
599 """return a encoded version of all stream parameters"""
607 """return a encoded version of all stream parameters"""
600 blocks = []
608 blocks = []
601 for par, value in self._params:
609 for par, value in self._params:
602 par = urlreq.quote(par)
610 par = urlreq.quote(par)
603 if value is not None:
611 if value is not None:
604 value = urlreq.quote(value)
612 value = urlreq.quote(value)
605 par = '%s=%s' % (par, value)
613 par = '%s=%s' % (par, value)
606 blocks.append(par)
614 blocks.append(par)
607 return ' '.join(blocks)
615 return ' '.join(blocks)
608
616
609 def _getcorechunk(self):
617 def _getcorechunk(self):
610 """yield chunk for the core part of the bundle
618 """yield chunk for the core part of the bundle
611
619
612 (all but headers and parameters)"""
620 (all but headers and parameters)"""
613 outdebug(self.ui, 'start of parts')
621 outdebug(self.ui, 'start of parts')
614 for part in self._parts:
622 for part in self._parts:
615 outdebug(self.ui, 'bundle part: "%s"' % part.type)
623 outdebug(self.ui, 'bundle part: "%s"' % part.type)
616 for chunk in part.getchunks(ui=self.ui):
624 for chunk in part.getchunks(ui=self.ui):
617 yield chunk
625 yield chunk
618 outdebug(self.ui, 'end of bundle')
626 outdebug(self.ui, 'end of bundle')
619 yield _pack(_fpartheadersize, 0)
627 yield _pack(_fpartheadersize, 0)
620
628
621
629
622 def salvageoutput(self):
630 def salvageoutput(self):
623 """return a list with a copy of all output parts in the bundle
631 """return a list with a copy of all output parts in the bundle
624
632
625 This is meant to be used during error handling to make sure we preserve
633 This is meant to be used during error handling to make sure we preserve
626 server output"""
634 server output"""
627 salvaged = []
635 salvaged = []
628 for part in self._parts:
636 for part in self._parts:
629 if part.type.startswith('output'):
637 if part.type.startswith('output'):
630 salvaged.append(part.copy())
638 salvaged.append(part.copy())
631 return salvaged
639 return salvaged
632
640
633
641
634 class unpackermixin(object):
642 class unpackermixin(object):
635 """A mixin to extract bytes and struct data from a stream"""
643 """A mixin to extract bytes and struct data from a stream"""
636
644
637 def __init__(self, fp):
645 def __init__(self, fp):
638 self._fp = fp
646 self._fp = fp
639
647
640 def _unpack(self, format):
648 def _unpack(self, format):
641 """unpack this struct format from the stream
649 """unpack this struct format from the stream
642
650
643 This method is meant for internal usage by the bundle2 protocol only.
651 This method is meant for internal usage by the bundle2 protocol only.
644 They directly manipulate the low level stream including bundle2 level
652 They directly manipulate the low level stream including bundle2 level
645 instruction.
653 instruction.
646
654
647 Do not use it to implement higher-level logic or methods."""
655 Do not use it to implement higher-level logic or methods."""
648 data = self._readexact(struct.calcsize(format))
656 data = self._readexact(struct.calcsize(format))
649 return _unpack(format, data)
657 return _unpack(format, data)
650
658
651 def _readexact(self, size):
659 def _readexact(self, size):
652 """read exactly <size> bytes from the stream
660 """read exactly <size> bytes from the stream
653
661
654 This method is meant for internal usage by the bundle2 protocol only.
662 This method is meant for internal usage by the bundle2 protocol only.
655 They directly manipulate the low level stream including bundle2 level
663 They directly manipulate the low level stream including bundle2 level
656 instruction.
664 instruction.
657
665
658 Do not use it to implement higher-level logic or methods."""
666 Do not use it to implement higher-level logic or methods."""
659 return changegroup.readexactly(self._fp, size)
667 return changegroup.readexactly(self._fp, size)
660
668
661 def getunbundler(ui, fp, magicstring=None):
669 def getunbundler(ui, fp, magicstring=None):
662 """return a valid unbundler object for a given magicstring"""
670 """return a valid unbundler object for a given magicstring"""
663 if magicstring is None:
671 if magicstring is None:
664 magicstring = changegroup.readexactly(fp, 4)
672 magicstring = changegroup.readexactly(fp, 4)
665 magic, version = magicstring[0:2], magicstring[2:4]
673 magic, version = magicstring[0:2], magicstring[2:4]
666 if magic != 'HG':
674 if magic != 'HG':
667 raise error.Abort(_('not a Mercurial bundle'))
675 raise error.Abort(_('not a Mercurial bundle'))
668 unbundlerclass = formatmap.get(version)
676 unbundlerclass = formatmap.get(version)
669 if unbundlerclass is None:
677 if unbundlerclass is None:
670 raise error.Abort(_('unknown bundle version %s') % version)
678 raise error.Abort(_('unknown bundle version %s') % version)
671 unbundler = unbundlerclass(ui, fp)
679 unbundler = unbundlerclass(ui, fp)
672 indebug(ui, 'start processing of %s stream' % magicstring)
680 indebug(ui, 'start processing of %s stream' % magicstring)
673 return unbundler
681 return unbundler
674
682
675 class unbundle20(unpackermixin):
683 class unbundle20(unpackermixin):
676 """interpret a bundle2 stream
684 """interpret a bundle2 stream
677
685
678 This class is fed with a binary stream and yields parts through its
686 This class is fed with a binary stream and yields parts through its
679 `iterparts` methods."""
687 `iterparts` methods."""
680
688
681 _magicstring = 'HG20'
689 _magicstring = 'HG20'
682
690
683 def __init__(self, ui, fp):
691 def __init__(self, ui, fp):
684 """If header is specified, we do not read it out of the stream."""
692 """If header is specified, we do not read it out of the stream."""
685 self.ui = ui
693 self.ui = ui
686 self._compengine = util.compengines.forbundletype('UN')
694 self._compengine = util.compengines.forbundletype('UN')
687 self._compressed = None
695 self._compressed = None
688 super(unbundle20, self).__init__(fp)
696 super(unbundle20, self).__init__(fp)
689
697
690 @util.propertycache
698 @util.propertycache
691 def params(self):
699 def params(self):
692 """dictionary of stream level parameters"""
700 """dictionary of stream level parameters"""
693 indebug(self.ui, 'reading bundle2 stream parameters')
701 indebug(self.ui, 'reading bundle2 stream parameters')
694 params = {}
702 params = {}
695 paramssize = self._unpack(_fstreamparamsize)[0]
703 paramssize = self._unpack(_fstreamparamsize)[0]
696 if paramssize < 0:
704 if paramssize < 0:
697 raise error.BundleValueError('negative bundle param size: %i'
705 raise error.BundleValueError('negative bundle param size: %i'
698 % paramssize)
706 % paramssize)
699 if paramssize:
707 if paramssize:
700 params = self._readexact(paramssize)
708 params = self._readexact(paramssize)
701 params = self._processallparams(params)
709 params = self._processallparams(params)
702 return params
710 return params
703
711
704 def _processallparams(self, paramsblock):
712 def _processallparams(self, paramsblock):
705 """"""
713 """"""
706 params = util.sortdict()
714 params = util.sortdict()
707 for p in paramsblock.split(' '):
715 for p in paramsblock.split(' '):
708 p = p.split('=', 1)
716 p = p.split('=', 1)
709 p = [urlreq.unquote(i) for i in p]
717 p = [urlreq.unquote(i) for i in p]
710 if len(p) < 2:
718 if len(p) < 2:
711 p.append(None)
719 p.append(None)
712 self._processparam(*p)
720 self._processparam(*p)
713 params[p[0]] = p[1]
721 params[p[0]] = p[1]
714 return params
722 return params
715
723
716
724
717 def _processparam(self, name, value):
725 def _processparam(self, name, value):
718 """process a parameter, applying its effect if needed
726 """process a parameter, applying its effect if needed
719
727
720 Parameter starting with a lower case letter are advisory and will be
728 Parameter starting with a lower case letter are advisory and will be
721 ignored when unknown. Those starting with an upper case letter are
729 ignored when unknown. Those starting with an upper case letter are
722 mandatory and will this function will raise a KeyError when unknown.
730 mandatory and will this function will raise a KeyError when unknown.
723
731
724 Note: no option are currently supported. Any input will be either
732 Note: no option are currently supported. Any input will be either
725 ignored or failing.
733 ignored or failing.
726 """
734 """
727 if not name:
735 if not name:
728 raise ValueError('empty parameter name')
736 raise ValueError('empty parameter name')
729 if name[0] not in string.letters:
737 if name[0] not in string.letters:
730 raise ValueError('non letter first character: %r' % name)
738 raise ValueError('non letter first character: %r' % name)
731 try:
739 try:
732 handler = b2streamparamsmap[name.lower()]
740 handler = b2streamparamsmap[name.lower()]
733 except KeyError:
741 except KeyError:
734 if name[0].islower():
742 if name[0].islower():
735 indebug(self.ui, "ignoring unknown parameter %r" % name)
743 indebug(self.ui, "ignoring unknown parameter %r" % name)
736 else:
744 else:
737 raise error.BundleUnknownFeatureError(params=(name,))
745 raise error.BundleUnknownFeatureError(params=(name,))
738 else:
746 else:
739 handler(self, name, value)
747 handler(self, name, value)
740
748
741 def _forwardchunks(self):
749 def _forwardchunks(self):
742 """utility to transfer a bundle2 as binary
750 """utility to transfer a bundle2 as binary
743
751
744 This is made necessary by the fact the 'getbundle' command over 'ssh'
752 This is made necessary by the fact the 'getbundle' command over 'ssh'
745 have no way to know then the reply end, relying on the bundle to be
753 have no way to know then the reply end, relying on the bundle to be
746 interpreted to know its end. This is terrible and we are sorry, but we
754 interpreted to know its end. This is terrible and we are sorry, but we
747 needed to move forward to get general delta enabled.
755 needed to move forward to get general delta enabled.
748 """
756 """
749 yield self._magicstring
757 yield self._magicstring
750 assert 'params' not in vars(self)
758 assert 'params' not in vars(self)
751 paramssize = self._unpack(_fstreamparamsize)[0]
759 paramssize = self._unpack(_fstreamparamsize)[0]
752 if paramssize < 0:
760 if paramssize < 0:
753 raise error.BundleValueError('negative bundle param size: %i'
761 raise error.BundleValueError('negative bundle param size: %i'
754 % paramssize)
762 % paramssize)
755 yield _pack(_fstreamparamsize, paramssize)
763 yield _pack(_fstreamparamsize, paramssize)
756 if paramssize:
764 if paramssize:
757 params = self._readexact(paramssize)
765 params = self._readexact(paramssize)
758 self._processallparams(params)
766 self._processallparams(params)
759 yield params
767 yield params
760 assert self._compengine.bundletype == 'UN'
768 assert self._compengine.bundletype == 'UN'
761 # From there, payload might need to be decompressed
769 # From there, payload might need to be decompressed
762 self._fp = self._compengine.decompressorreader(self._fp)
770 self._fp = self._compengine.decompressorreader(self._fp)
763 emptycount = 0
771 emptycount = 0
764 while emptycount < 2:
772 while emptycount < 2:
765 # so we can brainlessly loop
773 # so we can brainlessly loop
766 assert _fpartheadersize == _fpayloadsize
774 assert _fpartheadersize == _fpayloadsize
767 size = self._unpack(_fpartheadersize)[0]
775 size = self._unpack(_fpartheadersize)[0]
768 yield _pack(_fpartheadersize, size)
776 yield _pack(_fpartheadersize, size)
769 if size:
777 if size:
770 emptycount = 0
778 emptycount = 0
771 else:
779 else:
772 emptycount += 1
780 emptycount += 1
773 continue
781 continue
774 if size == flaginterrupt:
782 if size == flaginterrupt:
775 continue
783 continue
776 elif size < 0:
784 elif size < 0:
777 raise error.BundleValueError('negative chunk size: %i')
785 raise error.BundleValueError('negative chunk size: %i')
778 yield self._readexact(size)
786 yield self._readexact(size)
779
787
780
788
781 def iterparts(self):
789 def iterparts(self):
782 """yield all parts contained in the stream"""
790 """yield all parts contained in the stream"""
783 # make sure param have been loaded
791 # make sure param have been loaded
784 self.params
792 self.params
785 # From there, payload need to be decompressed
793 # From there, payload need to be decompressed
786 self._fp = self._compengine.decompressorreader(self._fp)
794 self._fp = self._compengine.decompressorreader(self._fp)
787 indebug(self.ui, 'start extraction of bundle2 parts')
795 indebug(self.ui, 'start extraction of bundle2 parts')
788 headerblock = self._readpartheader()
796 headerblock = self._readpartheader()
789 while headerblock is not None:
797 while headerblock is not None:
790 part = unbundlepart(self.ui, headerblock, self._fp)
798 part = unbundlepart(self.ui, headerblock, self._fp)
791 yield part
799 yield part
792 part.seek(0, 2)
800 part.seek(0, 2)
793 headerblock = self._readpartheader()
801 headerblock = self._readpartheader()
794 indebug(self.ui, 'end of bundle2 stream')
802 indebug(self.ui, 'end of bundle2 stream')
795
803
796 def _readpartheader(self):
804 def _readpartheader(self):
797 """reads a part header size and return the bytes blob
805 """reads a part header size and return the bytes blob
798
806
799 returns None if empty"""
807 returns None if empty"""
800 headersize = self._unpack(_fpartheadersize)[0]
808 headersize = self._unpack(_fpartheadersize)[0]
801 if headersize < 0:
809 if headersize < 0:
802 raise error.BundleValueError('negative part header size: %i'
810 raise error.BundleValueError('negative part header size: %i'
803 % headersize)
811 % headersize)
804 indebug(self.ui, 'part header size: %i' % headersize)
812 indebug(self.ui, 'part header size: %i' % headersize)
805 if headersize:
813 if headersize:
806 return self._readexact(headersize)
814 return self._readexact(headersize)
807 return None
815 return None
808
816
809 def compressed(self):
817 def compressed(self):
810 self.params # load params
818 self.params # load params
811 return self._compressed
819 return self._compressed
812
820
813 def close(self):
821 def close(self):
814 """close underlying file"""
822 """close underlying file"""
815 if util.safehasattr(self._fp, 'close'):
823 if util.safehasattr(self._fp, 'close'):
816 return self._fp.close()
824 return self._fp.close()
817
825
818 formatmap = {'20': unbundle20}
826 formatmap = {'20': unbundle20}
819
827
820 b2streamparamsmap = {}
828 b2streamparamsmap = {}
821
829
822 def b2streamparamhandler(name):
830 def b2streamparamhandler(name):
823 """register a handler for a stream level parameter"""
831 """register a handler for a stream level parameter"""
824 def decorator(func):
832 def decorator(func):
825 assert name not in formatmap
833 assert name not in formatmap
826 b2streamparamsmap[name] = func
834 b2streamparamsmap[name] = func
827 return func
835 return func
828 return decorator
836 return decorator
829
837
830 @b2streamparamhandler('compression')
838 @b2streamparamhandler('compression')
831 def processcompression(unbundler, param, value):
839 def processcompression(unbundler, param, value):
832 """read compression parameter and install payload decompression"""
840 """read compression parameter and install payload decompression"""
833 if value not in util.compengines.supportedbundletypes:
841 if value not in util.compengines.supportedbundletypes:
834 raise error.BundleUnknownFeatureError(params=(param,),
842 raise error.BundleUnknownFeatureError(params=(param,),
835 values=(value,))
843 values=(value,))
836 unbundler._compengine = util.compengines.forbundletype(value)
844 unbundler._compengine = util.compengines.forbundletype(value)
837 if value is not None:
845 if value is not None:
838 unbundler._compressed = True
846 unbundler._compressed = True
839
847
840 class bundlepart(object):
848 class bundlepart(object):
841 """A bundle2 part contains application level payload
849 """A bundle2 part contains application level payload
842
850
843 The part `type` is used to route the part to the application level
851 The part `type` is used to route the part to the application level
844 handler.
852 handler.
845
853
846 The part payload is contained in ``part.data``. It could be raw bytes or a
854 The part payload is contained in ``part.data``. It could be raw bytes or a
847 generator of byte chunks.
855 generator of byte chunks.
848
856
849 You can add parameters to the part using the ``addparam`` method.
857 You can add parameters to the part using the ``addparam`` method.
850 Parameters can be either mandatory (default) or advisory. Remote side
858 Parameters can be either mandatory (default) or advisory. Remote side
851 should be able to safely ignore the advisory ones.
859 should be able to safely ignore the advisory ones.
852
860
853 Both data and parameters cannot be modified after the generation has begun.
861 Both data and parameters cannot be modified after the generation has begun.
854 """
862 """
855
863
856 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
864 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
857 data='', mandatory=True):
865 data='', mandatory=True):
858 validateparttype(parttype)
866 validateparttype(parttype)
859 self.id = None
867 self.id = None
860 self.type = parttype
868 self.type = parttype
861 self._data = data
869 self._data = data
862 self._mandatoryparams = list(mandatoryparams)
870 self._mandatoryparams = list(mandatoryparams)
863 self._advisoryparams = list(advisoryparams)
871 self._advisoryparams = list(advisoryparams)
864 # checking for duplicated entries
872 # checking for duplicated entries
865 self._seenparams = set()
873 self._seenparams = set()
866 for pname, __ in self._mandatoryparams + self._advisoryparams:
874 for pname, __ in self._mandatoryparams + self._advisoryparams:
867 if pname in self._seenparams:
875 if pname in self._seenparams:
868 raise error.ProgrammingError('duplicated params: %s' % pname)
876 raise error.ProgrammingError('duplicated params: %s' % pname)
869 self._seenparams.add(pname)
877 self._seenparams.add(pname)
870 # status of the part's generation:
878 # status of the part's generation:
871 # - None: not started,
879 # - None: not started,
872 # - False: currently generated,
880 # - False: currently generated,
873 # - True: generation done.
881 # - True: generation done.
874 self._generated = None
882 self._generated = None
875 self.mandatory = mandatory
883 self.mandatory = mandatory
876
884
877 def __repr__(self):
885 def __repr__(self):
878 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
886 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
879 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
887 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
880 % (cls, id(self), self.id, self.type, self.mandatory))
888 % (cls, id(self), self.id, self.type, self.mandatory))
881
889
882 def copy(self):
890 def copy(self):
883 """return a copy of the part
891 """return a copy of the part
884
892
885 The new part have the very same content but no partid assigned yet.
893 The new part have the very same content but no partid assigned yet.
886 Parts with generated data cannot be copied."""
894 Parts with generated data cannot be copied."""
887 assert not util.safehasattr(self.data, 'next')
895 assert not util.safehasattr(self.data, 'next')
888 return self.__class__(self.type, self._mandatoryparams,
896 return self.__class__(self.type, self._mandatoryparams,
889 self._advisoryparams, self._data, self.mandatory)
897 self._advisoryparams, self._data, self.mandatory)
890
898
891 # methods used to defines the part content
899 # methods used to defines the part content
892 @property
900 @property
893 def data(self):
901 def data(self):
894 return self._data
902 return self._data
895
903
896 @data.setter
904 @data.setter
897 def data(self, data):
905 def data(self, data):
898 if self._generated is not None:
906 if self._generated is not None:
899 raise error.ReadOnlyPartError('part is being generated')
907 raise error.ReadOnlyPartError('part is being generated')
900 self._data = data
908 self._data = data
901
909
902 @property
910 @property
903 def mandatoryparams(self):
911 def mandatoryparams(self):
904 # make it an immutable tuple to force people through ``addparam``
912 # make it an immutable tuple to force people through ``addparam``
905 return tuple(self._mandatoryparams)
913 return tuple(self._mandatoryparams)
906
914
907 @property
915 @property
908 def advisoryparams(self):
916 def advisoryparams(self):
909 # make it an immutable tuple to force people through ``addparam``
917 # make it an immutable tuple to force people through ``addparam``
910 return tuple(self._advisoryparams)
918 return tuple(self._advisoryparams)
911
919
912 def addparam(self, name, value='', mandatory=True):
920 def addparam(self, name, value='', mandatory=True):
913 """add a parameter to the part
921 """add a parameter to the part
914
922
915 If 'mandatory' is set to True, the remote handler must claim support
923 If 'mandatory' is set to True, the remote handler must claim support
916 for this parameter or the unbundling will be aborted.
924 for this parameter or the unbundling will be aborted.
917
925
918 The 'name' and 'value' cannot exceed 255 bytes each.
926 The 'name' and 'value' cannot exceed 255 bytes each.
919 """
927 """
920 if self._generated is not None:
928 if self._generated is not None:
921 raise error.ReadOnlyPartError('part is being generated')
929 raise error.ReadOnlyPartError('part is being generated')
922 if name in self._seenparams:
930 if name in self._seenparams:
923 raise ValueError('duplicated params: %s' % name)
931 raise ValueError('duplicated params: %s' % name)
924 self._seenparams.add(name)
932 self._seenparams.add(name)
925 params = self._advisoryparams
933 params = self._advisoryparams
926 if mandatory:
934 if mandatory:
927 params = self._mandatoryparams
935 params = self._mandatoryparams
928 params.append((name, value))
936 params.append((name, value))
929
937
930 # methods used to generates the bundle2 stream
938 # methods used to generates the bundle2 stream
931 def getchunks(self, ui):
939 def getchunks(self, ui):
932 if self._generated is not None:
940 if self._generated is not None:
933 raise error.ProgrammingError('part can only be consumed once')
941 raise error.ProgrammingError('part can only be consumed once')
934 self._generated = False
942 self._generated = False
935
943
936 if ui.debugflag:
944 if ui.debugflag:
937 msg = ['bundle2-output-part: "%s"' % self.type]
945 msg = ['bundle2-output-part: "%s"' % self.type]
938 if not self.mandatory:
946 if not self.mandatory:
939 msg.append(' (advisory)')
947 msg.append(' (advisory)')
940 nbmp = len(self.mandatoryparams)
948 nbmp = len(self.mandatoryparams)
941 nbap = len(self.advisoryparams)
949 nbap = len(self.advisoryparams)
942 if nbmp or nbap:
950 if nbmp or nbap:
943 msg.append(' (params:')
951 msg.append(' (params:')
944 if nbmp:
952 if nbmp:
945 msg.append(' %i mandatory' % nbmp)
953 msg.append(' %i mandatory' % nbmp)
946 if nbap:
954 if nbap:
947 msg.append(' %i advisory' % nbmp)
955 msg.append(' %i advisory' % nbmp)
948 msg.append(')')
956 msg.append(')')
949 if not self.data:
957 if not self.data:
950 msg.append(' empty payload')
958 msg.append(' empty payload')
951 elif util.safehasattr(self.data, 'next'):
959 elif util.safehasattr(self.data, 'next'):
952 msg.append(' streamed payload')
960 msg.append(' streamed payload')
953 else:
961 else:
954 msg.append(' %i bytes payload' % len(self.data))
962 msg.append(' %i bytes payload' % len(self.data))
955 msg.append('\n')
963 msg.append('\n')
956 ui.debug(''.join(msg))
964 ui.debug(''.join(msg))
957
965
958 #### header
966 #### header
959 if self.mandatory:
967 if self.mandatory:
960 parttype = self.type.upper()
968 parttype = self.type.upper()
961 else:
969 else:
962 parttype = self.type.lower()
970 parttype = self.type.lower()
963 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
971 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
964 ## parttype
972 ## parttype
965 header = [_pack(_fparttypesize, len(parttype)),
973 header = [_pack(_fparttypesize, len(parttype)),
966 parttype, _pack(_fpartid, self.id),
974 parttype, _pack(_fpartid, self.id),
967 ]
975 ]
968 ## parameters
976 ## parameters
969 # count
977 # count
970 manpar = self.mandatoryparams
978 manpar = self.mandatoryparams
971 advpar = self.advisoryparams
979 advpar = self.advisoryparams
972 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
980 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
973 # size
981 # size
974 parsizes = []
982 parsizes = []
975 for key, value in manpar:
983 for key, value in manpar:
976 parsizes.append(len(key))
984 parsizes.append(len(key))
977 parsizes.append(len(value))
985 parsizes.append(len(value))
978 for key, value in advpar:
986 for key, value in advpar:
979 parsizes.append(len(key))
987 parsizes.append(len(key))
980 parsizes.append(len(value))
988 parsizes.append(len(value))
981 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
989 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
982 header.append(paramsizes)
990 header.append(paramsizes)
983 # key, value
991 # key, value
984 for key, value in manpar:
992 for key, value in manpar:
985 header.append(key)
993 header.append(key)
986 header.append(value)
994 header.append(value)
987 for key, value in advpar:
995 for key, value in advpar:
988 header.append(key)
996 header.append(key)
989 header.append(value)
997 header.append(value)
990 ## finalize header
998 ## finalize header
991 headerchunk = ''.join(header)
999 headerchunk = ''.join(header)
992 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1000 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
993 yield _pack(_fpartheadersize, len(headerchunk))
1001 yield _pack(_fpartheadersize, len(headerchunk))
994 yield headerchunk
1002 yield headerchunk
995 ## payload
1003 ## payload
996 try:
1004 try:
997 for chunk in self._payloadchunks():
1005 for chunk in self._payloadchunks():
998 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1006 outdebug(ui, 'payload chunk size: %i' % len(chunk))
999 yield _pack(_fpayloadsize, len(chunk))
1007 yield _pack(_fpayloadsize, len(chunk))
1000 yield chunk
1008 yield chunk
1001 except GeneratorExit:
1009 except GeneratorExit:
1002 # GeneratorExit means that nobody is listening for our
1010 # GeneratorExit means that nobody is listening for our
1003 # results anyway, so just bail quickly rather than trying
1011 # results anyway, so just bail quickly rather than trying
1004 # to produce an error part.
1012 # to produce an error part.
1005 ui.debug('bundle2-generatorexit\n')
1013 ui.debug('bundle2-generatorexit\n')
1006 raise
1014 raise
1007 except BaseException as exc:
1015 except BaseException as exc:
1008 # backup exception data for later
1016 # backup exception data for later
1009 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1017 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1010 % exc)
1018 % exc)
1011 tb = sys.exc_info()[2]
1019 tb = sys.exc_info()[2]
1012 msg = 'unexpected error: %s' % exc
1020 msg = 'unexpected error: %s' % exc
1013 interpart = bundlepart('error:abort', [('message', msg)],
1021 interpart = bundlepart('error:abort', [('message', msg)],
1014 mandatory=False)
1022 mandatory=False)
1015 interpart.id = 0
1023 interpart.id = 0
1016 yield _pack(_fpayloadsize, -1)
1024 yield _pack(_fpayloadsize, -1)
1017 for chunk in interpart.getchunks(ui=ui):
1025 for chunk in interpart.getchunks(ui=ui):
1018 yield chunk
1026 yield chunk
1019 outdebug(ui, 'closing payload chunk')
1027 outdebug(ui, 'closing payload chunk')
1020 # abort current part payload
1028 # abort current part payload
1021 yield _pack(_fpayloadsize, 0)
1029 yield _pack(_fpayloadsize, 0)
1022 pycompat.raisewithtb(exc, tb)
1030 pycompat.raisewithtb(exc, tb)
1023 # end of payload
1031 # end of payload
1024 outdebug(ui, 'closing payload chunk')
1032 outdebug(ui, 'closing payload chunk')
1025 yield _pack(_fpayloadsize, 0)
1033 yield _pack(_fpayloadsize, 0)
1026 self._generated = True
1034 self._generated = True
1027
1035
1028 def _payloadchunks(self):
1036 def _payloadchunks(self):
1029 """yield chunks of a the part payload
1037 """yield chunks of a the part payload
1030
1038
1031 Exists to handle the different methods to provide data to a part."""
1039 Exists to handle the different methods to provide data to a part."""
1032 # we only support fixed size data now.
1040 # we only support fixed size data now.
1033 # This will be improved in the future.
1041 # This will be improved in the future.
1034 if util.safehasattr(self.data, 'next'):
1042 if util.safehasattr(self.data, 'next'):
1035 buff = util.chunkbuffer(self.data)
1043 buff = util.chunkbuffer(self.data)
1036 chunk = buff.read(preferedchunksize)
1044 chunk = buff.read(preferedchunksize)
1037 while chunk:
1045 while chunk:
1038 yield chunk
1046 yield chunk
1039 chunk = buff.read(preferedchunksize)
1047 chunk = buff.read(preferedchunksize)
1040 elif len(self.data):
1048 elif len(self.data):
1041 yield self.data
1049 yield self.data
1042
1050
1043
1051
1044 flaginterrupt = -1
1052 flaginterrupt = -1
1045
1053
1046 class interrupthandler(unpackermixin):
1054 class interrupthandler(unpackermixin):
1047 """read one part and process it with restricted capability
1055 """read one part and process it with restricted capability
1048
1056
1049 This allows to transmit exception raised on the producer size during part
1057 This allows to transmit exception raised on the producer size during part
1050 iteration while the consumer is reading a part.
1058 iteration while the consumer is reading a part.
1051
1059
1052 Part processed in this manner only have access to a ui object,"""
1060 Part processed in this manner only have access to a ui object,"""
1053
1061
1054 def __init__(self, ui, fp):
1062 def __init__(self, ui, fp):
1055 super(interrupthandler, self).__init__(fp)
1063 super(interrupthandler, self).__init__(fp)
1056 self.ui = ui
1064 self.ui = ui
1057
1065
1058 def _readpartheader(self):
1066 def _readpartheader(self):
1059 """reads a part header size and return the bytes blob
1067 """reads a part header size and return the bytes blob
1060
1068
1061 returns None if empty"""
1069 returns None if empty"""
1062 headersize = self._unpack(_fpartheadersize)[0]
1070 headersize = self._unpack(_fpartheadersize)[0]
1063 if headersize < 0:
1071 if headersize < 0:
1064 raise error.BundleValueError('negative part header size: %i'
1072 raise error.BundleValueError('negative part header size: %i'
1065 % headersize)
1073 % headersize)
1066 indebug(self.ui, 'part header size: %i\n' % headersize)
1074 indebug(self.ui, 'part header size: %i\n' % headersize)
1067 if headersize:
1075 if headersize:
1068 return self._readexact(headersize)
1076 return self._readexact(headersize)
1069 return None
1077 return None
1070
1078
1071 def __call__(self):
1079 def __call__(self):
1072
1080
1073 self.ui.debug('bundle2-input-stream-interrupt:'
1081 self.ui.debug('bundle2-input-stream-interrupt:'
1074 ' opening out of band context\n')
1082 ' opening out of band context\n')
1075 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1083 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1076 headerblock = self._readpartheader()
1084 headerblock = self._readpartheader()
1077 if headerblock is None:
1085 if headerblock is None:
1078 indebug(self.ui, 'no part found during interruption.')
1086 indebug(self.ui, 'no part found during interruption.')
1079 return
1087 return
1080 part = unbundlepart(self.ui, headerblock, self._fp)
1088 part = unbundlepart(self.ui, headerblock, self._fp)
1081 op = interruptoperation(self.ui)
1089 op = interruptoperation(self.ui)
1082 _processpart(op, part)
1090 _processpart(op, part)
1083 self.ui.debug('bundle2-input-stream-interrupt:'
1091 self.ui.debug('bundle2-input-stream-interrupt:'
1084 ' closing out of band context\n')
1092 ' closing out of band context\n')
1085
1093
1086 class interruptoperation(object):
1094 class interruptoperation(object):
1087 """A limited operation to be use by part handler during interruption
1095 """A limited operation to be use by part handler during interruption
1088
1096
1089 It only have access to an ui object.
1097 It only have access to an ui object.
1090 """
1098 """
1091
1099
1092 def __init__(self, ui):
1100 def __init__(self, ui):
1093 self.ui = ui
1101 self.ui = ui
1094 self.reply = None
1102 self.reply = None
1095 self.captureoutput = False
1103 self.captureoutput = False
1096
1104
1097 @property
1105 @property
1098 def repo(self):
1106 def repo(self):
1099 raise error.ProgrammingError('no repo access from stream interruption')
1107 raise error.ProgrammingError('no repo access from stream interruption')
1100
1108
1101 def gettransaction(self):
1109 def gettransaction(self):
1102 raise TransactionUnavailable('no repo access from stream interruption')
1110 raise TransactionUnavailable('no repo access from stream interruption')
1103
1111
1104 class unbundlepart(unpackermixin):
1112 class unbundlepart(unpackermixin):
1105 """a bundle part read from a bundle"""
1113 """a bundle part read from a bundle"""
1106
1114
1107 def __init__(self, ui, header, fp):
1115 def __init__(self, ui, header, fp):
1108 super(unbundlepart, self).__init__(fp)
1116 super(unbundlepart, self).__init__(fp)
1109 self._seekable = (util.safehasattr(fp, 'seek') and
1117 self._seekable = (util.safehasattr(fp, 'seek') and
1110 util.safehasattr(fp, 'tell'))
1118 util.safehasattr(fp, 'tell'))
1111 self.ui = ui
1119 self.ui = ui
1112 # unbundle state attr
1120 # unbundle state attr
1113 self._headerdata = header
1121 self._headerdata = header
1114 self._headeroffset = 0
1122 self._headeroffset = 0
1115 self._initialized = False
1123 self._initialized = False
1116 self.consumed = False
1124 self.consumed = False
1117 # part data
1125 # part data
1118 self.id = None
1126 self.id = None
1119 self.type = None
1127 self.type = None
1120 self.mandatoryparams = None
1128 self.mandatoryparams = None
1121 self.advisoryparams = None
1129 self.advisoryparams = None
1122 self.params = None
1130 self.params = None
1123 self.mandatorykeys = ()
1131 self.mandatorykeys = ()
1124 self._payloadstream = None
1132 self._payloadstream = None
1125 self._readheader()
1133 self._readheader()
1126 self._mandatory = None
1134 self._mandatory = None
1127 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1135 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1128 self._pos = 0
1136 self._pos = 0
1129
1137
1130 def _fromheader(self, size):
1138 def _fromheader(self, size):
1131 """return the next <size> byte from the header"""
1139 """return the next <size> byte from the header"""
1132 offset = self._headeroffset
1140 offset = self._headeroffset
1133 data = self._headerdata[offset:(offset + size)]
1141 data = self._headerdata[offset:(offset + size)]
1134 self._headeroffset = offset + size
1142 self._headeroffset = offset + size
1135 return data
1143 return data
1136
1144
1137 def _unpackheader(self, format):
1145 def _unpackheader(self, format):
1138 """read given format from header
1146 """read given format from header
1139
1147
1140 This automatically compute the size of the format to read."""
1148 This automatically compute the size of the format to read."""
1141 data = self._fromheader(struct.calcsize(format))
1149 data = self._fromheader(struct.calcsize(format))
1142 return _unpack(format, data)
1150 return _unpack(format, data)
1143
1151
1144 def _initparams(self, mandatoryparams, advisoryparams):
1152 def _initparams(self, mandatoryparams, advisoryparams):
1145 """internal function to setup all logic related parameters"""
1153 """internal function to setup all logic related parameters"""
1146 # make it read only to prevent people touching it by mistake.
1154 # make it read only to prevent people touching it by mistake.
1147 self.mandatoryparams = tuple(mandatoryparams)
1155 self.mandatoryparams = tuple(mandatoryparams)
1148 self.advisoryparams = tuple(advisoryparams)
1156 self.advisoryparams = tuple(advisoryparams)
1149 # user friendly UI
1157 # user friendly UI
1150 self.params = util.sortdict(self.mandatoryparams)
1158 self.params = util.sortdict(self.mandatoryparams)
1151 self.params.update(self.advisoryparams)
1159 self.params.update(self.advisoryparams)
1152 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1160 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1153
1161
1154 def _payloadchunks(self, chunknum=0):
1162 def _payloadchunks(self, chunknum=0):
1155 '''seek to specified chunk and start yielding data'''
1163 '''seek to specified chunk and start yielding data'''
1156 if len(self._chunkindex) == 0:
1164 if len(self._chunkindex) == 0:
1157 assert chunknum == 0, 'Must start with chunk 0'
1165 assert chunknum == 0, 'Must start with chunk 0'
1158 self._chunkindex.append((0, self._tellfp()))
1166 self._chunkindex.append((0, self._tellfp()))
1159 else:
1167 else:
1160 assert chunknum < len(self._chunkindex), \
1168 assert chunknum < len(self._chunkindex), \
1161 'Unknown chunk %d' % chunknum
1169 'Unknown chunk %d' % chunknum
1162 self._seekfp(self._chunkindex[chunknum][1])
1170 self._seekfp(self._chunkindex[chunknum][1])
1163
1171
1164 pos = self._chunkindex[chunknum][0]
1172 pos = self._chunkindex[chunknum][0]
1165 payloadsize = self._unpack(_fpayloadsize)[0]
1173 payloadsize = self._unpack(_fpayloadsize)[0]
1166 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1174 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1167 while payloadsize:
1175 while payloadsize:
1168 if payloadsize == flaginterrupt:
1176 if payloadsize == flaginterrupt:
1169 # interruption detection, the handler will now read a
1177 # interruption detection, the handler will now read a
1170 # single part and process it.
1178 # single part and process it.
1171 interrupthandler(self.ui, self._fp)()
1179 interrupthandler(self.ui, self._fp)()
1172 elif payloadsize < 0:
1180 elif payloadsize < 0:
1173 msg = 'negative payload chunk size: %i' % payloadsize
1181 msg = 'negative payload chunk size: %i' % payloadsize
1174 raise error.BundleValueError(msg)
1182 raise error.BundleValueError(msg)
1175 else:
1183 else:
1176 result = self._readexact(payloadsize)
1184 result = self._readexact(payloadsize)
1177 chunknum += 1
1185 chunknum += 1
1178 pos += payloadsize
1186 pos += payloadsize
1179 if chunknum == len(self._chunkindex):
1187 if chunknum == len(self._chunkindex):
1180 self._chunkindex.append((pos, self._tellfp()))
1188 self._chunkindex.append((pos, self._tellfp()))
1181 yield result
1189 yield result
1182 payloadsize = self._unpack(_fpayloadsize)[0]
1190 payloadsize = self._unpack(_fpayloadsize)[0]
1183 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1191 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1184
1192
1185 def _findchunk(self, pos):
1193 def _findchunk(self, pos):
1186 '''for a given payload position, return a chunk number and offset'''
1194 '''for a given payload position, return a chunk number and offset'''
1187 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1195 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1188 if ppos == pos:
1196 if ppos == pos:
1189 return chunk, 0
1197 return chunk, 0
1190 elif ppos > pos:
1198 elif ppos > pos:
1191 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1199 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1192 raise ValueError('Unknown chunk')
1200 raise ValueError('Unknown chunk')
1193
1201
1194 def _readheader(self):
1202 def _readheader(self):
1195 """read the header and setup the object"""
1203 """read the header and setup the object"""
1196 typesize = self._unpackheader(_fparttypesize)[0]
1204 typesize = self._unpackheader(_fparttypesize)[0]
1197 self.type = self._fromheader(typesize)
1205 self.type = self._fromheader(typesize)
1198 indebug(self.ui, 'part type: "%s"' % self.type)
1206 indebug(self.ui, 'part type: "%s"' % self.type)
1199 self.id = self._unpackheader(_fpartid)[0]
1207 self.id = self._unpackheader(_fpartid)[0]
1200 indebug(self.ui, 'part id: "%s"' % self.id)
1208 indebug(self.ui, 'part id: "%s"' % self.id)
1201 # extract mandatory bit from type
1209 # extract mandatory bit from type
1202 self.mandatory = (self.type != self.type.lower())
1210 self.mandatory = (self.type != self.type.lower())
1203 self.type = self.type.lower()
1211 self.type = self.type.lower()
1204 ## reading parameters
1212 ## reading parameters
1205 # param count
1213 # param count
1206 mancount, advcount = self._unpackheader(_fpartparamcount)
1214 mancount, advcount = self._unpackheader(_fpartparamcount)
1207 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1215 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1208 # param size
1216 # param size
1209 fparamsizes = _makefpartparamsizes(mancount + advcount)
1217 fparamsizes = _makefpartparamsizes(mancount + advcount)
1210 paramsizes = self._unpackheader(fparamsizes)
1218 paramsizes = self._unpackheader(fparamsizes)
1211 # make it a list of couple again
1219 # make it a list of couple again
1212 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1220 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1213 # split mandatory from advisory
1221 # split mandatory from advisory
1214 mansizes = paramsizes[:mancount]
1222 mansizes = paramsizes[:mancount]
1215 advsizes = paramsizes[mancount:]
1223 advsizes = paramsizes[mancount:]
1216 # retrieve param value
1224 # retrieve param value
1217 manparams = []
1225 manparams = []
1218 for key, value in mansizes:
1226 for key, value in mansizes:
1219 manparams.append((self._fromheader(key), self._fromheader(value)))
1227 manparams.append((self._fromheader(key), self._fromheader(value)))
1220 advparams = []
1228 advparams = []
1221 for key, value in advsizes:
1229 for key, value in advsizes:
1222 advparams.append((self._fromheader(key), self._fromheader(value)))
1230 advparams.append((self._fromheader(key), self._fromheader(value)))
1223 self._initparams(manparams, advparams)
1231 self._initparams(manparams, advparams)
1224 ## part payload
1232 ## part payload
1225 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1233 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1226 # we read the data, tell it
1234 # we read the data, tell it
1227 self._initialized = True
1235 self._initialized = True
1228
1236
1229 def read(self, size=None):
1237 def read(self, size=None):
1230 """read payload data"""
1238 """read payload data"""
1231 if not self._initialized:
1239 if not self._initialized:
1232 self._readheader()
1240 self._readheader()
1233 if size is None:
1241 if size is None:
1234 data = self._payloadstream.read()
1242 data = self._payloadstream.read()
1235 else:
1243 else:
1236 data = self._payloadstream.read(size)
1244 data = self._payloadstream.read(size)
1237 self._pos += len(data)
1245 self._pos += len(data)
1238 if size is None or len(data) < size:
1246 if size is None or len(data) < size:
1239 if not self.consumed and self._pos:
1247 if not self.consumed and self._pos:
1240 self.ui.debug('bundle2-input-part: total payload size %i\n'
1248 self.ui.debug('bundle2-input-part: total payload size %i\n'
1241 % self._pos)
1249 % self._pos)
1242 self.consumed = True
1250 self.consumed = True
1243 return data
1251 return data
1244
1252
1245 def tell(self):
1253 def tell(self):
1246 return self._pos
1254 return self._pos
1247
1255
1248 def seek(self, offset, whence=0):
1256 def seek(self, offset, whence=0):
1249 if whence == 0:
1257 if whence == 0:
1250 newpos = offset
1258 newpos = offset
1251 elif whence == 1:
1259 elif whence == 1:
1252 newpos = self._pos + offset
1260 newpos = self._pos + offset
1253 elif whence == 2:
1261 elif whence == 2:
1254 if not self.consumed:
1262 if not self.consumed:
1255 self.read()
1263 self.read()
1256 newpos = self._chunkindex[-1][0] - offset
1264 newpos = self._chunkindex[-1][0] - offset
1257 else:
1265 else:
1258 raise ValueError('Unknown whence value: %r' % (whence,))
1266 raise ValueError('Unknown whence value: %r' % (whence,))
1259
1267
1260 if newpos > self._chunkindex[-1][0] and not self.consumed:
1268 if newpos > self._chunkindex[-1][0] and not self.consumed:
1261 self.read()
1269 self.read()
1262 if not 0 <= newpos <= self._chunkindex[-1][0]:
1270 if not 0 <= newpos <= self._chunkindex[-1][0]:
1263 raise ValueError('Offset out of range')
1271 raise ValueError('Offset out of range')
1264
1272
1265 if self._pos != newpos:
1273 if self._pos != newpos:
1266 chunk, internaloffset = self._findchunk(newpos)
1274 chunk, internaloffset = self._findchunk(newpos)
1267 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1275 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1268 adjust = self.read(internaloffset)
1276 adjust = self.read(internaloffset)
1269 if len(adjust) != internaloffset:
1277 if len(adjust) != internaloffset:
1270 raise error.Abort(_('Seek failed\n'))
1278 raise error.Abort(_('Seek failed\n'))
1271 self._pos = newpos
1279 self._pos = newpos
1272
1280
1273 def _seekfp(self, offset, whence=0):
1281 def _seekfp(self, offset, whence=0):
1274 """move the underlying file pointer
1282 """move the underlying file pointer
1275
1283
1276 This method is meant for internal usage by the bundle2 protocol only.
1284 This method is meant for internal usage by the bundle2 protocol only.
1277 They directly manipulate the low level stream including bundle2 level
1285 They directly manipulate the low level stream including bundle2 level
1278 instruction.
1286 instruction.
1279
1287
1280 Do not use it to implement higher-level logic or methods."""
1288 Do not use it to implement higher-level logic or methods."""
1281 if self._seekable:
1289 if self._seekable:
1282 return self._fp.seek(offset, whence)
1290 return self._fp.seek(offset, whence)
1283 else:
1291 else:
1284 raise NotImplementedError(_('File pointer is not seekable'))
1292 raise NotImplementedError(_('File pointer is not seekable'))
1285
1293
1286 def _tellfp(self):
1294 def _tellfp(self):
1287 """return the file offset, or None if file is not seekable
1295 """return the file offset, or None if file is not seekable
1288
1296
1289 This method is meant for internal usage by the bundle2 protocol only.
1297 This method is meant for internal usage by the bundle2 protocol only.
1290 They directly manipulate the low level stream including bundle2 level
1298 They directly manipulate the low level stream including bundle2 level
1291 instruction.
1299 instruction.
1292
1300
1293 Do not use it to implement higher-level logic or methods."""
1301 Do not use it to implement higher-level logic or methods."""
1294 if self._seekable:
1302 if self._seekable:
1295 try:
1303 try:
1296 return self._fp.tell()
1304 return self._fp.tell()
1297 except IOError as e:
1305 except IOError as e:
1298 if e.errno == errno.ESPIPE:
1306 if e.errno == errno.ESPIPE:
1299 self._seekable = False
1307 self._seekable = False
1300 else:
1308 else:
1301 raise
1309 raise
1302 return None
1310 return None
1303
1311
1304 # These are only the static capabilities.
1312 # These are only the static capabilities.
1305 # Check the 'getrepocaps' function for the rest.
1313 # Check the 'getrepocaps' function for the rest.
1306 capabilities = {'HG20': (),
1314 capabilities = {'HG20': (),
1307 'error': ('abort', 'unsupportedcontent', 'pushraced',
1315 'error': ('abort', 'unsupportedcontent', 'pushraced',
1308 'pushkey'),
1316 'pushkey'),
1309 'listkeys': (),
1317 'listkeys': (),
1310 'pushkey': (),
1318 'pushkey': (),
1311 'digests': tuple(sorted(util.DIGESTS.keys())),
1319 'digests': tuple(sorted(util.DIGESTS.keys())),
1312 'remote-changegroup': ('http', 'https'),
1320 'remote-changegroup': ('http', 'https'),
1313 'hgtagsfnodes': (),
1321 'hgtagsfnodes': (),
1314 }
1322 }
1315
1323
1316 def getrepocaps(repo, allowpushback=False):
1324 def getrepocaps(repo, allowpushback=False):
1317 """return the bundle2 capabilities for a given repo
1325 """return the bundle2 capabilities for a given repo
1318
1326
1319 Exists to allow extensions (like evolution) to mutate the capabilities.
1327 Exists to allow extensions (like evolution) to mutate the capabilities.
1320 """
1328 """
1321 caps = capabilities.copy()
1329 caps = capabilities.copy()
1322 caps['changegroup'] = tuple(sorted(
1330 caps['changegroup'] = tuple(sorted(
1323 changegroup.supportedincomingversions(repo)))
1331 changegroup.supportedincomingversions(repo)))
1324 if obsolete.isenabled(repo, obsolete.exchangeopt):
1332 if obsolete.isenabled(repo, obsolete.exchangeopt):
1325 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1333 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1326 caps['obsmarkers'] = supportedformat
1334 caps['obsmarkers'] = supportedformat
1327 if allowpushback:
1335 if allowpushback:
1328 caps['pushback'] = ()
1336 caps['pushback'] = ()
1329 cpmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
1337 cpmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
1330 if cpmode == 'check-related':
1338 if cpmode == 'check-related':
1331 caps['checkheads'] = ('related',)
1339 caps['checkheads'] = ('related',)
1332 return caps
1340 return caps
1333
1341
1334 def bundle2caps(remote):
1342 def bundle2caps(remote):
1335 """return the bundle capabilities of a peer as dict"""
1343 """return the bundle capabilities of a peer as dict"""
1336 raw = remote.capable('bundle2')
1344 raw = remote.capable('bundle2')
1337 if not raw and raw != '':
1345 if not raw and raw != '':
1338 return {}
1346 return {}
1339 capsblob = urlreq.unquote(remote.capable('bundle2'))
1347 capsblob = urlreq.unquote(remote.capable('bundle2'))
1340 return decodecaps(capsblob)
1348 return decodecaps(capsblob)
1341
1349
1342 def obsmarkersversion(caps):
1350 def obsmarkersversion(caps):
1343 """extract the list of supported obsmarkers versions from a bundle2caps dict
1351 """extract the list of supported obsmarkers versions from a bundle2caps dict
1344 """
1352 """
1345 obscaps = caps.get('obsmarkers', ())
1353 obscaps = caps.get('obsmarkers', ())
1346 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1354 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1347
1355
1348 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1356 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1349 vfs=None, compression=None, compopts=None):
1357 vfs=None, compression=None, compopts=None):
1350 if bundletype.startswith('HG10'):
1358 if bundletype.startswith('HG10'):
1351 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1359 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1352 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1360 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1353 compression=compression, compopts=compopts)
1361 compression=compression, compopts=compopts)
1354 elif not bundletype.startswith('HG20'):
1362 elif not bundletype.startswith('HG20'):
1355 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1363 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1356
1364
1357 caps = {}
1365 caps = {}
1358 if 'obsolescence' in opts:
1366 if 'obsolescence' in opts:
1359 caps['obsmarkers'] = ('V1',)
1367 caps['obsmarkers'] = ('V1',)
1360 bundle = bundle20(ui, caps)
1368 bundle = bundle20(ui, caps)
1361 bundle.setcompression(compression, compopts)
1369 bundle.setcompression(compression, compopts)
1362 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1370 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1363 chunkiter = bundle.getchunks()
1371 chunkiter = bundle.getchunks()
1364
1372
1365 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1373 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1366
1374
1367 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1375 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1368 # We should eventually reconcile this logic with the one behind
1376 # We should eventually reconcile this logic with the one behind
1369 # 'exchange.getbundle2partsgenerator'.
1377 # 'exchange.getbundle2partsgenerator'.
1370 #
1378 #
1371 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1379 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1372 # different right now. So we keep them separated for now for the sake of
1380 # different right now. So we keep them separated for now for the sake of
1373 # simplicity.
1381 # simplicity.
1374
1382
1375 # we always want a changegroup in such bundle
1383 # we always want a changegroup in such bundle
1376 cgversion = opts.get('cg.version')
1384 cgversion = opts.get('cg.version')
1377 if cgversion is None:
1385 if cgversion is None:
1378 cgversion = changegroup.safeversion(repo)
1386 cgversion = changegroup.safeversion(repo)
1379 cg = changegroup.getchangegroup(repo, source, outgoing,
1387 cg = changegroup.getchangegroup(repo, source, outgoing,
1380 version=cgversion)
1388 version=cgversion)
1381 part = bundler.newpart('changegroup', data=cg.getchunks())
1389 part = bundler.newpart('changegroup', data=cg.getchunks())
1382 part.addparam('version', cg.version)
1390 part.addparam('version', cg.version)
1383 if 'clcount' in cg.extras:
1391 if 'clcount' in cg.extras:
1384 part.addparam('nbchanges', str(cg.extras['clcount']),
1392 part.addparam('nbchanges', str(cg.extras['clcount']),
1385 mandatory=False)
1393 mandatory=False)
1386
1394
1387 addparttagsfnodescache(repo, bundler, outgoing)
1395 addparttagsfnodescache(repo, bundler, outgoing)
1388
1396
1389 if opts.get('obsolescence', False):
1397 if opts.get('obsolescence', False):
1390 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1398 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1391 buildobsmarkerspart(bundler, obsmarkers)
1399 buildobsmarkerspart(bundler, obsmarkers)
1392
1400
1393 if opts.get('phases', False):
1401 if opts.get('phases', False):
1394 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1402 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1395 phasedata = []
1403 phasedata = []
1396 for phase in phases.allphases:
1404 for phase in phases.allphases:
1397 for head in headsbyphase[phase]:
1405 for head in headsbyphase[phase]:
1398 phasedata.append(_pack(_fphasesentry, phase, head))
1406 phasedata.append(_pack(_fphasesentry, phase, head))
1399 bundler.newpart('phase-heads', data=''.join(phasedata))
1407 bundler.newpart('phase-heads', data=''.join(phasedata))
1400
1408
1401 def addparttagsfnodescache(repo, bundler, outgoing):
1409 def addparttagsfnodescache(repo, bundler, outgoing):
1402 # we include the tags fnode cache for the bundle changeset
1410 # we include the tags fnode cache for the bundle changeset
1403 # (as an optional parts)
1411 # (as an optional parts)
1404 cache = tags.hgtagsfnodescache(repo.unfiltered())
1412 cache = tags.hgtagsfnodescache(repo.unfiltered())
1405 chunks = []
1413 chunks = []
1406
1414
1407 # .hgtags fnodes are only relevant for head changesets. While we could
1415 # .hgtags fnodes are only relevant for head changesets. While we could
1408 # transfer values for all known nodes, there will likely be little to
1416 # transfer values for all known nodes, there will likely be little to
1409 # no benefit.
1417 # no benefit.
1410 #
1418 #
1411 # We don't bother using a generator to produce output data because
1419 # We don't bother using a generator to produce output data because
1412 # a) we only have 40 bytes per head and even esoteric numbers of heads
1420 # a) we only have 40 bytes per head and even esoteric numbers of heads
1413 # consume little memory (1M heads is 40MB) b) we don't want to send the
1421 # consume little memory (1M heads is 40MB) b) we don't want to send the
1414 # part if we don't have entries and knowing if we have entries requires
1422 # part if we don't have entries and knowing if we have entries requires
1415 # cache lookups.
1423 # cache lookups.
1416 for node in outgoing.missingheads:
1424 for node in outgoing.missingheads:
1417 # Don't compute missing, as this may slow down serving.
1425 # Don't compute missing, as this may slow down serving.
1418 fnode = cache.getfnode(node, computemissing=False)
1426 fnode = cache.getfnode(node, computemissing=False)
1419 if fnode is not None:
1427 if fnode is not None:
1420 chunks.extend([node, fnode])
1428 chunks.extend([node, fnode])
1421
1429
1422 if chunks:
1430 if chunks:
1423 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1431 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1424
1432
1425 def buildobsmarkerspart(bundler, markers):
1433 def buildobsmarkerspart(bundler, markers):
1426 """add an obsmarker part to the bundler with <markers>
1434 """add an obsmarker part to the bundler with <markers>
1427
1435
1428 No part is created if markers is empty.
1436 No part is created if markers is empty.
1429 Raises ValueError if the bundler doesn't support any known obsmarker format.
1437 Raises ValueError if the bundler doesn't support any known obsmarker format.
1430 """
1438 """
1431 if not markers:
1439 if not markers:
1432 return None
1440 return None
1433
1441
1434 remoteversions = obsmarkersversion(bundler.capabilities)
1442 remoteversions = obsmarkersversion(bundler.capabilities)
1435 version = obsolete.commonversion(remoteversions)
1443 version = obsolete.commonversion(remoteversions)
1436 if version is None:
1444 if version is None:
1437 raise ValueError('bundler does not support common obsmarker format')
1445 raise ValueError('bundler does not support common obsmarker format')
1438 stream = obsolete.encodemarkers(markers, True, version=version)
1446 stream = obsolete.encodemarkers(markers, True, version=version)
1439 return bundler.newpart('obsmarkers', data=stream)
1447 return bundler.newpart('obsmarkers', data=stream)
1440
1448
1441 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1449 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1442 compopts=None):
1450 compopts=None):
1443 """Write a bundle file and return its filename.
1451 """Write a bundle file and return its filename.
1444
1452
1445 Existing files will not be overwritten.
1453 Existing files will not be overwritten.
1446 If no filename is specified, a temporary file is created.
1454 If no filename is specified, a temporary file is created.
1447 bz2 compression can be turned off.
1455 bz2 compression can be turned off.
1448 The bundle file will be deleted in case of errors.
1456 The bundle file will be deleted in case of errors.
1449 """
1457 """
1450
1458
1451 if bundletype == "HG20":
1459 if bundletype == "HG20":
1452 bundle = bundle20(ui)
1460 bundle = bundle20(ui)
1453 bundle.setcompression(compression, compopts)
1461 bundle.setcompression(compression, compopts)
1454 part = bundle.newpart('changegroup', data=cg.getchunks())
1462 part = bundle.newpart('changegroup', data=cg.getchunks())
1455 part.addparam('version', cg.version)
1463 part.addparam('version', cg.version)
1456 if 'clcount' in cg.extras:
1464 if 'clcount' in cg.extras:
1457 part.addparam('nbchanges', str(cg.extras['clcount']),
1465 part.addparam('nbchanges', str(cg.extras['clcount']),
1458 mandatory=False)
1466 mandatory=False)
1459 chunkiter = bundle.getchunks()
1467 chunkiter = bundle.getchunks()
1460 else:
1468 else:
1461 # compression argument is only for the bundle2 case
1469 # compression argument is only for the bundle2 case
1462 assert compression is None
1470 assert compression is None
1463 if cg.version != '01':
1471 if cg.version != '01':
1464 raise error.Abort(_('old bundle types only supports v1 '
1472 raise error.Abort(_('old bundle types only supports v1 '
1465 'changegroups'))
1473 'changegroups'))
1466 header, comp = bundletypes[bundletype]
1474 header, comp = bundletypes[bundletype]
1467 if comp not in util.compengines.supportedbundletypes:
1475 if comp not in util.compengines.supportedbundletypes:
1468 raise error.Abort(_('unknown stream compression type: %s')
1476 raise error.Abort(_('unknown stream compression type: %s')
1469 % comp)
1477 % comp)
1470 compengine = util.compengines.forbundletype(comp)
1478 compengine = util.compengines.forbundletype(comp)
1471 def chunkiter():
1479 def chunkiter():
1472 yield header
1480 yield header
1473 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1481 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1474 yield chunk
1482 yield chunk
1475 chunkiter = chunkiter()
1483 chunkiter = chunkiter()
1476
1484
1477 # parse the changegroup data, otherwise we will block
1485 # parse the changegroup data, otherwise we will block
1478 # in case of sshrepo because we don't know the end of the stream
1486 # in case of sshrepo because we don't know the end of the stream
1479 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1487 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1480
1488
1481 def combinechangegroupresults(op):
1489 def combinechangegroupresults(op):
1482 """logic to combine 0 or more addchangegroup results into one"""
1490 """logic to combine 0 or more addchangegroup results into one"""
1483 results = [r.get('return', 0)
1491 results = [r.get('return', 0)
1484 for r in op.records['changegroup']]
1492 for r in op.records['changegroup']]
1485 changedheads = 0
1493 changedheads = 0
1486 result = 1
1494 result = 1
1487 for ret in results:
1495 for ret in results:
1488 # If any changegroup result is 0, return 0
1496 # If any changegroup result is 0, return 0
1489 if ret == 0:
1497 if ret == 0:
1490 result = 0
1498 result = 0
1491 break
1499 break
1492 if ret < -1:
1500 if ret < -1:
1493 changedheads += ret + 1
1501 changedheads += ret + 1
1494 elif ret > 1:
1502 elif ret > 1:
1495 changedheads += ret - 1
1503 changedheads += ret - 1
1496 if changedheads > 0:
1504 if changedheads > 0:
1497 result = 1 + changedheads
1505 result = 1 + changedheads
1498 elif changedheads < 0:
1506 elif changedheads < 0:
1499 result = -1 + changedheads
1507 result = -1 + changedheads
1500 return result
1508 return result
1501
1509
1502 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1510 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1503 def handlechangegroup(op, inpart):
1511 def handlechangegroup(op, inpart):
1504 """apply a changegroup part on the repo
1512 """apply a changegroup part on the repo
1505
1513
1506 This is a very early implementation that will massive rework before being
1514 This is a very early implementation that will massive rework before being
1507 inflicted to any end-user.
1515 inflicted to any end-user.
1508 """
1516 """
1509 tr = op.gettransaction()
1517 tr = op.gettransaction()
1510 unpackerversion = inpart.params.get('version', '01')
1518 unpackerversion = inpart.params.get('version', '01')
1511 # We should raise an appropriate exception here
1519 # We should raise an appropriate exception here
1512 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1520 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1513 # the source and url passed here are overwritten by the one contained in
1521 # the source and url passed here are overwritten by the one contained in
1514 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1522 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1515 nbchangesets = None
1523 nbchangesets = None
1516 if 'nbchanges' in inpart.params:
1524 if 'nbchanges' in inpart.params:
1517 nbchangesets = int(inpart.params.get('nbchanges'))
1525 nbchangesets = int(inpart.params.get('nbchanges'))
1518 if ('treemanifest' in inpart.params and
1526 if ('treemanifest' in inpart.params and
1519 'treemanifest' not in op.repo.requirements):
1527 'treemanifest' not in op.repo.requirements):
1520 if len(op.repo.changelog) != 0:
1528 if len(op.repo.changelog) != 0:
1521 raise error.Abort(_(
1529 raise error.Abort(_(
1522 "bundle contains tree manifests, but local repo is "
1530 "bundle contains tree manifests, but local repo is "
1523 "non-empty and does not use tree manifests"))
1531 "non-empty and does not use tree manifests"))
1524 op.repo.requirements.add('treemanifest')
1532 op.repo.requirements.add('treemanifest')
1525 op.repo._applyopenerreqs()
1533 op.repo._applyopenerreqs()
1526 op.repo._writerequirements()
1534 op.repo._writerequirements()
1527 ret, addednodes = cg.apply(op.repo, tr, 'bundle2', 'bundle2',
1535 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1528 expectedtotal=nbchangesets)
1536 expectedtotal=nbchangesets)
1529 op.records.add('changegroup', {
1530 'return': ret,
1531 'addednodes': addednodes,
1532 })
1533 if op.reply is not None:
1537 if op.reply is not None:
1534 # This is definitely not the final form of this
1538 # This is definitely not the final form of this
1535 # return. But one need to start somewhere.
1539 # return. But one need to start somewhere.
1536 part = op.reply.newpart('reply:changegroup', mandatory=False)
1540 part = op.reply.newpart('reply:changegroup', mandatory=False)
1537 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1541 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1538 part.addparam('return', '%i' % ret, mandatory=False)
1542 part.addparam('return', '%i' % ret, mandatory=False)
1539 assert not inpart.read()
1543 assert not inpart.read()
1540
1544
1541 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1545 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1542 ['digest:%s' % k for k in util.DIGESTS.keys()])
1546 ['digest:%s' % k for k in util.DIGESTS.keys()])
1543 @parthandler('remote-changegroup', _remotechangegroupparams)
1547 @parthandler('remote-changegroup', _remotechangegroupparams)
1544 def handleremotechangegroup(op, inpart):
1548 def handleremotechangegroup(op, inpart):
1545 """apply a bundle10 on the repo, given an url and validation information
1549 """apply a bundle10 on the repo, given an url and validation information
1546
1550
1547 All the information about the remote bundle to import are given as
1551 All the information about the remote bundle to import are given as
1548 parameters. The parameters include:
1552 parameters. The parameters include:
1549 - url: the url to the bundle10.
1553 - url: the url to the bundle10.
1550 - size: the bundle10 file size. It is used to validate what was
1554 - size: the bundle10 file size. It is used to validate what was
1551 retrieved by the client matches the server knowledge about the bundle.
1555 retrieved by the client matches the server knowledge about the bundle.
1552 - digests: a space separated list of the digest types provided as
1556 - digests: a space separated list of the digest types provided as
1553 parameters.
1557 parameters.
1554 - digest:<digest-type>: the hexadecimal representation of the digest with
1558 - digest:<digest-type>: the hexadecimal representation of the digest with
1555 that name. Like the size, it is used to validate what was retrieved by
1559 that name. Like the size, it is used to validate what was retrieved by
1556 the client matches what the server knows about the bundle.
1560 the client matches what the server knows about the bundle.
1557
1561
1558 When multiple digest types are given, all of them are checked.
1562 When multiple digest types are given, all of them are checked.
1559 """
1563 """
1560 try:
1564 try:
1561 raw_url = inpart.params['url']
1565 raw_url = inpart.params['url']
1562 except KeyError:
1566 except KeyError:
1563 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1567 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1564 parsed_url = util.url(raw_url)
1568 parsed_url = util.url(raw_url)
1565 if parsed_url.scheme not in capabilities['remote-changegroup']:
1569 if parsed_url.scheme not in capabilities['remote-changegroup']:
1566 raise error.Abort(_('remote-changegroup does not support %s urls') %
1570 raise error.Abort(_('remote-changegroup does not support %s urls') %
1567 parsed_url.scheme)
1571 parsed_url.scheme)
1568
1572
1569 try:
1573 try:
1570 size = int(inpart.params['size'])
1574 size = int(inpart.params['size'])
1571 except ValueError:
1575 except ValueError:
1572 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1576 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1573 % 'size')
1577 % 'size')
1574 except KeyError:
1578 except KeyError:
1575 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1579 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1576
1580
1577 digests = {}
1581 digests = {}
1578 for typ in inpart.params.get('digests', '').split():
1582 for typ in inpart.params.get('digests', '').split():
1579 param = 'digest:%s' % typ
1583 param = 'digest:%s' % typ
1580 try:
1584 try:
1581 value = inpart.params[param]
1585 value = inpart.params[param]
1582 except KeyError:
1586 except KeyError:
1583 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1587 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1584 param)
1588 param)
1585 digests[typ] = value
1589 digests[typ] = value
1586
1590
1587 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1591 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1588
1592
1589 tr = op.gettransaction()
1593 tr = op.gettransaction()
1590 from . import exchange
1594 from . import exchange
1591 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1595 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1592 if not isinstance(cg, changegroup.cg1unpacker):
1596 if not isinstance(cg, changegroup.cg1unpacker):
1593 raise error.Abort(_('%s: not a bundle version 1.0') %
1597 raise error.Abort(_('%s: not a bundle version 1.0') %
1594 util.hidepassword(raw_url))
1598 util.hidepassword(raw_url))
1595 ret, addednodes = cg.apply(op.repo, tr, 'bundle2', 'bundle2')
1599 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1596 op.records.add('changegroup', {
1597 'return': ret,
1598 'addednodes': addednodes,
1599 })
1600 if op.reply is not None:
1600 if op.reply is not None:
1601 # This is definitely not the final form of this
1601 # This is definitely not the final form of this
1602 # return. But one need to start somewhere.
1602 # return. But one need to start somewhere.
1603 part = op.reply.newpart('reply:changegroup')
1603 part = op.reply.newpart('reply:changegroup')
1604 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1604 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1605 part.addparam('return', '%i' % ret, mandatory=False)
1605 part.addparam('return', '%i' % ret, mandatory=False)
1606 try:
1606 try:
1607 real_part.validate()
1607 real_part.validate()
1608 except error.Abort as e:
1608 except error.Abort as e:
1609 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1609 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1610 (util.hidepassword(raw_url), str(e)))
1610 (util.hidepassword(raw_url), str(e)))
1611 assert not inpart.read()
1611 assert not inpart.read()
1612
1612
1613 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1613 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1614 def handlereplychangegroup(op, inpart):
1614 def handlereplychangegroup(op, inpart):
1615 ret = int(inpart.params['return'])
1615 ret = int(inpart.params['return'])
1616 replyto = int(inpart.params['in-reply-to'])
1616 replyto = int(inpart.params['in-reply-to'])
1617 op.records.add('changegroup', {'return': ret}, replyto)
1617 op.records.add('changegroup', {'return': ret}, replyto)
1618
1618
1619 @parthandler('check:heads')
1619 @parthandler('check:heads')
1620 def handlecheckheads(op, inpart):
1620 def handlecheckheads(op, inpart):
1621 """check that head of the repo did not change
1621 """check that head of the repo did not change
1622
1622
1623 This is used to detect a push race when using unbundle.
1623 This is used to detect a push race when using unbundle.
1624 This replaces the "heads" argument of unbundle."""
1624 This replaces the "heads" argument of unbundle."""
1625 h = inpart.read(20)
1625 h = inpart.read(20)
1626 heads = []
1626 heads = []
1627 while len(h) == 20:
1627 while len(h) == 20:
1628 heads.append(h)
1628 heads.append(h)
1629 h = inpart.read(20)
1629 h = inpart.read(20)
1630 assert not h
1630 assert not h
1631 # Trigger a transaction so that we are guaranteed to have the lock now.
1631 # Trigger a transaction so that we are guaranteed to have the lock now.
1632 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1632 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1633 op.gettransaction()
1633 op.gettransaction()
1634 if sorted(heads) != sorted(op.repo.heads()):
1634 if sorted(heads) != sorted(op.repo.heads()):
1635 raise error.PushRaced('repository changed while pushing - '
1635 raise error.PushRaced('repository changed while pushing - '
1636 'please try again')
1636 'please try again')
1637
1637
1638 @parthandler('check:updated-heads')
1638 @parthandler('check:updated-heads')
1639 def handlecheckupdatedheads(op, inpart):
1639 def handlecheckupdatedheads(op, inpart):
1640 """check for race on the heads touched by a push
1640 """check for race on the heads touched by a push
1641
1641
1642 This is similar to 'check:heads' but focus on the heads actually updated
1642 This is similar to 'check:heads' but focus on the heads actually updated
1643 during the push. If other activities happen on unrelated heads, it is
1643 during the push. If other activities happen on unrelated heads, it is
1644 ignored.
1644 ignored.
1645
1645
1646 This allow server with high traffic to avoid push contention as long as
1646 This allow server with high traffic to avoid push contention as long as
1647 unrelated parts of the graph are involved."""
1647 unrelated parts of the graph are involved."""
1648 h = inpart.read(20)
1648 h = inpart.read(20)
1649 heads = []
1649 heads = []
1650 while len(h) == 20:
1650 while len(h) == 20:
1651 heads.append(h)
1651 heads.append(h)
1652 h = inpart.read(20)
1652 h = inpart.read(20)
1653 assert not h
1653 assert not h
1654 # trigger a transaction so that we are guaranteed to have the lock now.
1654 # trigger a transaction so that we are guaranteed to have the lock now.
1655 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1655 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1656 op.gettransaction()
1656 op.gettransaction()
1657
1657
1658 currentheads = set()
1658 currentheads = set()
1659 for ls in op.repo.branchmap().itervalues():
1659 for ls in op.repo.branchmap().itervalues():
1660 currentheads.update(ls)
1660 currentheads.update(ls)
1661
1661
1662 for h in heads:
1662 for h in heads:
1663 if h not in currentheads:
1663 if h not in currentheads:
1664 raise error.PushRaced('repository changed while pushing - '
1664 raise error.PushRaced('repository changed while pushing - '
1665 'please try again')
1665 'please try again')
1666
1666
1667 @parthandler('output')
1667 @parthandler('output')
1668 def handleoutput(op, inpart):
1668 def handleoutput(op, inpart):
1669 """forward output captured on the server to the client"""
1669 """forward output captured on the server to the client"""
1670 for line in inpart.read().splitlines():
1670 for line in inpart.read().splitlines():
1671 op.ui.status(_('remote: %s\n') % line)
1671 op.ui.status(_('remote: %s\n') % line)
1672
1672
1673 @parthandler('replycaps')
1673 @parthandler('replycaps')
1674 def handlereplycaps(op, inpart):
1674 def handlereplycaps(op, inpart):
1675 """Notify that a reply bundle should be created
1675 """Notify that a reply bundle should be created
1676
1676
1677 The payload contains the capabilities information for the reply"""
1677 The payload contains the capabilities information for the reply"""
1678 caps = decodecaps(inpart.read())
1678 caps = decodecaps(inpart.read())
1679 if op.reply is None:
1679 if op.reply is None:
1680 op.reply = bundle20(op.ui, caps)
1680 op.reply = bundle20(op.ui, caps)
1681
1681
1682 class AbortFromPart(error.Abort):
1682 class AbortFromPart(error.Abort):
1683 """Sub-class of Abort that denotes an error from a bundle2 part."""
1683 """Sub-class of Abort that denotes an error from a bundle2 part."""
1684
1684
1685 @parthandler('error:abort', ('message', 'hint'))
1685 @parthandler('error:abort', ('message', 'hint'))
1686 def handleerrorabort(op, inpart):
1686 def handleerrorabort(op, inpart):
1687 """Used to transmit abort error over the wire"""
1687 """Used to transmit abort error over the wire"""
1688 raise AbortFromPart(inpart.params['message'],
1688 raise AbortFromPart(inpart.params['message'],
1689 hint=inpart.params.get('hint'))
1689 hint=inpart.params.get('hint'))
1690
1690
1691 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1691 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1692 'in-reply-to'))
1692 'in-reply-to'))
1693 def handleerrorpushkey(op, inpart):
1693 def handleerrorpushkey(op, inpart):
1694 """Used to transmit failure of a mandatory pushkey over the wire"""
1694 """Used to transmit failure of a mandatory pushkey over the wire"""
1695 kwargs = {}
1695 kwargs = {}
1696 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1696 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1697 value = inpart.params.get(name)
1697 value = inpart.params.get(name)
1698 if value is not None:
1698 if value is not None:
1699 kwargs[name] = value
1699 kwargs[name] = value
1700 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1700 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1701
1701
1702 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1702 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1703 def handleerrorunsupportedcontent(op, inpart):
1703 def handleerrorunsupportedcontent(op, inpart):
1704 """Used to transmit unknown content error over the wire"""
1704 """Used to transmit unknown content error over the wire"""
1705 kwargs = {}
1705 kwargs = {}
1706 parttype = inpart.params.get('parttype')
1706 parttype = inpart.params.get('parttype')
1707 if parttype is not None:
1707 if parttype is not None:
1708 kwargs['parttype'] = parttype
1708 kwargs['parttype'] = parttype
1709 params = inpart.params.get('params')
1709 params = inpart.params.get('params')
1710 if params is not None:
1710 if params is not None:
1711 kwargs['params'] = params.split('\0')
1711 kwargs['params'] = params.split('\0')
1712
1712
1713 raise error.BundleUnknownFeatureError(**kwargs)
1713 raise error.BundleUnknownFeatureError(**kwargs)
1714
1714
1715 @parthandler('error:pushraced', ('message',))
1715 @parthandler('error:pushraced', ('message',))
1716 def handleerrorpushraced(op, inpart):
1716 def handleerrorpushraced(op, inpart):
1717 """Used to transmit push race error over the wire"""
1717 """Used to transmit push race error over the wire"""
1718 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1718 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1719
1719
1720 @parthandler('listkeys', ('namespace',))
1720 @parthandler('listkeys', ('namespace',))
1721 def handlelistkeys(op, inpart):
1721 def handlelistkeys(op, inpart):
1722 """retrieve pushkey namespace content stored in a bundle2"""
1722 """retrieve pushkey namespace content stored in a bundle2"""
1723 namespace = inpart.params['namespace']
1723 namespace = inpart.params['namespace']
1724 r = pushkey.decodekeys(inpart.read())
1724 r = pushkey.decodekeys(inpart.read())
1725 op.records.add('listkeys', (namespace, r))
1725 op.records.add('listkeys', (namespace, r))
1726
1726
1727 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1727 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1728 def handlepushkey(op, inpart):
1728 def handlepushkey(op, inpart):
1729 """process a pushkey request"""
1729 """process a pushkey request"""
1730 dec = pushkey.decode
1730 dec = pushkey.decode
1731 namespace = dec(inpart.params['namespace'])
1731 namespace = dec(inpart.params['namespace'])
1732 key = dec(inpart.params['key'])
1732 key = dec(inpart.params['key'])
1733 old = dec(inpart.params['old'])
1733 old = dec(inpart.params['old'])
1734 new = dec(inpart.params['new'])
1734 new = dec(inpart.params['new'])
1735 # Grab the transaction to ensure that we have the lock before performing the
1735 # Grab the transaction to ensure that we have the lock before performing the
1736 # pushkey.
1736 # pushkey.
1737 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1737 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1738 op.gettransaction()
1738 op.gettransaction()
1739 ret = op.repo.pushkey(namespace, key, old, new)
1739 ret = op.repo.pushkey(namespace, key, old, new)
1740 record = {'namespace': namespace,
1740 record = {'namespace': namespace,
1741 'key': key,
1741 'key': key,
1742 'old': old,
1742 'old': old,
1743 'new': new}
1743 'new': new}
1744 op.records.add('pushkey', record)
1744 op.records.add('pushkey', record)
1745 if op.reply is not None:
1745 if op.reply is not None:
1746 rpart = op.reply.newpart('reply:pushkey')
1746 rpart = op.reply.newpart('reply:pushkey')
1747 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1747 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1748 rpart.addparam('return', '%i' % ret, mandatory=False)
1748 rpart.addparam('return', '%i' % ret, mandatory=False)
1749 if inpart.mandatory and not ret:
1749 if inpart.mandatory and not ret:
1750 kwargs = {}
1750 kwargs = {}
1751 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1751 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1752 if key in inpart.params:
1752 if key in inpart.params:
1753 kwargs[key] = inpart.params[key]
1753 kwargs[key] = inpart.params[key]
1754 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1754 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1755
1755
1756 def _readphaseheads(inpart):
1756 def _readphaseheads(inpart):
1757 headsbyphase = [[] for i in phases.allphases]
1757 headsbyphase = [[] for i in phases.allphases]
1758 entrysize = struct.calcsize(_fphasesentry)
1758 entrysize = struct.calcsize(_fphasesentry)
1759 while True:
1759 while True:
1760 entry = inpart.read(entrysize)
1760 entry = inpart.read(entrysize)
1761 if len(entry) < entrysize:
1761 if len(entry) < entrysize:
1762 if entry:
1762 if entry:
1763 raise error.Abort(_('bad phase-heads bundle part'))
1763 raise error.Abort(_('bad phase-heads bundle part'))
1764 break
1764 break
1765 phase, node = struct.unpack(_fphasesentry, entry)
1765 phase, node = struct.unpack(_fphasesentry, entry)
1766 headsbyphase[phase].append(node)
1766 headsbyphase[phase].append(node)
1767 return headsbyphase
1767 return headsbyphase
1768
1768
1769 @parthandler('phase-heads')
1769 @parthandler('phase-heads')
1770 def handlephases(op, inpart):
1770 def handlephases(op, inpart):
1771 """apply phases from bundle part to repo"""
1771 """apply phases from bundle part to repo"""
1772 headsbyphase = _readphaseheads(inpart)
1772 headsbyphase = _readphaseheads(inpart)
1773 addednodes = []
1773 addednodes = []
1774 for entry in op.records['changegroup']:
1774 for entry in op.records['changegroup']:
1775 addednodes.extend(entry['addednodes'])
1775 addednodes.extend(entry['addednodes'])
1776 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
1776 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
1777 addednodes)
1777 addednodes)
1778
1778
1779 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1779 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1780 def handlepushkeyreply(op, inpart):
1780 def handlepushkeyreply(op, inpart):
1781 """retrieve the result of a pushkey request"""
1781 """retrieve the result of a pushkey request"""
1782 ret = int(inpart.params['return'])
1782 ret = int(inpart.params['return'])
1783 partid = int(inpart.params['in-reply-to'])
1783 partid = int(inpart.params['in-reply-to'])
1784 op.records.add('pushkey', {'return': ret}, partid)
1784 op.records.add('pushkey', {'return': ret}, partid)
1785
1785
1786 @parthandler('obsmarkers')
1786 @parthandler('obsmarkers')
1787 def handleobsmarker(op, inpart):
1787 def handleobsmarker(op, inpart):
1788 """add a stream of obsmarkers to the repo"""
1788 """add a stream of obsmarkers to the repo"""
1789 tr = op.gettransaction()
1789 tr = op.gettransaction()
1790 markerdata = inpart.read()
1790 markerdata = inpart.read()
1791 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1791 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1792 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1792 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1793 % len(markerdata))
1793 % len(markerdata))
1794 # The mergemarkers call will crash if marker creation is not enabled.
1794 # The mergemarkers call will crash if marker creation is not enabled.
1795 # we want to avoid this if the part is advisory.
1795 # we want to avoid this if the part is advisory.
1796 if not inpart.mandatory and op.repo.obsstore.readonly:
1796 if not inpart.mandatory and op.repo.obsstore.readonly:
1797 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1797 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1798 return
1798 return
1799 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1799 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1800 op.repo.invalidatevolatilesets()
1800 op.repo.invalidatevolatilesets()
1801 if new:
1801 if new:
1802 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1802 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1803 op.records.add('obsmarkers', {'new': new})
1803 op.records.add('obsmarkers', {'new': new})
1804 if op.reply is not None:
1804 if op.reply is not None:
1805 rpart = op.reply.newpart('reply:obsmarkers')
1805 rpart = op.reply.newpart('reply:obsmarkers')
1806 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1806 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1807 rpart.addparam('new', '%i' % new, mandatory=False)
1807 rpart.addparam('new', '%i' % new, mandatory=False)
1808
1808
1809
1809
1810 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1810 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1811 def handleobsmarkerreply(op, inpart):
1811 def handleobsmarkerreply(op, inpart):
1812 """retrieve the result of a pushkey request"""
1812 """retrieve the result of a pushkey request"""
1813 ret = int(inpart.params['new'])
1813 ret = int(inpart.params['new'])
1814 partid = int(inpart.params['in-reply-to'])
1814 partid = int(inpart.params['in-reply-to'])
1815 op.records.add('obsmarkers', {'new': ret}, partid)
1815 op.records.add('obsmarkers', {'new': ret}, partid)
1816
1816
1817 @parthandler('hgtagsfnodes')
1817 @parthandler('hgtagsfnodes')
1818 def handlehgtagsfnodes(op, inpart):
1818 def handlehgtagsfnodes(op, inpart):
1819 """Applies .hgtags fnodes cache entries to the local repo.
1819 """Applies .hgtags fnodes cache entries to the local repo.
1820
1820
1821 Payload is pairs of 20 byte changeset nodes and filenodes.
1821 Payload is pairs of 20 byte changeset nodes and filenodes.
1822 """
1822 """
1823 # Grab the transaction so we ensure that we have the lock at this point.
1823 # Grab the transaction so we ensure that we have the lock at this point.
1824 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1824 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1825 op.gettransaction()
1825 op.gettransaction()
1826 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1826 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1827
1827
1828 count = 0
1828 count = 0
1829 while True:
1829 while True:
1830 node = inpart.read(20)
1830 node = inpart.read(20)
1831 fnode = inpart.read(20)
1831 fnode = inpart.read(20)
1832 if len(node) < 20 or len(fnode) < 20:
1832 if len(node) < 20 or len(fnode) < 20:
1833 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1833 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1834 break
1834 break
1835 cache.setfnode(node, fnode)
1835 cache.setfnode(node, fnode)
1836 count += 1
1836 count += 1
1837
1837
1838 cache.write()
1838 cache.write()
1839 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1839 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now