bundle2: add the capability to store hookargs on bundle operation object...
Pulkit Goyal
r33629:f3407d56 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If the first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

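The stream-parameter blob described above can be sketched in a few lines. This is an illustrative stand-in (the helper names `encodestreamparams`/`decodestreamparams` are hypothetical, and it uses `urllib.parse` directly rather than the module's `urlreq` shim):

```python
from urllib.parse import quote, unquote

def encodestreamparams(params):
    # Build the space separated blob: names and values are urlquoted,
    # so neither can contain a literal space or '='.
    chunks = []
    for name, value in sorted(params.items()):
        if value is None:
            chunks.append(quote(name))
        else:
            chunks.append('%s=%s' % (quote(name), quote(value)))
    return ' '.join(chunks)

def decodestreamparams(blob):
    # Reverse of the above; a bare name (no '=') decodes to None.
    params = {}
    for chunk in blob.split(' '):
        if not chunk:
            continue
        name, sep, value = chunk.partition('=')
        params[unquote(name)] = unquote(value) if sep else None
    return params
```

Per the rule above, a parameter whose decoded name starts with a capital letter is mandatory; all-lowercase names are advisory.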
Payload part
------------------------

Binary format is as follows

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32bits integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    Part parameters may have arbitrary content, the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

        N pairs of bytes, where N is the total number of parameters. Each
        pair contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

        A blob of bytes from which each parameter key and value can be
        retrieved using the list of size pairs stored in the previous
        field.

    Mandatory parameters come first, then the advisory ones.

    Each parameter's key MUST be unique within the part.
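The header layout above can be sketched with `struct`, using the same codes as the `_fparttypesize`, `_fpartid`, and `_fpartparamcount` formats defined later in the module (the helper names here are hypothetical, not the module's API):

```python
import struct

def packpartheader(parttype, partid, mandatoryparams=(), advisoryparams=()):
    # <typesize><parttype><partid><mand-count><adv-count><param-sizes><param-data>
    allparams = list(mandatoryparams) + list(advisoryparams)
    pieces = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(mandatoryparams), len(advisoryparams))]
    sizes = sum(([len(k), len(v)] for k, v in allparams), [])
    pieces.append(struct.pack('>' + 'BB' * len(allparams), *sizes))
    for key, value in allparams:
        pieces.append(key)
        pieces.append(value)
    return b''.join(pieces)

def unpackpartheader(data):
    typesize = data[0]
    offset = 1
    parttype = data[offset:offset + typesize]; offset += typesize
    (partid,) = struct.unpack('>I', data[offset:offset + 4]); offset += 4
    mancount, advcount = struct.unpack('>BB', data[offset:offset + 2]); offset += 2
    n = mancount + advcount
    sizes = struct.unpack('>' + 'BB' * n, data[offset:offset + 2 * n]); offset += 2 * n
    params = []
    for i in range(n):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]; offset += ksize
        value = data[offset:offset + vsize]; offset += vsize
        params.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return parttype, partid, params[:mancount], params[mancount:]
```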

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
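The chunked payload framing can be exercised with a small writer/reader pair (hypothetical helpers; `'>i'` matches the `_fpayloadsize` format defined later in the module):

```python
import io
import struct

def writepayload(out, data, chunksize=4096):
    # Emit <chunksize><chunkdata> pairs, then the zero-size terminator chunk.
    for start in range(0, len(data), chunksize):
        chunk = data[start:start + chunksize]
        out.write(struct.pack('>i', len(chunk)))
        out.write(chunk)
    out.write(struct.pack('>i', 0))

def readpayload(src):
    # Read chunks until the zero-size terminator.
    chunks = []
    while True:
        (size,) = struct.unpack('>i', src.read(4))
        if size == 0:
            break
        if size < 0:
            # reserved for special case processing, not implemented here
            raise ValueError('negative chunk size not supported: %d' % size)
        chunks.append(src.read(size))
    return b''.join(chunks)
```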

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
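The case rule above reduces to a one-line check (hypothetical helper for illustration; the unbundler itself derives a part's `mandatory` flag while parsing the header):

```python
def ismandatorypart(parttype):
    # Any uppercase character in the part type makes the part mandatory;
    # an all-lowercase type is advisory and may be ignored if unhandled.
    return parttype != parttype.lower()
```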

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
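Stripped of the bundle2 specifics, the registration pattern above is a decorator factory writing into a module-level dict keyed by the lowercased type. A standalone sketch (not the module's actual registry):

```python
handlers = {}

def register(parttype, params=()):
    # Lookup is case insensitive, so handlers are keyed by lowercase type.
    def _decorator(func):
        lparttype = parttype.lower()
        assert lparttype not in handlers, 'duplicate handler'
        func.params = frozenset(params)
        handlers[lparttype] = func
        return func
    return _decorator

@register('myPartType', ('key',))
def myhandler(op, part):
    return 'handled'
```

Because the key is lowercased at registration and lookup time, `myPartType` and `mypartTYPE` resolve to the same handler; only the case of the type as written in the bundle decides whether the part is mandatory.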

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
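The class above can be exercised standalone; this trimmed copy (renamed `records` for brevity, reduced to the methods used here) shows the category lookup, chronological iteration, and reply-threading behaviour:

```python
class records(object):
    """trimmed standalone copy of unbundlerecords, for illustration"""
    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            # also thread the record under the part it replies to
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, records())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

r = records()
r.add('changegroup', {'return': 1})
r.add('output', 'remote: ok', inreplyto=4)
```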

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}

    def addhookargs(self, hookargs):
        self.hookargs.update(hookargs)
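The two attributes introduced by this change give part handlers a shared place to accumulate hook arguments on the operation object; `addhookargs` is plain `dict.update`, so later calls can add keys or override earlier ones. A stand-in sketch (a real `bundleoperation` needs a repo and a transaction getter, so only the new attributes are modeled here):

```python
class fakeop(object):
    # stand-in carrying only the attributes introduced by this change
    def __init__(self):
        self.hookargs = {}

    def addhookargs(self, hookargs):
        # dict.update semantics: merge, with later calls winning on conflict
        self.hookargs.update(hookargs)

op = fakeop()
op.addhookargs({'source': 'push', 'url': 'ssh://example/repo'})
op.addhookargs({'bundle2': '1'})
```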
class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None or op.gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        # Any exceptions seeking to the end of the bundle at this point are
        # almost certainly related to the underlying stream being bad.
        # And, chances are that the exception we're handling is related to
        # getting in that bad state. So, we swallow the seeking error and
        # re-raise the original error.
        seekerror = False
        try:
            for nbpart, part in iterparts:
                # consume the bundle content
                part.seek(0, 2)
        except Exception:
            seekerror = True

        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged

        # Re-raising from a variable loses the original stack. So only use
        # that form if we need to.
        if seekerror:
            raise exc
        else:
            raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)

482
486
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

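The two helpers above form an exact inverse pair. A minimal standalone sketch of the same blob format, using Python 3's `urllib.parse` in place of Mercurial's `urlreq` wrapper (names and the sample caps dict are illustrative, not from the source):

```python
# Sketch of the capability blob handled by decodecaps/encodecaps above:
# one capability per line, optional comma-separated values, both halves
# percent-quoted.  urllib.parse stands in for Mercurial's urlreq.
from urllib.parse import quote, unquote

def encodecaps(caps):
    """Encode a caps dict into a newline-separated blob."""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """Decode the blob back into {capability: [values]}."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
assert blob == 'HG20\nchangegroup=01,02'
assert decodecaps(blob) == caps
```

A value-less capability round-trips to an empty list, matching the "values are always a list" contract in the docstring.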
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        As the part is directly added to the container, any failure to
        properly initialize the part after calling ``newpart`` should result
        in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

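The encoding performed by `_paramchunk` can be sketched standalone: each name is percent-quoted, gains a quoted `=value` suffix when a value is present, and the entries are joined with single spaces. This is an illustrative reimplementation (Python 3, `urllib.parse` in place of `urlreq`; the sample parameter names are made up):

```python
# Standalone sketch of the stream-level parameter block format built by
# _paramchunk above.  Percent-quoting keeps the block parseable even when
# a name or value contains spaces, '=' or other reserved bytes.
from urllib.parse import quote

def paramchunk(params):
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)

print(paramchunk([('Compression', 'BZ'), ('e|vil name', None)]))
# -> Compression=BZ e%7Cvil%20name
```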
    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


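Putting `getchunks` and `_getcorechunk` together, the outer container framing can be sketched with plain `struct`: the `'HG20'` magic, a signed big-endian 32-bit parameter-block size (matching the negative-size checks in the reader below), the parameter block, the part chunks, and a zero-size part header as end-of-stream marker. The helper name and sample data here are illustrative; part payloads are left opaque:

```python
# Minimal sketch of the HG20 outer framing emitted by the bundler above.
import struct

def framebundle(paramblock=b'', partchunks=()):
    chunks = [b'HG20', struct.pack('>i', len(paramblock))]
    if paramblock:
        chunks.append(paramblock)
    chunks.extend(partchunks)
    chunks.append(struct.pack('>i', 0))  # empty part header ends the stream
    return b''.join(chunks)

blob = framebundle(b'Compression=UN')
assert blob[:4] == b'HG20'
assert struct.unpack('>i', blob[4:8])[0] == len(b'Compression=UN')
```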
    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we preserve
        server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """parse the stream level parameter block and apply each parameter"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


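The parsing above is the exact inverse of `_paramchunk` on the bundler side. A standalone sketch, with a plain dict standing in for `util.sortdict` and the per-parameter side effects (`_processparam`) omitted:

```python
# Standalone sketch of _processallparams above: split on spaces, split each
# entry on the first '=', percent-unquote both halves, and default a
# missing value to None.
from urllib.parse import unquote

def processallparams(paramsblock):
    params = {}
    for entry in paramsblock.split(' '):
        parts = entry.split('=', 1)
        parts = [unquote(i) for i in parts]
        if len(parts) < 2:
            parts.append(None)   # value-less parameter
        params[parts[0]] = parts[1]
    return params

assert processallparams('Compression=BZ e%7Cvil%20name') == {
    'Compression': 'BZ', 'e|vil name': None}
```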
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory; this function will raise a BundleUnknownFeatureError when
        one is unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
            assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


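The `emptycount` loop above can be sketched standalone: chunk sizes stream by as signed big-endian 32-bit words; a size of -1 (`flaginterrupt` in the real format) is skipped, and two consecutive zero sizes (end of the last payload, then the empty part header) terminate the copy. The function name is illustrative:

```python
# Standalone sketch of the end-of-stream scan in _forwardchunks above.
import io
import struct

FLAGINTERRUPT = -1

def copychunks(fp):
    out = []
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack('>i', fp.read(4))[0]
        out.append(struct.pack('>i', size))
        if size:
            emptycount = 0
        else:
            emptycount += 1
            continue
        if size == FLAGINTERRUPT:
            continue                 # interrupt marker carries no payload here
        if size < 0:
            raise ValueError('negative chunk size: %i' % size)
        out.append(fp.read(size))
    return b''.join(out)

stream = (struct.pack('>i', 3) + b'abc'
          + struct.pack('>i', 0) + struct.pack('>i', 0))
assert copychunks(io.BytesIO(stream)) == stream
```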
    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. Remote side
    should be able to safely ignore the advisory ones.

    Both data and parameters cannot be modified after the generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

974 #### header
978 #### header
975 if self.mandatory:
979 if self.mandatory:
976 parttype = self.type.upper()
980 parttype = self.type.upper()
977 else:
981 else:
978 parttype = self.type.lower()
982 parttype = self.type.lower()
979 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
983 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
980 ## parttype
984 ## parttype
981 header = [_pack(_fparttypesize, len(parttype)),
985 header = [_pack(_fparttypesize, len(parttype)),
982 parttype, _pack(_fpartid, self.id),
986 parttype, _pack(_fpartid, self.id),
983 ]
987 ]
984 ## parameters
988 ## parameters
985 # count
989 # count
986 manpar = self.mandatoryparams
990 manpar = self.mandatoryparams
987 advpar = self.advisoryparams
991 advpar = self.advisoryparams
988 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
992 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
989 # size
993 # size
990 parsizes = []
994 parsizes = []
991 for key, value in manpar:
995 for key, value in manpar:
992 parsizes.append(len(key))
996 parsizes.append(len(key))
993 parsizes.append(len(value))
997 parsizes.append(len(value))
994 for key, value in advpar:
998 for key, value in advpar:
995 parsizes.append(len(key))
999 parsizes.append(len(key))
996 parsizes.append(len(value))
1000 parsizes.append(len(value))
997 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
1001 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
998 header.append(paramsizes)
1002 header.append(paramsizes)
999 # key, value
1003 # key, value
1000 for key, value in manpar:
1004 for key, value in manpar:
1001 header.append(key)
1005 header.append(key)
1002 header.append(value)
1006 header.append(value)
1003 for key, value in advpar:
1007 for key, value in advpar:
1004 header.append(key)
1008 header.append(key)
1005 header.append(value)
1009 header.append(value)
1006 ## finalize header
1010 ## finalize header
1007 headerchunk = ''.join(header)
1011 headerchunk = ''.join(header)
1008 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1012 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1009 yield _pack(_fpartheadersize, len(headerchunk))
1013 yield _pack(_fpartheadersize, len(headerchunk))
1010 yield headerchunk
1014 yield headerchunk
1011 ## payload
1015 ## payload
1012 try:
1016 try:
1013 for chunk in self._payloadchunks():
1017 for chunk in self._payloadchunks():
1014 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1018 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1015 yield _pack(_fpayloadsize, len(chunk))
1019 yield _pack(_fpayloadsize, len(chunk))
1016 yield chunk
1020 yield chunk
1017 except GeneratorExit:
1021 except GeneratorExit:
1018 # GeneratorExit means that nobody is listening for our
1022 # GeneratorExit means that nobody is listening for our
1019 # results anyway, so just bail quickly rather than trying
1023 # results anyway, so just bail quickly rather than trying
1020 # to produce an error part.
1024 # to produce an error part.
1021 ui.debug('bundle2-generatorexit\n')
1025 ui.debug('bundle2-generatorexit\n')
1022 raise
1026 raise
1023 except BaseException as exc:
1027 except BaseException as exc:
1024 # backup exception data for later
1028 # backup exception data for later
1025 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1029 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1026 % exc)
1030 % exc)
1027 tb = sys.exc_info()[2]
1031 tb = sys.exc_info()[2]
1028 msg = 'unexpected error: %s' % exc
1032 msg = 'unexpected error: %s' % exc
1029 interpart = bundlepart('error:abort', [('message', msg)],
1033 interpart = bundlepart('error:abort', [('message', msg)],
1030 mandatory=False)
1034 mandatory=False)
1031 interpart.id = 0
1035 interpart.id = 0
1032 yield _pack(_fpayloadsize, -1)
1036 yield _pack(_fpayloadsize, -1)
1033 for chunk in interpart.getchunks(ui=ui):
1037 for chunk in interpart.getchunks(ui=ui):
1034 yield chunk
1038 yield chunk
1035 outdebug(ui, 'closing payload chunk')
1039 outdebug(ui, 'closing payload chunk')
1036 # abort current part payload
1040 # abort current part payload
1037 yield _pack(_fpayloadsize, 0)
1041 yield _pack(_fpayloadsize, 0)
1038 pycompat.raisewithtb(exc, tb)
1042 pycompat.raisewithtb(exc, tb)
1039 # end of payload
1043 # end of payload
1040 outdebug(ui, 'closing payload chunk')
1044 outdebug(ui, 'closing payload chunk')
1041 yield _pack(_fpayloadsize, 0)
1045 yield _pack(_fpayloadsize, 0)
1042 self._generated = True
1046 self._generated = True
1043
1047
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data
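The streamed branch above drains a `util.chunkbuffer` in fixed-size reads. A minimal standalone sketch of that fixed-size chunking (the `chunked` helper is illustrative only, not Mercurial API; `preferedchunksize` is spelled as in the source):

```python
def chunked(data, size):
    # yield fixed-size slices of data, the way _payloadchunks drains
    # util.chunkbuffer with preferedchunksize-byte reads; the final
    # slice may be shorter than size
    for start in range(0, len(data), size):
        yield data[start:start + size]

chunks = list(chunked(b'abcdefg', 3))
# chunks == [b'abc', b'def', b'g']
```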


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows to transmit an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the header bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)
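`_unpackheader` derives how many bytes to consume from the struct format string itself, so callers never pass an explicit length. A self-contained sketch of that calcsize/unpack pairing (the `read` callable here stands in for `_fromheader` and is purely illustrative):

```python
import struct

def unpackheader(fmt, read):
    # read exactly as many bytes as the struct format describes,
    # then decode them -- the same calcsize/unpack pairing as above
    data = read(struct.calcsize(fmt))
    return struct.unpack(fmt, data)

# decode one big-endian uint32 header field; bundle2 uses big-endian
# fixed-size integers throughout
buf = struct.pack('>I', 258)
fields = unpackheader('>I', lambda n: buf[:n])
# fields == (258,)
```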

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')
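The lookup in `_findchunk` can be exercised in isolation. This sketch reuses the same linear scan over a sorted index of (payload offset, file offset) pairs, one per chunk start, as `_payloadchunks` builds:

```python
def findchunk(chunkindex, pos):
    # chunkindex: sorted (payload offset, file offset) pairs, one per
    # chunk start; return (chunk number, offset within that chunk)
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    # positions past the last recorded chunk start are unknown
    raise ValueError('Unknown chunk')

# three chunks starting at payload offsets 0, 10 and 25
index = [(0, 100), (10, 110), (25, 129)]
# findchunk(index, 10) == (1, 0)   -- exact chunk boundary
# findchunk(index, 13) == (1, 3)   -- 3 bytes into chunk 1
```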

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
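For illustration, the version extraction above behaves like this (standalone copy of the same comprehension; entries not shaped like `V<number>` are silently skipped):

```python
def obsmarkersversion(caps):
    # keep capability entries shaped like 'V<number>' and
    # extract the integer version, as in the function above
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion({'obsmarkers': ('V0', 'V1', 'X2')})
# versions == [0, 1]; a caps dict without 'obsmarkers' yields []
```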

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.getchangegroup(repo, source, outgoing,
                                    version=cgversion)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))
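The phase-heads loop above packs one fixed-size entry per head. A standalone sketch of that packing, assuming `_fphasesentry` is a big-endian int phase number followed by a 20-byte binary node (`'>i20s'`); treat the exact format string as an assumption of this sketch, and note the real code iterates `phases.allphases` rather than sorted dict keys:

```python
import struct

# assumed layout of _fphasesentry: big-endian int + 20-byte node
fphasesentry = '>i20s'

def packphases(headsbyphase):
    # serialize {phase number: [binary node, ...]} into the
    # concatenated fixed-size entries a 'phase-heads' part carries
    data = []
    for phase in sorted(headsbyphase):
        for head in headsbyphase[phase]:
            data.append(struct.pack(fphasesentry, phase, head))
    return b''.join(data)

entry = packphases({0: [b'\x11' * 20]})
# one entry: 4 bytes of phase + 20 bytes of node
```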
1419
1423
def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

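The legacy (non-bundle2) branch above writes a short ASCII header naming the compression, then the compressed changegroup stream. A minimal standalone sketch of that framing, using zlib directly instead of Mercurial's compression-engine abstraction (`legacychunks` is a hypothetical helper name; `HG10GZ` is the real header for a zlib-compressed v1 bundle):

```python
import zlib

# Illustrative sketch, not part of bundle2.py: a legacy bundle is just a
# compression-naming header followed by the compressed changegroup bytes.
def legacychunks(header, payload):
    comp = zlib.compressobj()
    yield header          # e.g. b'HG10GZ' for a zlib-compressed v1 bundle
    yield comp.compress(payload)
    yield comp.flush()

raw = b''.join(legacychunks(b'HG10GZ', b'changegroup-bytes'))
```

A reader reverses this by peeling off the fixed-length header and decompressing the remainder.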
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

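The return-code arithmetic above relies on the addchangegroup convention: a positive result encodes "1 + heads added", a negative one encodes "-1 - heads removed", and 0 means failure. A standalone sketch of the same combination logic (`combine` is a hypothetical helper, not part of bundle2.py):

```python
# Illustrative sketch of combinechangegroupresults' arithmetic, operating on
# a plain list of return codes instead of bundle-operation records.
def combine(results):
    changedheads = 0
    for ret in results:
        if ret == 0:   # any failed changegroup fails the whole operation
            return 0
        if ret < -1:   # ret = -1 - removed, so removed heads = -(ret + 1)
            changedheads += ret + 1
        elif ret > 1:  # ret = 1 + added
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1           # no net head change: plain success
```

For example, two changegroups that added 2 and 1 heads respectively (codes 3 and 2) combine to 4, i.e. "1 + 3 heads added".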
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

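Both check handlers above read the same wire framing: a flat stream of 20-byte binary changeset nodes, consumed until the part is exhausted. A standalone sketch of that parsing loop (`readheads` is an illustrative helper, not the hg API):

```python
import io

# Illustrative sketch of the 'check:heads' / 'check:updated-heads' payload
# framing: fixed-width 20-byte nodes, back to back, no length prefix.
def readheads(fp):
    heads = []
    while True:
        h = fp.read(20)
        if not h:
            return heads
        if len(h) != 20:  # a short read mid-node means a truncated payload
            raise ValueError('truncated node in check:heads payload')
        heads.append(h)

payload = b'\xaa' * 20 + b'\xbb' * 20
heads = readheads(io.BytesIO(payload))
```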
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

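The loop in `_readphaseheads` parses fixed-size records until the part runs dry, aborting on a trailing partial record. A self-contained sketch of the same parsing, where the `'>i20s'` entry layout (big-endian 32-bit phase number followed by a 20-byte node) is an assumption mirroring `_fphasesentry`:

```python
import io
import struct

# Assumed entry layout for the phase-heads payload: phase number + raw node.
_ENTRY = '>i20s'

def readphaseheads(fp, nphases=3):
    """Illustrative stand-in for _readphaseheads over a plain file object."""
    headsbyphase = [[] for _ in range(nphases)]
    entrysize = struct.calcsize(_ENTRY)
    while True:
        entry = fp.read(entrysize)
        if len(entry) < entrysize:
            if entry:  # a partial trailing record means corrupt data
                raise ValueError('bad phase-heads data')
            break
        phase, node = struct.unpack(_ENTRY, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

node = b'\x01' * 20
data = struct.pack(_ENTRY, 0, node) + struct.pack(_ENTRY, 2, node)
heads = readphaseheads(io.BytesIO(data))
```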
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)