bundle: inline applybundle1()...
Martin von Zweigbergk
r33044:1d2b6895 default
@@ -1,1848 +1,1845 @@
 # bundle2.py - generic container format to transmit arbitrary data.
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 """Handling of the new bundle2 format

 The goal of bundle2 is to act as an atomic packet to transmit a set of
 payloads in an application-agnostic way. It consists of a sequence of "parts"
 that will be handed to and processed by the application layer.


 General format architecture
 ===========================

 The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

 The binary format
 ============================

 All numbers are unsigned and big-endian.

 stream level parameters
 ------------------------

 The binary format is as follows:

 :params size: int32

   The total number of bytes used by the parameters

 :params value: arbitrary number of bytes

   A blob of `params size` containing the serialized version of all stream
   level parameters.

   The blob contains a space separated list of parameters. Parameters with a
   value are stored in the form `<name>=<value>`. Both name and value are
   urlquoted.

   Empty names are obviously forbidden.

   A name MUST start with a letter. If this first letter is lower case, the
   parameter is advisory and can be safely ignored. However, when the first
   letter is capital, the parameter is mandatory and the bundling process MUST
   stop if it is not able to process it.

   Stream parameters use a simple textual format for two main reasons:

   - Stream level parameters should remain simple and we want to discourage
     any crazy usage.
   - Textual data allows easy human inspection of a bundle2 header in case of
     troubles.

   Any application level options MUST go into a bundle2 part instead.

 Payload part
 ------------------------

 The binary format is as follows:

 :header size: int32

   The total number of bytes used by the part header. When the header is empty
   (size = 0) this is interpreted as the end of stream marker.

 :header:

   The header defines how to interpret the part. It contains two pieces of
   data: the part type, and the part parameters.

   The part type is used to route the part to an application level handler
   that can interpret the payload.

   Part parameters are passed to the application level handler. They are
   meant to convey information that will help the application level object to
   interpret the part payload.

   The binary format of the header is as follows:

   :typesize: (one byte)

   :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

   :partid: A 32-bit integer (unique in the bundle) that can be used to refer
            to this part.

   :parameters:

     A part's parameters may have arbitrary content; the binary structure is::

         <mandatory-count><advisory-count><param-sizes><param-data>

     :mandatory-count: 1 byte, number of mandatory parameters

     :advisory-count: 1 byte, number of advisory parameters

     :param-sizes:

       N pairs of bytes, where N is the total number of parameters. Each
       pair contains (<size-of-key>, <size-of-value>) for one parameter.

     :param-data:

       A blob of bytes from which each parameter key and value can be
       retrieved using the list of size pairs stored in the previous
       field.

     Mandatory parameters come first, then the advisory ones.

     Each parameter's key MUST be unique within the part.

 :payload:

     The payload is a series of `<chunksize><chunkdata>`.

     `chunksize` is an int32, `chunkdata` are plain bytes (as many as
     `chunksize` says). The payload part is concluded by a zero size chunk.

     The current implementation always produces either zero or one chunk.
     This is an implementation limitation that will ultimately be lifted.

     `chunksize` can be negative to trigger special case processing. No such
     processing is in place yet.

 Bundle processing
 ============================

 Each part is processed in order using a "part handler". Handlers are
 registered for a certain part type.

 The matching of a part to its handler is case insensitive. The case of the
 part type is used to know if a part is mandatory or advisory. If the part
 type contains any uppercase char it is considered mandatory. When no handler
 is known for a mandatory part, the process is aborted and an exception is
 raised. If the part is advisory and no handler is known, the part is ignored.
 When the process is aborted, the full bundle is still read from the stream to
 keep the channel usable. But none of the parts read after an abort are
 processed. In the future, dropping the stream may become an option for
 channels we do not care to preserve.
 """

 from __future__ import absolute_import

 import errno
 import re
 import string
 import struct
 import sys

 from .i18n import _
 from . import (
     changegroup,
     error,
     obsolete,
     phases,
     pushkey,
     pycompat,
     tags,
     url,
     util,
 )

 urlerr = util.urlerr
 urlreq = util.urlreq

 _pack = struct.pack
 _unpack = struct.unpack

 _fstreamparamsize = '>i'
 _fpartheadersize = '>i'
 _fparttypesize = '>B'
 _fpartid = '>I'
 _fpayloadsize = '>i'
 _fpartparamcount = '>BB'

 _fphasesentry = '>i20s'

 preferedchunksize = 4096

 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

 def outdebug(ui, message):
     """debug regarding output stream (bundling)"""
     if ui.configbool('devel', 'bundle2.debug', False):
         ui.debug('bundle2-output: %s\n' % message)

 def indebug(ui, message):
     """debug on input stream (unbundling)"""
     if ui.configbool('devel', 'bundle2.debug', False):
         ui.debug('bundle2-input: %s\n' % message)

 def validateparttype(parttype):
     """raise ValueError if a parttype contains an invalid character"""
     if _parttypeforbidden.search(parttype):
         raise ValueError(parttype)

 def _makefpartparamsizes(nbparams):
     """return a struct format to read part parameter sizes

     The number of parameters is variable so we need to build that format
     dynamically.
     """
     return '>' + ('BB' * nbparams)
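Since `_makefpartparamsizes` just multiplies out a struct pattern, its effect is easy to see standalone. The helper is re-declared here rather than imported from Mercurial, and the sizes are invented:

```python
import struct

def makefpartparamsizes(nbparams):
    # same one-liner as _makefpartparamsizes above
    return '>' + ('BB' * nbparams)

# two parameters -> format '>BBBB': two (key-size, value-size) byte pairs
fmt = makefpartparamsizes(2)
data = struct.pack(fmt, 3, 5, 4, 0)
sizes = struct.unpack(fmt, data)
pairs = list(zip(sizes[::2], sizes[1::2]))
print(fmt, pairs)
# >BBBB [(3, 5), (4, 0)]
```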

 parthandlermapping = {}

 def parthandler(parttype, params=()):
     """decorator that registers a function as a bundle2 part handler

     eg::

         @parthandler('myparttype', ('mandatory', 'param', 'handled'))
         def myparttypehandler(...):
             '''process a part of type "my part".'''
             ...
     """
     validateparttype(parttype)
     def _decorator(func):
         lparttype = parttype.lower() # enforce lower case matching.
         assert lparttype not in parthandlermapping
         parthandlermapping[lparttype] = func
         func.params = frozenset(params)
         return func
     return _decorator

 class unbundlerecords(object):
     """keep record of what happens during an unbundle

     New records are added using `records.add('cat', obj)`. Where 'cat' is a
     category of record and obj is an arbitrary object.

     `records['cat']` will return all entries of this category 'cat'.

     Iterating on the object itself will yield `('category', obj)` tuples
     for all entries.

     All iterations happen in chronological order.
     """

     def __init__(self):
         self._categories = {}
         self._sequences = []
         self._replies = {}

     def add(self, category, entry, inreplyto=None):
         """add a new record of a given category.

         The entry can then be retrieved in the list returned by
         self['category']."""
         self._categories.setdefault(category, []).append(entry)
         self._sequences.append((category, entry))
         if inreplyto is not None:
             self.getreplies(inreplyto).add(category, entry)

     def getreplies(self, partid):
         """get the records that are replies to a specific part"""
         return self._replies.setdefault(partid, unbundlerecords())

     def __getitem__(self, cat):
         return tuple(self._categories.get(cat, ()))

     def __iter__(self):
         return iter(self._sequences)

     def __len__(self):
         return len(self._sequences)

     def __nonzero__(self):
         return bool(self._sequences)

     __bool__ = __nonzero__
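The access patterns described in the docstring above can be shown with a stripped-down re-creation of the class (reply tracking omitted); `minirecords` is this sketch's own name, not Mercurial's:

```python
class minirecords(object):
    """stripped-down re-creation of unbundlerecords (no reply tracking)"""
    def __init__(self):
        self._categories = {}
        self._sequences = []
    def add(self, category, entry):
        # per-category list plus a flat chronological sequence
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))
    def __iter__(self):
        return iter(self._sequences)
    def __len__(self):
        return len(self._sequences)

records = minirecords()
records.add('changegroup', {'return': 1})
records.add('obsmarkers', {'new': 2})
records.add('changegroup', {'return': 0})
print(records['changegroup'])   # both changegroup entries, in order
print(len(records))             # 3
```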

 class bundleoperation(object):
     """an object that represents a single bundling process

     Its purpose is to carry unbundle-related objects and states.

     A new object should be created at the beginning of each bundle
     processing. The object is to be returned by the processing function.

     The object has very little content now; it will ultimately contain:
     * an access to the repo the bundle is applied to,
     * a ui object,
     * a way to retrieve a transaction to add changes to the repo,
     * a way to record the result of processing each part,
     * a way to construct a bundle response when applicable.
     """

     def __init__(self, repo, transactiongetter, captureoutput=True):
         self.repo = repo
         self.ui = repo.ui
         self.records = unbundlerecords()
         self.gettransaction = transactiongetter
         self.reply = None
         self.captureoutput = captureoutput

 class TransactionUnavailable(RuntimeError):
     pass

 def _notransaction():
     """default method to get a transaction while processing a bundle

     Raise an exception to highlight the fact that no transaction was expected
     to be created"""
     raise TransactionUnavailable()

-def applybundle1(repo, cg, tr, source, url, **kwargs):
-    # the transactiongetter won't be used, but we might as well set it
-    op = bundleoperation(repo, lambda: tr)
-    _processchangegroup(op, cg, tr, source, url, **kwargs)
-    return op
-
 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
     # transform me into unbundler.apply() as soon as the freeze is lifted
     if isinstance(unbundler, unbundle20):
         tr.hookargs['bundle2'] = '1'
         if source is not None and 'source' not in tr.hookargs:
             tr.hookargs['source'] = source
         if url is not None and 'url' not in tr.hookargs:
             tr.hookargs['url'] = url
         return processbundle(repo, unbundler, lambda: tr)
     else:
-        return applybundle1(repo, unbundler, tr, source, url, **kwargs)
+        # the transactiongetter won't be used, but we might as well set it
+        op = bundleoperation(repo, lambda: tr)
+        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
+        return op

331 def processbundle(repo, unbundler, transactiongetter=None, op=None):
328 def processbundle(repo, unbundler, transactiongetter=None, op=None):
332 """This function process a bundle, apply effect to/from a repo
329 """This function process a bundle, apply effect to/from a repo
333
330
334 It iterates over each part then searches for and uses the proper handling
331 It iterates over each part then searches for and uses the proper handling
335 code to process the part. Parts are processed in order.
332 code to process the part. Parts are processed in order.
336
333
337 Unknown Mandatory part will abort the process.
334 Unknown Mandatory part will abort the process.
338
335
339 It is temporarily possible to provide a prebuilt bundleoperation to the
336 It is temporarily possible to provide a prebuilt bundleoperation to the
340 function. This is used to ensure output is properly propagated in case of
337 function. This is used to ensure output is properly propagated in case of
341 an error during the unbundling. This output capturing part will likely be
338 an error during the unbundling. This output capturing part will likely be
342 reworked and this ability will probably go away in the process.
339 reworked and this ability will probably go away in the process.
343 """
340 """
344 if op is None:
341 if op is None:
345 if transactiongetter is None:
342 if transactiongetter is None:
346 transactiongetter = _notransaction
343 transactiongetter = _notransaction
347 op = bundleoperation(repo, transactiongetter)
344 op = bundleoperation(repo, transactiongetter)
348 # todo:
345 # todo:
349 # - replace this is a init function soon.
346 # - replace this is a init function soon.
350 # - exception catching
347 # - exception catching
351 unbundler.params
348 unbundler.params
352 if repo.ui.debugflag:
349 if repo.ui.debugflag:
353 msg = ['bundle2-input-bundle:']
350 msg = ['bundle2-input-bundle:']
354 if unbundler.params:
351 if unbundler.params:
355 msg.append(' %i params')
352 msg.append(' %i params')
356 if op.gettransaction is None or op.gettransaction is _notransaction:
353 if op.gettransaction is None or op.gettransaction is _notransaction:
357 msg.append(' no-transaction')
354 msg.append(' no-transaction')
358 else:
355 else:
359 msg.append(' with-transaction')
356 msg.append(' with-transaction')
360 msg.append('\n')
357 msg.append('\n')
361 repo.ui.debug(''.join(msg))
358 repo.ui.debug(''.join(msg))
362 iterparts = enumerate(unbundler.iterparts())
359 iterparts = enumerate(unbundler.iterparts())
363 part = None
360 part = None
364 nbpart = 0
361 nbpart = 0
365 try:
362 try:
366 for nbpart, part in iterparts:
363 for nbpart, part in iterparts:
367 _processpart(op, part)
364 _processpart(op, part)
368 except Exception as exc:
365 except Exception as exc:
369 # Any exceptions seeking to the end of the bundle at this point are
366 # Any exceptions seeking to the end of the bundle at this point are
370 # almost certainly related to the underlying stream being bad.
367 # almost certainly related to the underlying stream being bad.
371 # And, chances are that the exception we're handling is related to
368 # And, chances are that the exception we're handling is related to
372 # getting in that bad state. So, we swallow the seeking error and
369 # getting in that bad state. So, we swallow the seeking error and
373 # re-raise the original error.
370 # re-raise the original error.
374 seekerror = False
371 seekerror = False
375 try:
372 try:
376 for nbpart, part in iterparts:
373 for nbpart, part in iterparts:
377 # consume the bundle content
374 # consume the bundle content
378 part.seek(0, 2)
375 part.seek(0, 2)
379 except Exception:
376 except Exception:
380 seekerror = True
377 seekerror = True
381
378
382 # Small hack to let caller code distinguish exceptions from bundle2
379 # Small hack to let caller code distinguish exceptions from bundle2
383 # processing from processing the old format. This is mostly
380 # processing from processing the old format. This is mostly
384 # needed to handle different return codes to unbundle according to the
381 # needed to handle different return codes to unbundle according to the
385 # type of bundle. We should probably clean up or drop this return code
382 # type of bundle. We should probably clean up or drop this return code
386 # craziness in a future version.
383 # craziness in a future version.
387 exc.duringunbundle2 = True
384 exc.duringunbundle2 = True
388 salvaged = []
385 salvaged = []
389 replycaps = None
386 replycaps = None
390 if op.reply is not None:
387 if op.reply is not None:
391 salvaged = op.reply.salvageoutput()
388 salvaged = op.reply.salvageoutput()
392 replycaps = op.reply.capabilities
389 replycaps = op.reply.capabilities
393 exc._replycaps = replycaps
390 exc._replycaps = replycaps
394 exc._bundle2salvagedoutput = salvaged
391 exc._bundle2salvagedoutput = salvaged
395
392
396 # Re-raising from a variable loses the original stack. So only use
393 # Re-raising from a variable loses the original stack. So only use
397 # that form if we need to.
394 # that form if we need to.
398 if seekerror:
395 if seekerror:
399 raise exc
396 raise exc
400 else:
397 else:
401 raise
398 raise
402 finally:
399 finally:
403 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
400 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
404
401
405 return op
402 return op
406
403
407 def _processchangegroup(op, cg, tr, source, url, **kwargs):
404 def _processchangegroup(op, cg, tr, source, url, **kwargs):
408 ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
405 ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
409 op.records.add('changegroup', {
406 op.records.add('changegroup', {
410 'return': ret,
407 'return': ret,
411 'addednodes': addednodes,
408 'addednodes': addednodes,
412 })
409 })
413 return ret
410 return ret
414
411
415 def _processpart(op, part):
412 def _processpart(op, part):
416 """process a single part from a bundle
413 """process a single part from a bundle
417
414
418 The part is guaranteed to have been fully consumed when the function exits
415 The part is guaranteed to have been fully consumed when the function exits
419 (even if an exception is raised)."""
416 (even if an exception is raised)."""
420 status = 'unknown' # used by debug output
417 status = 'unknown' # used by debug output
421 hardabort = False
418 hardabort = False
422 try:
419 try:
423 try:
420 try:
424 handler = parthandlermapping.get(part.type)
421 handler = parthandlermapping.get(part.type)
425 if handler is None:
422 if handler is None:
426 status = 'unsupported-type'
423 status = 'unsupported-type'
427 raise error.BundleUnknownFeatureError(parttype=part.type)
424 raise error.BundleUnknownFeatureError(parttype=part.type)
428 indebug(op.ui, 'found a handler for part %r' % part.type)
425 indebug(op.ui, 'found a handler for part %r' % part.type)
429 unknownparams = part.mandatorykeys - handler.params
426 unknownparams = part.mandatorykeys - handler.params
430 if unknownparams:
427 if unknownparams:
431 unknownparams = list(unknownparams)
428 unknownparams = list(unknownparams)
432 unknownparams.sort()
429 unknownparams.sort()
433 status = 'unsupported-params (%s)' % unknownparams
430 status = 'unsupported-params (%s)' % unknownparams
434 raise error.BundleUnknownFeatureError(parttype=part.type,
431 raise error.BundleUnknownFeatureError(parttype=part.type,
435 params=unknownparams)
432 params=unknownparams)
436 status = 'supported'
433 status = 'supported'
437 except error.BundleUnknownFeatureError as exc:
434 except error.BundleUnknownFeatureError as exc:
438 if part.mandatory: # mandatory parts
435 if part.mandatory: # mandatory parts
439 raise
436 raise
440 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
437 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
441 return # skip to part processing
438 return # skip to part processing
442 finally:
439 finally:
443 if op.ui.debugflag:
440 if op.ui.debugflag:
444 msg = ['bundle2-input-part: "%s"' % part.type]
441 msg = ['bundle2-input-part: "%s"' % part.type]
445 if not part.mandatory:
442 if not part.mandatory:
446 msg.append(' (advisory)')
443 msg.append(' (advisory)')
447 nbmp = len(part.mandatorykeys)
444 nbmp = len(part.mandatorykeys)
448 nbap = len(part.params) - nbmp
445 nbap = len(part.params) - nbmp
449 if nbmp or nbap:
446 if nbmp or nbap:
450 msg.append(' (params:')
447 msg.append(' (params:')
451 if nbmp:
448 if nbmp:
452 msg.append(' %i mandatory' % nbmp)
449 msg.append(' %i mandatory' % nbmp)
453 if nbap:
450 if nbap:
454 msg.append(' %i advisory' % nbmp)
451 msg.append(' %i advisory' % nbmp)
455 msg.append(')')
452 msg.append(')')
456 msg.append(' %s\n' % status)
453 msg.append(' %s\n' % status)
457 op.ui.debug(''.join(msg))
454 op.ui.debug(''.join(msg))
458
455
459 # handler is called outside the above try block so that we don't
456 # handler is called outside the above try block so that we don't
460 # risk catching KeyErrors from anything other than the
457 # risk catching KeyErrors from anything other than the
461 # parthandlermapping lookup (any KeyError raised by handler()
458 # parthandlermapping lookup (any KeyError raised by handler()
462 # itself represents a defect of a different variety).
459 # itself represents a defect of a different variety).
463 output = None
460 output = None
464 if op.captureoutput and op.reply is not None:
461 if op.captureoutput and op.reply is not None:
465 op.ui.pushbuffer(error=True, subproc=True)
462 op.ui.pushbuffer(error=True, subproc=True)
466 output = ''
463 output = ''
467 try:
464 try:
468 handler(op, part)
465 handler(op, part)
469 finally:
466 finally:
470 if output is not None:
467 if output is not None:
471 output = op.ui.popbuffer()
468 output = op.ui.popbuffer()
472 if output:
469 if output:
473 outpart = op.reply.newpart('output', data=output,
470 outpart = op.reply.newpart('output', data=output,
474 mandatory=False)
471 mandatory=False)
475 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
472 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
476 # If exiting or interrupted, do not attempt to seek the stream in the
473 # If exiting or interrupted, do not attempt to seek the stream in the
477 # finally block below. This makes abort faster.
474 # finally block below. This makes abort faster.
478 except (SystemExit, KeyboardInterrupt):
475 except (SystemExit, KeyboardInterrupt):
479 hardabort = True
476 hardabort = True
480 raise
477 raise
481 finally:
478 finally:
482 # consume the part content to not corrupt the stream.
479 # consume the part content to not corrupt the stream.
483 if not hardabort:
480 if not hardabort:
484 part.seek(0, 2)
481 part.seek(0, 2)
485
482
486
483
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
521
518
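For reference, the caps-blob format handled by `decodecaps`/`encodecaps` can be exercised outside Mercurial. This is a standalone Python 3 sketch of the same line-oriented, percent-quoted format, using `urllib.parse` in place of Mercurial's `urlreq` wrapper; it is an illustration, not the module's actual API:

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # one capability per line; values, if any, joined with commas after '='
    chunks = []
    for name in sorted(caps):
        vals = caps[name]
        chunk = quote(name, safe='')
        if vals:
            chunk = '%s=%s' % (chunk, ','.join(quote(v, safe='') for v in vals))
        chunks.append(chunk)
    return '\n'.join(chunks)

def decode_caps(blob):
    # inverse of encode_caps; a capability without '=' decodes to an
    # empty value list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

blob = encode_caps({'HG20': [], 'changegroup': ['01', '02']})
# blob == 'HG20\nchangegroup=01,02'
```

Note that valueless capabilities survive a round trip as an empty list, matching the "values are always a list" contract of `decodecaps`.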
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
534
531
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks
    of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual application payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
649
646
650
647
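The stream-parameter encoding produced by `_paramchunk` above (and parsed back by `unbundle20._processallparams`) is simple enough to sketch standalone: space-separated blocks, percent-quoted, with `name=value` when a value is present and a bare `name` otherwise. The following Python 3 sketch mirrors that logic, with `urllib.parse` standing in for Mercurial's `urlreq` wrapper; it is an illustration, not the module's API:

```python
from urllib.parse import quote, unquote

def encode_stream_params(params):
    # mirrors the layout built by _paramchunk: space-separated blocks,
    # percent-quoted, 'name=value' when a value is present, bare 'name'
    # otherwise
    blocks = []
    for name, value in params:
        block = quote(name, safe='')
        if value is not None:
            block = '%s=%s' % (block, quote(value, safe=''))
        blocks.append(block)
    return ' '.join(blocks)

def decode_stream_params(blob):
    # mirrors _processallparams: split on spaces, unquote both halves,
    # a missing value decodes to None
    params = {}
    for p in blob.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)
        params[parts[0]] = parts[1]
    return params

blob = encode_stream_params([('Compression', 'BZ'), ('e|vil', None)])
# blob == 'Compression=BZ e%7Cvil'
```

Percent-quoting is what keeps the space and `=` delimiters unambiguous, which is why both sides quote and unquote every name and value.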
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol
        only. It directly manipulates the low level stream, including
        bundle2 level instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol
        only. It directly manipulates the low level stream, including
        bundle2 level instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)
677
674
def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler
691
688
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and decode a space-separated block of stream parameters"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will
        be ignored when unknown. Those starting with an upper case letter
        are mandatory, and this function will raise a KeyError when they
        are unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to know its end. This is terrible and we are
        sorry, but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i'
                                             % size)
            yield self._readexact(size)
796
793
797
794
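`_forwardchunks` relies on the bundle2 framing: every part header and payload chunk is preceded by an int32 size word, an interrupt marker uses a negative size, and two consecutive zero sizes (the end of the last payload followed by the end-of-bundle header) terminate the stream. A standalone sketch of that copy loop, assuming the same `>i` size format as `_fpartheadersize`/`_fpayloadsize`:

```python
import io
import struct

SIZEFMT = '>i'  # assumed value of _fpartheadersize / _fpayloadsize

def forward_chunks(fp):
    # copy a size-framed bundle2 body verbatim: re-emit each int32 size
    # word, forward the chunk that follows a positive size, skip payload
    # for negative sizes (interrupt markers), and stop after two
    # consecutive zero sizes
    emptycount = 0
    while emptycount < 2:
        data = fp.read(4)
        size = struct.unpack(SIZEFMT, data)[0]
        yield data
        if size == 0:
            emptycount += 1
            continue
        emptycount = 0
        if size < 0:
            continue
        yield fp.read(size)

# a fake stream: one part header, one payload chunk, then the two
# terminating zero sizes
stream = (struct.pack('>i', 3) + b'hdr' + struct.pack('>i', 5) + b'hello'
          + struct.pack('>i', 0) + struct.pack('>i', 0))
copied = b''.join(forward_chunks(io.BytesIO(stream)))
```

Because the loop never interprets part contents, the copy is byte-for-byte identical to the input, which is exactly what forwarding over ssh needs.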
    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}
836
833
b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
846
843
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
856
853
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or
    a generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote
    side should be able to safely ignore the advisory ones.

    Neither data nor parameters may be modified after the generation has
    begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))
946
943
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1006 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1010 yield _pack(_fpartheadersize, len(headerchunk))
1007 yield _pack(_fpartheadersize, len(headerchunk))
1011 yield headerchunk
1008 yield headerchunk
1012 ## payload
1009 ## payload
1013 try:
1010 try:
1014 for chunk in self._payloadchunks():
1011 for chunk in self._payloadchunks():
1015 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1012 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1016 yield _pack(_fpayloadsize, len(chunk))
1013 yield _pack(_fpayloadsize, len(chunk))
1017 yield chunk
1014 yield chunk
1018 except GeneratorExit:
1015 except GeneratorExit:
1019 # GeneratorExit means that nobody is listening for our
1016 # GeneratorExit means that nobody is listening for our
1020 # results anyway, so just bail quickly rather than trying
1017 # results anyway, so just bail quickly rather than trying
1021 # to produce an error part.
1018 # to produce an error part.
1022 ui.debug('bundle2-generatorexit\n')
1019 ui.debug('bundle2-generatorexit\n')
1023 raise
1020 raise
1024 except BaseException as exc:
1021 except BaseException as exc:
1025 # backup exception data for later
1022 # backup exception data for later
1026 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1023 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1027 % exc)
1024 % exc)
1028 tb = sys.exc_info()[2]
1025 tb = sys.exc_info()[2]
1029 msg = 'unexpected error: %s' % exc
1026 msg = 'unexpected error: %s' % exc
1030 interpart = bundlepart('error:abort', [('message', msg)],
1027 interpart = bundlepart('error:abort', [('message', msg)],
1031 mandatory=False)
1028 mandatory=False)
1032 interpart.id = 0
1029 interpart.id = 0
1033 yield _pack(_fpayloadsize, -1)
1030 yield _pack(_fpayloadsize, -1)
1034 for chunk in interpart.getchunks(ui=ui):
1031 for chunk in interpart.getchunks(ui=ui):
1035 yield chunk
1032 yield chunk
1036 outdebug(ui, 'closing payload chunk')
1033 outdebug(ui, 'closing payload chunk')
1037 # abort current part payload
1034 # abort current part payload
1038 yield _pack(_fpayloadsize, 0)
1035 yield _pack(_fpayloadsize, 0)
1039 pycompat.raisewithtb(exc, tb)
1036 pycompat.raisewithtb(exc, tb)
1040 # end of payload
1037 # end of payload
1041 outdebug(ui, 'closing payload chunk')
1038 outdebug(ui, 'closing payload chunk')
1042 yield _pack(_fpayloadsize, 0)
1039 yield _pack(_fpayloadsize, 0)
1043 self._generated = True
1040 self._generated = True
1044
1041
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


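The rechunking that `_payloadchunks` delegates to `util.chunkbuffer` can be sketched on its own: accept arbitrarily-sized input chunks from a generator and re-emit them in fixed-size pieces. `preferredsize` below is a hypothetical stand-in for `preferedchunksize`.

```python
# Minimal sketch of chunkbuffer-style rechunking (assumption: a simple
# byte buffer is enough to illustrate the fixed-size re-emission).
def rechunk(datagen, preferredsize=4):
    buf = b''
    for piece in datagen:
        buf += piece
        # emit full-size chunks as soon as enough data has accumulated
        while len(buf) >= preferredsize:
            yield buf[:preferredsize]
            buf = buf[preferredsize:]
    if buf:
        # trailing partial chunk
        yield buf

chunks = list(rechunk(iter([b'ab', b'cdefg', b'h'])))
```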
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

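The chunk index maps payload offsets to `(chunk number, offset within chunk)` pairs. The same linear scan as `_findchunk`, extracted as a standalone function over a made-up index, shows the mapping:

```python
def findchunk(chunkindex, pos):
    """Given (payload-start, file-offset) tuples like unbundlepart._chunkindex,
    return (chunk number, offset into that chunk) for a payload position."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # position falls exactly on a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # position lies inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# illustrative index: chunks start at payload offsets 0, 10 and 25
# (the file offsets, second members, are unused by the lookup itself)
index = [(0, 0), (10, 14), (25, 33)]
```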
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

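A detail worth calling out from `_readheader` and `getchunks`: the mandatory flag of a part travels in the *casing* of its type string (upper-case on the wire means mandatory). A standalone encode/decode pair makes the round trip explicit; the helper names are illustrative, not from the module:

```python
# The part type's case carries the mandatory bit: getchunks() upper-cases
# mandatory part types, and _readheader() recovers the flag by comparing
# against the lower-cased form.
def encodeparttype(parttype, mandatory):
    return parttype.upper() if mandatory else parttype.lower()

def decodeparttype(wiretype):
    mandatory = (wiretype != wiretype.lower())
    return wiretype.lower(), mandatory

parsed = decodeparttype(encodeparttype('changegroup', True))
```

This lets an old client refuse a bundle containing an unknown mandatory part while silently skipping unknown advisory ones, without any extra header field.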
    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

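The way `seek` resolves `(offset, whence)` into an absolute payload position can be isolated into a small helper. Note the `whence == 2` branch: unlike `io.IOBase.seek`, a *positive* offset here counts backwards from the end of the payload (`end - offset`). The helper name is illustrative:

```python
import os

def resolvepos(curpos, end, offset, whence=os.SEEK_SET):
    """Mirror the whence arithmetic of unbundlepart.seek()."""
    if whence == os.SEEK_SET:    # 0: absolute position
        return offset
    elif whence == os.SEEK_CUR:  # 1: relative to the current position
        return curpos + offset
    elif whence == os.SEEK_END:  # 2: measured back from the end (end - offset)
        return end - offset
    raise ValueError('Unknown whence value: %r' % (whence,))
```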
    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode', 'strict')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

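`obsmarkersversion` is self-contained enough to exercise directly: it distills `'V<n>'` capability strings into integer versions and ignores everything else. The caps dict below is made up for illustration:

```python
# Same logic as the obsmarkersversion() function above, runnable standalone.
def obsmarkersversion(caps):
    obscaps = caps.get('obsmarkers', ())
    # keep only 'V<n>' entries and strip the leading 'V'
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion({'obsmarkers': ('V0', 'V1', 'junk')})
```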
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))
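    # A note on the payload built above (illustrative comment, not part of
    # the original code): 'chunks' is a flat list alternating 20-byte
    # changeset nodes and 20-byte .hgtags filenodes, so ''.join(chunks)
    # yields a sequence of 40-byte records, one per head with a cached
    # fnode.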

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker
    format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
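
# A worked example of the encoding combined above (illustrative comment,
# not part of the original code): an addchangegroup result of 0 means
# error, 1 means the head count is unchanged, 1 + n means n heads were
# added, and -1 - n means n heads were removed. If one part reports 3
# (two heads added) and another reports -2 (one head removed),
# changedheads is (3 - 1) + (-2 + 1) = 1, so the combined result is
# 1 + 1 = 2.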

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what was
      retrieved by the client matches the server's knowledge about the
      bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest
      with that name. Like the size, it is used to validate that what was
      retrieved by the client matches what the server knows about the
      bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. If other activity happens on unrelated heads,
    it is ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase
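
# Sketch of the wire layout this loop decodes (illustrative comment, not
# part of the original code): _fphasesentry is defined elsewhere in this
# module and is assumed here to be the big-endian "phase integer plus
# 20-byte node" struct, e.g. '>i20s'. Each fixed-size entry pairs a phase
# number with a changeset node, so a part carrying two heads would be
# exactly 2 * struct.calcsize('>i20s') == 48 bytes.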

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    addednodes = []
    for entry in op.records['changegroup']:
        addednodes.extend(entry['addednodes'])
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase,
                        addednodes)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not '
                         'enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)