bundle2: support the 'targetphase' parameter for the changegroup part...
Boris Feld, r33407:39d4e5a6 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that are handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allow easy human inspection of a bundle2 header in case of
  trouble.

Any application-level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that helps the application level object
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    Part parameters may have arbitrary content; the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

      N couples of bytes, where N is the total number of parameters. Each
      couple contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

      A blob of bytes from which each parameter key and value can be
      retrieved using the list of size couples stored in the previous
      field.

    Mandatory parameters come first, then the advisory ones.

    Each parameter's key MUST be unique within the part.

:payload:

  The payload is a series of `<chunksize><chunkdata>`.

  `chunksize` is an int32, `chunkdata` are plain bytes (as many as
  `chunksize` says). The payload part is concluded by a zero size chunk.

  The current implementation always produces either zero or one chunk.
  This is an implementation limitation that will ultimately be lifted.

  `chunksize` can be negative to trigger special case processing. No such
  processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    scmutil,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

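A minimal standalone sketch of the same registration pattern (the registry name, decorator name, and sample handler below are illustrative, not Mercurial's API):

```python
# Toy re-implementation of the part-handler registry, for illustration.
handlers = {}

def register(parttype, params=()):
    def _decorator(func):
        lparttype = parttype.lower()  # matching is case-insensitive
        assert lparttype not in handlers
        handlers[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@register('output')
def handleoutput(op, part):
    return 'handled'

# Lookup always goes through the lowercased type; the case of the part
# type as transmitted only encodes mandatory (uppercase) vs advisory.
assert handlers['output'] is handleoutput
```

The `frozenset` of declared parameter names is what lets `_processpart` below reject parts carrying mandatory parameters the handler does not know about.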
class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of the category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iteration happens in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

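A condensed standalone equivalent of the class above (replies omitted), showing the per-category and chronological views it maintains:

```python
# Minimal stand-in for unbundlerecords: entries are kept both grouped by
# category and in arrival order.
class records(object):
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        # per-category view, empty tuple for unknown categories
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        # chronological view
        return iter(self._sequences)

r = records()
r.add('changegroup', {'return': 1})
r.add('output', 'hello')
assert r['changegroup'] == ({'return': 1},)
assert list(r) == [('changegroup', {'return': 1}), ('output', 'hello')]
```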
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part, then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
        'addednodes': addednodes,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                     params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


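The "always consume the part" discipline of `_processpart` can be sketched in isolation; here `io.BytesIO` stands in for a part object and the function names are illustrative:

```python
import io

def process(part, handler):
    # Mirror of _processpart's outer structure: whatever the handler does,
    # drain the payload in the finally block so the stream stays positioned
    # at the next part. SystemExit/KeyboardInterrupt skip the drain so an
    # abort stays fast.
    hardabort = False
    try:
        handler(part)
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        if not hardabort:
            part.seek(0, 2)  # seek to end of payload

part = io.BytesIO(b'payload bytes')

def failing(part):
    raise ValueError('boom')

try:
    process(part, failing)
except ValueError:
    pass
# Even though the handler raised, the payload was fully consumed.
assert part.tell() == len(b'payload bytes')
```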
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
519
519
520 bundletypes = {
520 bundletypes = {
521 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
521 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
522 # since the unification ssh accepts a header but there
522 # since the unification ssh accepts a header but there
523 # is no capability signaling it.
523 # is no capability signaling it.
524 "HG20": (), # special-cased below
524 "HG20": (), # special-cased below
525 "HG10UN": ("HG10UN", 'UN'),
525 "HG10UN": ("HG10UN", 'UN'),
526 "HG10BZ": ("HG10", 'BZ'),
526 "HG10BZ": ("HG10", 'BZ'),
527 "HG10GZ": ("HG10GZ", 'GZ'),
527 "HG10GZ": ("HG10GZ", 'GZ'),
528 }
528 }
529
529
530 # hgweb uses this list to communicate its preferred type
530 # hgweb uses this list to communicate its preferred type
531 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
531 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
532
532
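The two helpers above define a tiny text format for capabilities: one capability per line, percent-quoted, with comma-joined values after an optional `=`. A minimal standalone sketch of the same round-trip, using `urllib` quoting in place of Mercurial's `urlreq` wrapper (the function names here are illustrative, not part of bundle2):

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # one capability per line; values joined with commas after a '='
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        entry = quote(name)
        if vals:
            entry = "%s=%s" % (entry, ','.join(vals))
        chunks.append(entry)
    return '\n'.join(chunks)

def decode_caps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encode_caps(caps)
assert decode_caps(blob) == caps
```

As in the real helpers, a capability with no values encodes to a bare name and decodes back to an empty list.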
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application-level payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        As the part is directly added to the container, any failure to
        properly initialize the part after calling ``newpart`` should result
        in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

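`getchunks` above emits the stream-level framing: the 4-byte magic string, a big-endian integer giving the length of the space-separated parameter block, the block itself, then the (possibly compressed) part chunks terminated by a zero-size part header. A standalone sketch of just the uncompressed framing; the `'>i'` format is an assumption standing in for bundle2's `_fstreamparamsize`/`_fpartheadersize` constants:

```python
import struct
from io import BytesIO

HEADER_FMT = '>i'  # assumed stand-in for _fstreamparamsize / _fpartheadersize

def write_stream(params, payload):
    out = BytesIO()
    out.write(b'HG20')                               # magic string
    out.write(struct.pack(HEADER_FMT, len(params)))  # params block size
    out.write(params)                                # params block
    out.write(payload)                               # part chunks
    out.write(struct.pack(HEADER_FMT, 0))            # end-of-stream marker
    return out.getvalue()

def read_stream(data):
    # return the raw params block, mirroring unbundle20.params
    fp = BytesIO(data)
    assert fp.read(4) == b'HG20'
    (psize,) = struct.unpack(HEADER_FMT, fp.read(4))
    if psize < 0:
        raise ValueError('negative bundle param size: %i' % psize)
    return fp.read(psize)

raw = write_stream(b'Compression=GZ', b'')
assert read_stream(raw) == b'Compression=GZ'
```

The signed format is why both the bundler and unbundler guard against negative sizes on the wire.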
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a raw parameter block and return an ordered dict of it"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a BundleUnknownFeatureError
        when they are unknown.

        Note: no options are currently supported. Any input will be either
        ignored or will fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

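`_processparam` above encodes the advisory/mandatory convention in the case of the parameter's first letter: lowercase names may be silently ignored when unknown, uppercase ones must be understood or processing fails. A hedged standalone sketch of that dispatch rule; `handlers` and `ignored` are stand-ins, not bundle2's real `b2streamparamsmap`:

```python
handlers = {}  # stand-in for b2streamparamsmap
ignored = []   # records advisory parameters we skipped

def process_param(name, value):
    if not name:
        raise ValueError('empty parameter name')
    if not name[0].isalpha():
        raise ValueError('non letter first character: %r' % name)
    handler = handlers.get(name.lower())
    if handler is None:
        if name[0].islower():
            ignored.append(name)   # advisory: safe to skip
        else:
            # mandatory: the stream cannot be processed correctly
            raise KeyError('unsupported mandatory parameter: %s' % name)
    else:
        handler(name, value)

process_param('fancyfeature', None)   # unknown advisory: ignored
assert ignored == ['fancyfeature']
try:
    process_param('Mandatory', None)  # unknown mandatory: hard error
except KeyError:
    pass
else:
    raise AssertionError('expected KeyError')
```

Handlers are looked up by the lowercased name, so `Compression` and `compression` route to the same handler while still carrying different strictness on the wire.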
    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

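`b2streamparamhandler` uses the common decorator-registry idiom: decorating a function stores it in a dict keyed by parameter name, and `_processparam` later looks handlers up there by lowercased name. A standalone sketch of the same idiom; `registry` and the handler names are illustrative stand-ins:

```python
registry = {}  # stand-in for b2streamparamsmap

def streamparamhandler(name):
    """register a handler for a stream level parameter (sketch)"""
    def decorator(func):
        # guard against two handlers claiming the same parameter
        assert name not in registry, 'duplicate handler: %s' % name
        registry[name] = func
        return func
    return decorator

@streamparamhandler('compression')
def process_compression(state, param, value):
    # record which compression was negotiated, as a stand-in for
    # installing a real decompression engine
    state['compression'] = value

state = {}
registry['compression'](state, 'Compression', 'GZ')
assert state == {'compression': 'GZ'}
```

Because the decorator returns `func` unchanged, the handler stays callable under its own name as well as through the registry.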
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no part id assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
1008 ## finalize header
1009 headerchunk = ''.join(header)
1009 headerchunk = ''.join(header)
1010 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1010 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1011 yield _pack(_fpartheadersize, len(headerchunk))
1011 yield _pack(_fpartheadersize, len(headerchunk))
1012 yield headerchunk
1012 yield headerchunk
1013 ## payload
1013 ## payload
1014 try:
1014 try:
1015 for chunk in self._payloadchunks():
1015 for chunk in self._payloadchunks():
1016 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1016 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1017 yield _pack(_fpayloadsize, len(chunk))
1017 yield _pack(_fpayloadsize, len(chunk))
1018 yield chunk
1018 yield chunk
1019 except GeneratorExit:
1019 except GeneratorExit:
1020 # GeneratorExit means that nobody is listening for our
1020 # GeneratorExit means that nobody is listening for our
1021 # results anyway, so just bail quickly rather than trying
1021 # results anyway, so just bail quickly rather than trying
1022 # to produce an error part.
1022 # to produce an error part.
1023 ui.debug('bundle2-generatorexit\n')
1023 ui.debug('bundle2-generatorexit\n')
1024 raise
1024 raise
1025 except BaseException as exc:
1025 except BaseException as exc:
1026 # backup exception data for later
1026 # backup exception data for later
1027 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1027 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1028 % exc)
1028 % exc)
1029 tb = sys.exc_info()[2]
1029 tb = sys.exc_info()[2]
1030 msg = 'unexpected error: %s' % exc
1030 msg = 'unexpected error: %s' % exc
1031 interpart = bundlepart('error:abort', [('message', msg)],
1031 interpart = bundlepart('error:abort', [('message', msg)],
1032 mandatory=False)
1032 mandatory=False)
1033 interpart.id = 0
1033 interpart.id = 0
1034 yield _pack(_fpayloadsize, -1)
1034 yield _pack(_fpayloadsize, -1)
1035 for chunk in interpart.getchunks(ui=ui):
1035 for chunk in interpart.getchunks(ui=ui):
1036 yield chunk
1036 yield chunk
1037 outdebug(ui, 'closing payload chunk')
1037 outdebug(ui, 'closing payload chunk')
1038 # abort current part payload
1038 # abort current part payload
1039 yield _pack(_fpayloadsize, 0)
1039 yield _pack(_fpayloadsize, 0)
1040 pycompat.raisewithtb(exc, tb)
1040 pycompat.raisewithtb(exc, tb)
1041 # end of payload
1041 # end of payload
1042 outdebug(ui, 'closing payload chunk')
1042 outdebug(ui, 'closing payload chunk')
1043 yield _pack(_fpayloadsize, 0)
1043 yield _pack(_fpayloadsize, 0)
1044 self._generated = True
1044 self._generated = True
1045
1045
1046 def _payloadchunks(self):
1046 def _payloadchunks(self):
1047 """yield chunks of a the part payload
1047 """yield chunks of a the part payload
1048
1048
1049 Exists to handle the different methods to provide data to a part."""
1049 Exists to handle the different methods to provide data to a part."""
1050 # we only support fixed size data now.
1050 # we only support fixed size data now.
1051 # This will be improved in the future.
1051 # This will be improved in the future.
1052 if util.safehasattr(self.data, 'next'):
1052 if util.safehasattr(self.data, 'next'):
1053 buff = util.chunkbuffer(self.data)
1053 buff = util.chunkbuffer(self.data)
1054 chunk = buff.read(preferedchunksize)
1054 chunk = buff.read(preferedchunksize)
1055 while chunk:
1055 while chunk:
1056 yield chunk
1056 yield chunk
1057 chunk = buff.read(preferedchunksize)
1057 chunk = buff.read(preferedchunksize)
1058 elif len(self.data):
1058 elif len(self.data):
1059 yield self.data
1059 yield self.data
1060
1060
1061
1061
1062 flaginterrupt = -1
1062 flaginterrupt = -1
1063
1063
1064 class interrupthandler(unpackermixin):
1064 class interrupthandler(unpackermixin):
1065 """read one part and process it with restricted capability
1065 """read one part and process it with restricted capability
1066
1066
1067 This allows to transmit exception raised on the producer size during part
1067 This allows to transmit exception raised on the producer size during part
1068 iteration while the consumer is reading a part.
1068 iteration while the consumer is reading a part.
1069
1069
1070 Part processed in this manner only have access to a ui object,"""
1070 Part processed in this manner only have access to a ui object,"""
1071
1071
1072 def __init__(self, ui, fp):
1072 def __init__(self, ui, fp):
1073 super(interrupthandler, self).__init__(fp)
1073 super(interrupthandler, self).__init__(fp)
1074 self.ui = ui
1074 self.ui = ui
1075
1075
1076 def _readpartheader(self):
1076 def _readpartheader(self):
1077 """reads a part header size and return the bytes blob
1077 """reads a part header size and return the bytes blob
1078
1078
1079 returns None if empty"""
1079 returns None if empty"""
1080 headersize = self._unpack(_fpartheadersize)[0]
1080 headersize = self._unpack(_fpartheadersize)[0]
1081 if headersize < 0:
1081 if headersize < 0:
1082 raise error.BundleValueError('negative part header size: %i'
1082 raise error.BundleValueError('negative part header size: %i'
1083 % headersize)
1083 % headersize)
1084 indebug(self.ui, 'part header size: %i\n' % headersize)
1084 indebug(self.ui, 'part header size: %i\n' % headersize)
1085 if headersize:
1085 if headersize:
1086 return self._readexact(headersize)
1086 return self._readexact(headersize)
1087 return None
1087 return None
1088
1088
1089 def __call__(self):
1089 def __call__(self):
1090
1090
1091 self.ui.debug('bundle2-input-stream-interrupt:'
1091 self.ui.debug('bundle2-input-stream-interrupt:'
1092 ' opening out of band context\n')
1092 ' opening out of band context\n')
1093 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1093 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1094 headerblock = self._readpartheader()
1094 headerblock = self._readpartheader()
1095 if headerblock is None:
1095 if headerblock is None:
1096 indebug(self.ui, 'no part found during interruption.')
1096 indebug(self.ui, 'no part found during interruption.')
1097 return
1097 return
1098 part = unbundlepart(self.ui, headerblock, self._fp)
1098 part = unbundlepart(self.ui, headerblock, self._fp)
1099 op = interruptoperation(self.ui)
1099 op = interruptoperation(self.ui)
1100 _processpart(op, part)
1100 _processpart(op, part)
1101 self.ui.debug('bundle2-input-stream-interrupt:'
1101 self.ui.debug('bundle2-input-stream-interrupt:'
1102 ' closing out of band context\n')
1102 ' closing out of band context\n')
1103
1103
1104 class interruptoperation(object):
1104 class interruptoperation(object):
1105 """A limited operation to be use by part handler during interruption
1105 """A limited operation to be use by part handler during interruption
1106
1106
1107 It only have access to an ui object.
1107 It only have access to an ui object.
1108 """
1108 """
1109
1109
1110 def __init__(self, ui):
1110 def __init__(self, ui):
1111 self.ui = ui
1111 self.ui = ui
1112 self.reply = None
1112 self.reply = None
1113 self.captureoutput = False
1113 self.captureoutput = False
1114
1114
1115 @property
1115 @property
1116 def repo(self):
1116 def repo(self):
1117 raise error.ProgrammingError('no repo access from stream interruption')
1117 raise error.ProgrammingError('no repo access from stream interruption')
1118
1118
1119 def gettransaction(self):
1119 def gettransaction(self):
1120 raise TransactionUnavailable('no repo access from stream interruption')
1120 raise TransactionUnavailable('no repo access from stream interruption')
1121
1121
1122 class unbundlepart(unpackermixin):
1122 class unbundlepart(unpackermixin):
1123 """a bundle part read from a bundle"""
1123 """a bundle part read from a bundle"""
1124
1124
1125 def __init__(self, ui, header, fp):
1125 def __init__(self, ui, header, fp):
1126 super(unbundlepart, self).__init__(fp)
1126 super(unbundlepart, self).__init__(fp)
1127 self._seekable = (util.safehasattr(fp, 'seek') and
1127 self._seekable = (util.safehasattr(fp, 'seek') and
1128 util.safehasattr(fp, 'tell'))
1128 util.safehasattr(fp, 'tell'))
1129 self.ui = ui
1129 self.ui = ui
1130 # unbundle state attr
1130 # unbundle state attr
1131 self._headerdata = header
1131 self._headerdata = header
1132 self._headeroffset = 0
1132 self._headeroffset = 0
1133 self._initialized = False
1133 self._initialized = False
1134 self.consumed = False
1134 self.consumed = False
1135 # part data
1135 # part data
1136 self.id = None
1136 self.id = None
1137 self.type = None
1137 self.type = None
1138 self.mandatoryparams = None
1138 self.mandatoryparams = None
1139 self.advisoryparams = None
1139 self.advisoryparams = None
1140 self.params = None
1140 self.params = None
1141 self.mandatorykeys = ()
1141 self.mandatorykeys = ()
1142 self._payloadstream = None
1142 self._payloadstream = None
1143 self._readheader()
1143 self._readheader()
1144 self._mandatory = None
1144 self._mandatory = None
1145 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1145 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1146 self._pos = 0
1146 self._pos = 0
1147
1147
1148 def _fromheader(self, size):
1148 def _fromheader(self, size):
1149 """return the next <size> byte from the header"""
1149 """return the next <size> byte from the header"""
1150 offset = self._headeroffset
1150 offset = self._headeroffset
1151 data = self._headerdata[offset:(offset + size)]
1151 data = self._headerdata[offset:(offset + size)]
1152 self._headeroffset = offset + size
1152 self._headeroffset = offset + size
1153 return data
1153 return data
1154
1154
1155 def _unpackheader(self, format):
1155 def _unpackheader(self, format):
1156 """read given format from header
1156 """read given format from header
1157
1157
1158 This automatically compute the size of the format to read."""
1158 This automatically compute the size of the format to read."""
1159 data = self._fromheader(struct.calcsize(format))
1159 data = self._fromheader(struct.calcsize(format))
1160 return _unpack(format, data)
1160 return _unpack(format, data)
1161
1161
1162 def _initparams(self, mandatoryparams, advisoryparams):
1162 def _initparams(self, mandatoryparams, advisoryparams):
1163 """internal function to setup all logic related parameters"""
1163 """internal function to setup all logic related parameters"""
1164 # make it read only to prevent people touching it by mistake.
1164 # make it read only to prevent people touching it by mistake.
1165 self.mandatoryparams = tuple(mandatoryparams)
1165 self.mandatoryparams = tuple(mandatoryparams)
1166 self.advisoryparams = tuple(advisoryparams)
1166 self.advisoryparams = tuple(advisoryparams)
1167 # user friendly UI
1167 # user friendly UI
1168 self.params = util.sortdict(self.mandatoryparams)
1168 self.params = util.sortdict(self.mandatoryparams)
1169 self.params.update(self.advisoryparams)
1169 self.params.update(self.advisoryparams)
1170 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1170 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1171
1171
1172 def _payloadchunks(self, chunknum=0):
1172 def _payloadchunks(self, chunknum=0):
1173 '''seek to specified chunk and start yielding data'''
1173 '''seek to specified chunk and start yielding data'''
1174 if len(self._chunkindex) == 0:
1174 if len(self._chunkindex) == 0:
1175 assert chunknum == 0, 'Must start with chunk 0'
1175 assert chunknum == 0, 'Must start with chunk 0'
1176 self._chunkindex.append((0, self._tellfp()))
1176 self._chunkindex.append((0, self._tellfp()))
1177 else:
1177 else:
1178 assert chunknum < len(self._chunkindex), \
1178 assert chunknum < len(self._chunkindex), \
1179 'Unknown chunk %d' % chunknum
1179 'Unknown chunk %d' % chunknum
1180 self._seekfp(self._chunkindex[chunknum][1])
1180 self._seekfp(self._chunkindex[chunknum][1])
1181
1181
1182 pos = self._chunkindex[chunknum][0]
1182 pos = self._chunkindex[chunknum][0]
1183 payloadsize = self._unpack(_fpayloadsize)[0]
1183 payloadsize = self._unpack(_fpayloadsize)[0]
1184 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1184 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1185 while payloadsize:
1185 while payloadsize:
1186 if payloadsize == flaginterrupt:
1186 if payloadsize == flaginterrupt:
1187 # interruption detection, the handler will now read a
1187 # interruption detection, the handler will now read a
1188 # single part and process it.
1188 # single part and process it.
1189 interrupthandler(self.ui, self._fp)()
1189 interrupthandler(self.ui, self._fp)()
1190 elif payloadsize < 0:
1190 elif payloadsize < 0:
1191 msg = 'negative payload chunk size: %i' % payloadsize
1191 msg = 'negative payload chunk size: %i' % payloadsize
1192 raise error.BundleValueError(msg)
1192 raise error.BundleValueError(msg)
1193 else:
1193 else:
1194 result = self._readexact(payloadsize)
1194 result = self._readexact(payloadsize)
1195 chunknum += 1
1195 chunknum += 1
1196 pos += payloadsize
1196 pos += payloadsize
1197 if chunknum == len(self._chunkindex):
1197 if chunknum == len(self._chunkindex):
1198 self._chunkindex.append((pos, self._tellfp()))
1198 self._chunkindex.append((pos, self._tellfp()))
1199 yield result
1199 yield result
1200 payloadsize = self._unpack(_fpayloadsize)[0]
1200 payloadsize = self._unpack(_fpayloadsize)[0]
1201 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1201 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1202
1202
1203 def _findchunk(self, pos):
1203 def _findchunk(self, pos):
1204 '''for a given payload position, return a chunk number and offset'''
1204 '''for a given payload position, return a chunk number and offset'''
1205 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1205 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1206 if ppos == pos:
1206 if ppos == pos:
1207 return chunk, 0
1207 return chunk, 0
1208 elif ppos > pos:
1208 elif ppos > pos:
1209 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1209 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1210 raise ValueError('Unknown chunk')
1210 raise ValueError('Unknown chunk')
1211
1211
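The chunk index built by `_payloadchunks` pairs each chunk's payload-start offset with its file offset, so `_findchunk` can translate a seek target into a (chunk number, intra-chunk offset) pair. A minimal standalone sketch of that lookup, with the index as a plain list rather than instance state:

```python
def findchunk(chunkindex, pos):
    # chunkindex holds (payload-start, file-offset) tuples, one per chunk,
    # sorted by payload-start.
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # pos is exactly a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# three chunks starting at payload offsets 0, 10 and 30
idx = [(0, 100), (10, 120), (30, 150)]
assert findchunk(idx, 0) == (0, 0)
assert findchunk(idx, 15) == (1, 5)   # 5 bytes into the second chunk
assert findchunk(idx, 30) == (2, 0)
```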
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

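`obsmarkersversion` simply strips the `V` prefix from each advertised format name and returns the numeric versions. Its parsing can be exercised in isolation:

```python
def obsmarkersversion(caps):
    # caps is a bundle2 capabilities dict; 'obsmarkers' maps to
    # format names such as ('V0', 'V1').
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

assert obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
assert obsmarkersversion({}) == []          # no obsmarkers capability at all
```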
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundles
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

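Each `phase-heads` entry packs a phase number together with a head node using the `_fphasesentry` struct format. That format is defined elsewhere in this module; assuming the usual `'>i20s'` definition (big-endian 32-bit phase plus a 20-byte node), a single entry round-trips like this:

```python
import struct

_fphasesentry = '>i20s'  # assumed definition, matching the rest of bundle2.py

node = b'\x11' * 20                          # dummy 20-byte changeset node
entry = struct.pack(_fphasesentry, 0, node)  # phase 0 == public
assert len(entry) == struct.calcsize(_fphasesentry) == 24
phase, unpackednode = struct.unpack(_fphasesentry, entry)
assert (phase, unpackednode) == (0, node)
```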
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as optional parts)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
1453 version = obsolete.commonversion(remoteversions)
1453 version = obsolete.commonversion(remoteversions)
1454 if version is None:
1454 if version is None:
1455 raise ValueError('bundler does not support common obsmarker format')
1455 raise ValueError('bundler does not support common obsmarker format')
1456 stream = obsolete.encodemarkers(markers, True, version=version)
1456 stream = obsolete.encodemarkers(markers, True, version=version)
1457 return bundler.newpart('obsmarkers', data=stream)
1457 return bundler.newpart('obsmarkers', data=stream)
1458
1458
1459 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1459 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1460 compopts=None):
1460 compopts=None):
1461 """Write a bundle file and return its filename.
1461 """Write a bundle file and return its filename.
1462
1462
1463 Existing files will not be overwritten.
1463 Existing files will not be overwritten.
1464 If no filename is specified, a temporary file is created.
1464 If no filename is specified, a temporary file is created.
1465 bz2 compression can be turned off.
1465 bz2 compression can be turned off.
1466 The bundle file will be deleted in case of errors.
1466 The bundle file will be deleted in case of errors.
1467 """
1467 """
1468
1468
1469 if bundletype == "HG20":
1469 if bundletype == "HG20":
1470 bundle = bundle20(ui)
1470 bundle = bundle20(ui)
1471 bundle.setcompression(compression, compopts)
1471 bundle.setcompression(compression, compopts)
1472 part = bundle.newpart('changegroup', data=cg.getchunks())
1472 part = bundle.newpart('changegroup', data=cg.getchunks())
1473 part.addparam('version', cg.version)
1473 part.addparam('version', cg.version)
1474 if 'clcount' in cg.extras:
1474 if 'clcount' in cg.extras:
1475 part.addparam('nbchanges', str(cg.extras['clcount']),
1475 part.addparam('nbchanges', str(cg.extras['clcount']),
1476 mandatory=False)
1476 mandatory=False)
1477 chunkiter = bundle.getchunks()
1477 chunkiter = bundle.getchunks()
1478 else:
1478 else:
1479 # compression argument is only for the bundle2 case
1479 # compression argument is only for the bundle2 case
1480 assert compression is None
1480 assert compression is None
1481 if cg.version != '01':
1481 if cg.version != '01':
1482 raise error.Abort(_('old bundle types only supports v1 '
1482 raise error.Abort(_('old bundle types only supports v1 '
1483 'changegroups'))
1483 'changegroups'))
1484 header, comp = bundletypes[bundletype]
1484 header, comp = bundletypes[bundletype]
1485 if comp not in util.compengines.supportedbundletypes:
1485 if comp not in util.compengines.supportedbundletypes:
1486 raise error.Abort(_('unknown stream compression type: %s')
1486 raise error.Abort(_('unknown stream compression type: %s')
1487 % comp)
1487 % comp)
1488 compengine = util.compengines.forbundletype(comp)
1488 compengine = util.compengines.forbundletype(comp)
1489 def chunkiter():
1489 def chunkiter():
1490 yield header
1490 yield header
1491 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1491 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1492 yield chunk
1492 yield chunk
1493 chunkiter = chunkiter()
1493 chunkiter = chunkiter()
1494
1494
1495 # parse the changegroup data, otherwise we will block
1495 # parse the changegroup data, otherwise we will block
1496 # in case of sshrepo because we don't know the end of the stream
1496 # in case of sshrepo because we don't know the end of the stream
1497 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1497 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1498
1498
1499 def combinechangegroupresults(op):
1499 def combinechangegroupresults(op):
1500 """logic to combine 0 or more addchangegroup results into one"""
1500 """logic to combine 0 or more addchangegroup results into one"""
1501 results = [r.get('return', 0)
1501 results = [r.get('return', 0)
1502 for r in op.records['changegroup']]
1502 for r in op.records['changegroup']]
1503 changedheads = 0
1503 changedheads = 0
1504 result = 1
1504 result = 1
1505 for ret in results:
1505 for ret in results:
1506 # If any changegroup result is 0, return 0
1506 # If any changegroup result is 0, return 0
1507 if ret == 0:
1507 if ret == 0:
1508 result = 0
1508 result = 0
1509 break
1509 break
1510 if ret < -1:
1510 if ret < -1:
1511 changedheads += ret + 1
1511 changedheads += ret + 1
1512 elif ret > 1:
1512 elif ret > 1:
1513 changedheads += ret - 1
1513 changedheads += ret - 1
1514 if changedheads > 0:
1514 if changedheads > 0:
1515 result = 1 + changedheads
1515 result = 1 + changedheads
1516 elif changedheads < 0:
1516 elif changedheads < 0:
1517 result = -1 + changedheads
1517 result = -1 + changedheads
1518 return result
1518 return result
1519
1519
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will need massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    addednodes = []
    for entry in op.records['changegroup']:
        addednodes.extend(entry['addednodes'])
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
                        addednodes)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    scmutil.registersummarycallback(op.repo, tr)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
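
The new 'targetphase' support in `handlechangegroup` boils down to a small piece of parameter plumbing: optional part parameters arrive as strings and are converted into keyword arguments for `_processchangegroup`. The standalone sketch below (the helper name is hypothetical, not part of bundle2.py) mirrors that logic under the assumption that 'nbchanges' maps to `expectedtotal` and 'targetphase' is forwarded as an integer phase index:

```python
def changegroup_part_kwargs(params):
    """Sketch of handlechangegroup's optional-parameter handling.

    'nbchanges' gives the expected changeset count (used for progress
    output); 'targetphase' lets the sender request that added changesets
    enter a non-default phase (in Mercurial, 0=public, 1=draft, 2=secret).
    """
    kwargs = {}
    if 'nbchanges' in params:
        # expected number of changesets in the incoming changegroup
        kwargs['expectedtotal'] = int(params['nbchanges'])
    targetphase = params.get('targetphase')
    if targetphase is not None:
        # phase to assign directly when adding the changesets
        kwargs['targetphase'] = int(targetphase)
    return kwargs
```

A part carrying `targetphase='2'` would thus have its changesets added as secret rather than draft, without a separate phase-moving step after the changegroup is applied.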