##// END OF EJS Templates
bundle2: use cg?unpacker.apply() instead of changegroup.addchangegroup()
Augie Fackler -
r26698:c94cdeeb default
parent child Browse files
Show More
@@ -1,1528 +1,1527 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155 import urllib
155 import urllib
156
156
157 from .i18n import _
157 from .i18n import _
158 from . import (
158 from . import (
159 changegroup,
159 changegroup,
160 error,
160 error,
161 obsolete,
161 obsolete,
162 pushkey,
162 pushkey,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 _pack = struct.pack
168 _pack = struct.pack
169 _unpack = struct.unpack
169 _unpack = struct.unpack
170
170
171 _fstreamparamsize = '>i'
171 _fstreamparamsize = '>i'
172 _fpartheadersize = '>i'
172 _fpartheadersize = '>i'
173 _fparttypesize = '>B'
173 _fparttypesize = '>B'
174 _fpartid = '>I'
174 _fpartid = '>I'
175 _fpayloadsize = '>i'
175 _fpayloadsize = '>i'
176 _fpartparamcount = '>BB'
176 _fpartparamcount = '>BB'
177
177
178 preferedchunksize = 4096
178 preferedchunksize = 4096
179
179
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181
181
182 def outdebug(ui, message):
182 def outdebug(ui, message):
183 """debug regarding output stream (bundling)"""
183 """debug regarding output stream (bundling)"""
184 if ui.configbool('devel', 'bundle2.debug', False):
184 if ui.configbool('devel', 'bundle2.debug', False):
185 ui.debug('bundle2-output: %s\n' % message)
185 ui.debug('bundle2-output: %s\n' % message)
186
186
187 def indebug(ui, message):
187 def indebug(ui, message):
188 """debug on input stream (unbundling)"""
188 """debug on input stream (unbundling)"""
189 if ui.configbool('devel', 'bundle2.debug', False):
189 if ui.configbool('devel', 'bundle2.debug', False):
190 ui.debug('bundle2-input: %s\n' % message)
190 ui.debug('bundle2-input: %s\n' % message)
191
191
192 def validateparttype(parttype):
192 def validateparttype(parttype):
193 """raise ValueError if a parttype contains invalid character"""
193 """raise ValueError if a parttype contains invalid character"""
194 if _parttypeforbidden.search(parttype):
194 if _parttypeforbidden.search(parttype):
195 raise ValueError(parttype)
195 raise ValueError(parttype)
196
196
197 def _makefpartparamsizes(nbparams):
197 def _makefpartparamsizes(nbparams):
198 """return a struct format to read part parameter sizes
198 """return a struct format to read part parameter sizes
199
199
200 The number parameters is variable so we need to build that format
200 The number parameters is variable so we need to build that format
201 dynamically.
201 dynamically.
202 """
202 """
203 return '>'+('BB'*nbparams)
203 return '>'+('BB'*nbparams)
204
204
205 parthandlermapping = {}
205 parthandlermapping = {}
206
206
207 def parthandler(parttype, params=()):
207 def parthandler(parttype, params=()):
208 """decorator that register a function as a bundle2 part handler
208 """decorator that register a function as a bundle2 part handler
209
209
210 eg::
210 eg::
211
211
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 def myparttypehandler(...):
213 def myparttypehandler(...):
214 '''process a part of type "my part".'''
214 '''process a part of type "my part".'''
215 ...
215 ...
216 """
216 """
217 validateparttype(parttype)
217 validateparttype(parttype)
218 def _decorator(func):
218 def _decorator(func):
219 lparttype = parttype.lower() # enforce lower case matching.
219 lparttype = parttype.lower() # enforce lower case matching.
220 assert lparttype not in parthandlermapping
220 assert lparttype not in parthandlermapping
221 parthandlermapping[lparttype] = func
221 parthandlermapping[lparttype] = func
222 func.params = frozenset(params)
222 func.params = frozenset(params)
223 return func
223 return func
224 return _decorator
224 return _decorator
225
225
226 class unbundlerecords(object):
226 class unbundlerecords(object):
227 """keep record of what happens during and unbundle
227 """keep record of what happens during and unbundle
228
228
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 category of record and obj is an arbitrary object.
230 category of record and obj is an arbitrary object.
231
231
232 `records['cat']` will return all entries of this category 'cat'.
232 `records['cat']` will return all entries of this category 'cat'.
233
233
234 Iterating on the object itself will yield `('category', obj)` tuples
234 Iterating on the object itself will yield `('category', obj)` tuples
235 for all entries.
235 for all entries.
236
236
237 All iterations happens in chronological order.
237 All iterations happens in chronological order.
238 """
238 """
239
239
240 def __init__(self):
240 def __init__(self):
241 self._categories = {}
241 self._categories = {}
242 self._sequences = []
242 self._sequences = []
243 self._replies = {}
243 self._replies = {}
244
244
245 def add(self, category, entry, inreplyto=None):
245 def add(self, category, entry, inreplyto=None):
246 """add a new record of a given category.
246 """add a new record of a given category.
247
247
248 The entry can then be retrieved in the list returned by
248 The entry can then be retrieved in the list returned by
249 self['category']."""
249 self['category']."""
250 self._categories.setdefault(category, []).append(entry)
250 self._categories.setdefault(category, []).append(entry)
251 self._sequences.append((category, entry))
251 self._sequences.append((category, entry))
252 if inreplyto is not None:
252 if inreplyto is not None:
253 self.getreplies(inreplyto).add(category, entry)
253 self.getreplies(inreplyto).add(category, entry)
254
254
255 def getreplies(self, partid):
255 def getreplies(self, partid):
256 """get the records that are replies to a specific part"""
256 """get the records that are replies to a specific part"""
257 return self._replies.setdefault(partid, unbundlerecords())
257 return self._replies.setdefault(partid, unbundlerecords())
258
258
259 def __getitem__(self, cat):
259 def __getitem__(self, cat):
260 return tuple(self._categories.get(cat, ()))
260 return tuple(self._categories.get(cat, ()))
261
261
262 def __iter__(self):
262 def __iter__(self):
263 return iter(self._sequences)
263 return iter(self._sequences)
264
264
265 def __len__(self):
265 def __len__(self):
266 return len(self._sequences)
266 return len(self._sequences)
267
267
268 def __nonzero__(self):
268 def __nonzero__(self):
269 return bool(self._sequences)
269 return bool(self._sequences)
270
270
271 class bundleoperation(object):
271 class bundleoperation(object):
272 """an object that represents a single bundling process
272 """an object that represents a single bundling process
273
273
274 Its purpose is to carry unbundle-related objects and states.
274 Its purpose is to carry unbundle-related objects and states.
275
275
276 A new object should be created at the beginning of each bundle processing.
276 A new object should be created at the beginning of each bundle processing.
277 The object is to be returned by the processing function.
277 The object is to be returned by the processing function.
278
278
279 The object has very little content now it will ultimately contain:
279 The object has very little content now it will ultimately contain:
280 * an access to the repo the bundle is applied to,
280 * an access to the repo the bundle is applied to,
281 * a ui object,
281 * a ui object,
282 * a way to retrieve a transaction to add changes to the repo,
282 * a way to retrieve a transaction to add changes to the repo,
283 * a way to record the result of processing each part,
283 * a way to record the result of processing each part,
284 * a way to construct a bundle response when applicable.
284 * a way to construct a bundle response when applicable.
285 """
285 """
286
286
287 def __init__(self, repo, transactiongetter, captureoutput=True):
287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 self.repo = repo
288 self.repo = repo
289 self.ui = repo.ui
289 self.ui = repo.ui
290 self.records = unbundlerecords()
290 self.records = unbundlerecords()
291 self.gettransaction = transactiongetter
291 self.gettransaction = transactiongetter
292 self.reply = None
292 self.reply = None
293 self.captureoutput = captureoutput
293 self.captureoutput = captureoutput
294
294
295 class TransactionUnavailable(RuntimeError):
295 class TransactionUnavailable(RuntimeError):
296 pass
296 pass
297
297
298 def _notransaction():
298 def _notransaction():
299 """default method to get a transaction while processing a bundle
299 """default method to get a transaction while processing a bundle
300
300
301 Raise an exception to highlight the fact that no transaction was expected
301 Raise an exception to highlight the fact that no transaction was expected
302 to be created"""
302 to be created"""
303 raise TransactionUnavailable()
303 raise TransactionUnavailable()
304
304
305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
306 """This function process a bundle, apply effect to/from a repo
306 """This function process a bundle, apply effect to/from a repo
307
307
308 It iterates over each part then searches for and uses the proper handling
308 It iterates over each part then searches for and uses the proper handling
309 code to process the part. Parts are processed in order.
309 code to process the part. Parts are processed in order.
310
310
311 This is very early version of this function that will be strongly reworked
311 This is very early version of this function that will be strongly reworked
312 before final usage.
312 before final usage.
313
313
314 Unknown Mandatory part will abort the process.
314 Unknown Mandatory part will abort the process.
315
315
316 It is temporarily possible to provide a prebuilt bundleoperation to the
316 It is temporarily possible to provide a prebuilt bundleoperation to the
317 function. This is used to ensure output is properly propagated in case of
317 function. This is used to ensure output is properly propagated in case of
318 an error during the unbundling. This output capturing part will likely be
318 an error during the unbundling. This output capturing part will likely be
319 reworked and this ability will probably go away in the process.
319 reworked and this ability will probably go away in the process.
320 """
320 """
321 if op is None:
321 if op is None:
322 if transactiongetter is None:
322 if transactiongetter is None:
323 transactiongetter = _notransaction
323 transactiongetter = _notransaction
324 op = bundleoperation(repo, transactiongetter)
324 op = bundleoperation(repo, transactiongetter)
325 # todo:
325 # todo:
326 # - replace this is a init function soon.
326 # - replace this is a init function soon.
327 # - exception catching
327 # - exception catching
328 unbundler.params
328 unbundler.params
329 if repo.ui.debugflag:
329 if repo.ui.debugflag:
330 msg = ['bundle2-input-bundle:']
330 msg = ['bundle2-input-bundle:']
331 if unbundler.params:
331 if unbundler.params:
332 msg.append(' %i params')
332 msg.append(' %i params')
333 if op.gettransaction is None:
333 if op.gettransaction is None:
334 msg.append(' no-transaction')
334 msg.append(' no-transaction')
335 else:
335 else:
336 msg.append(' with-transaction')
336 msg.append(' with-transaction')
337 msg.append('\n')
337 msg.append('\n')
338 repo.ui.debug(''.join(msg))
338 repo.ui.debug(''.join(msg))
339 iterparts = enumerate(unbundler.iterparts())
339 iterparts = enumerate(unbundler.iterparts())
340 part = None
340 part = None
341 nbpart = 0
341 nbpart = 0
342 try:
342 try:
343 for nbpart, part in iterparts:
343 for nbpart, part in iterparts:
344 _processpart(op, part)
344 _processpart(op, part)
345 except BaseException as exc:
345 except BaseException as exc:
346 for nbpart, part in iterparts:
346 for nbpart, part in iterparts:
347 # consume the bundle content
347 # consume the bundle content
348 part.seek(0, 2)
348 part.seek(0, 2)
349 # Small hack to let caller code distinguish exceptions from bundle2
349 # Small hack to let caller code distinguish exceptions from bundle2
350 # processing from processing the old format. This is mostly
350 # processing from processing the old format. This is mostly
351 # needed to handle different return codes to unbundle according to the
351 # needed to handle different return codes to unbundle according to the
352 # type of bundle. We should probably clean up or drop this return code
352 # type of bundle. We should probably clean up or drop this return code
353 # craziness in a future version.
353 # craziness in a future version.
354 exc.duringunbundle2 = True
354 exc.duringunbundle2 = True
355 salvaged = []
355 salvaged = []
356 replycaps = None
356 replycaps = None
357 if op.reply is not None:
357 if op.reply is not None:
358 salvaged = op.reply.salvageoutput()
358 salvaged = op.reply.salvageoutput()
359 replycaps = op.reply.capabilities
359 replycaps = op.reply.capabilities
360 exc._replycaps = replycaps
360 exc._replycaps = replycaps
361 exc._bundle2salvagedoutput = salvaged
361 exc._bundle2salvagedoutput = salvaged
362 raise
362 raise
363 finally:
363 finally:
364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
365
365
366 return op
366 return op
367
367
368 def _processpart(op, part):
368 def _processpart(op, part):
369 """process a single part from a bundle
369 """process a single part from a bundle
370
370
371 The part is guaranteed to have been fully consumed when the function exits
371 The part is guaranteed to have been fully consumed when the function exits
372 (even if an exception is raised)."""
372 (even if an exception is raised)."""
373 status = 'unknown' # used by debug output
373 status = 'unknown' # used by debug output
374 try:
374 try:
375 try:
375 try:
376 handler = parthandlermapping.get(part.type)
376 handler = parthandlermapping.get(part.type)
377 if handler is None:
377 if handler is None:
378 status = 'unsupported-type'
378 status = 'unsupported-type'
379 raise error.BundleUnknownFeatureError(parttype=part.type)
379 raise error.BundleUnknownFeatureError(parttype=part.type)
380 indebug(op.ui, 'found a handler for part %r' % part.type)
380 indebug(op.ui, 'found a handler for part %r' % part.type)
381 unknownparams = part.mandatorykeys - handler.params
381 unknownparams = part.mandatorykeys - handler.params
382 if unknownparams:
382 if unknownparams:
383 unknownparams = list(unknownparams)
383 unknownparams = list(unknownparams)
384 unknownparams.sort()
384 unknownparams.sort()
385 status = 'unsupported-params (%s)' % unknownparams
385 status = 'unsupported-params (%s)' % unknownparams
386 raise error.BundleUnknownFeatureError(parttype=part.type,
386 raise error.BundleUnknownFeatureError(parttype=part.type,
387 params=unknownparams)
387 params=unknownparams)
388 status = 'supported'
388 status = 'supported'
389 except error.BundleUnknownFeatureError as exc:
389 except error.BundleUnknownFeatureError as exc:
390 if part.mandatory: # mandatory parts
390 if part.mandatory: # mandatory parts
391 raise
391 raise
392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
393 return # skip to part processing
393 return # skip to part processing
394 finally:
394 finally:
395 if op.ui.debugflag:
395 if op.ui.debugflag:
396 msg = ['bundle2-input-part: "%s"' % part.type]
396 msg = ['bundle2-input-part: "%s"' % part.type]
397 if not part.mandatory:
397 if not part.mandatory:
398 msg.append(' (advisory)')
398 msg.append(' (advisory)')
399 nbmp = len(part.mandatorykeys)
399 nbmp = len(part.mandatorykeys)
400 nbap = len(part.params) - nbmp
400 nbap = len(part.params) - nbmp
401 if nbmp or nbap:
401 if nbmp or nbap:
402 msg.append(' (params:')
402 msg.append(' (params:')
403 if nbmp:
403 if nbmp:
404 msg.append(' %i mandatory' % nbmp)
404 msg.append(' %i mandatory' % nbmp)
405 if nbap:
405 if nbap:
406 msg.append(' %i advisory' % nbmp)
406 msg.append(' %i advisory' % nbmp)
407 msg.append(')')
407 msg.append(')')
408 msg.append(' %s\n' % status)
408 msg.append(' %s\n' % status)
409 op.ui.debug(''.join(msg))
409 op.ui.debug(''.join(msg))
410
410
411 # handler is called outside the above try block so that we don't
411 # handler is called outside the above try block so that we don't
412 # risk catching KeyErrors from anything other than the
412 # risk catching KeyErrors from anything other than the
413 # parthandlermapping lookup (any KeyError raised by handler()
413 # parthandlermapping lookup (any KeyError raised by handler()
414 # itself represents a defect of a different variety).
414 # itself represents a defect of a different variety).
415 output = None
415 output = None
416 if op.captureoutput and op.reply is not None:
416 if op.captureoutput and op.reply is not None:
417 op.ui.pushbuffer(error=True, subproc=True)
417 op.ui.pushbuffer(error=True, subproc=True)
418 output = ''
418 output = ''
419 try:
419 try:
420 handler(op, part)
420 handler(op, part)
421 finally:
421 finally:
422 if output is not None:
422 if output is not None:
423 output = op.ui.popbuffer()
423 output = op.ui.popbuffer()
424 if output:
424 if output:
425 outpart = op.reply.newpart('output', data=output,
425 outpart = op.reply.newpart('output', data=output,
426 mandatory=False)
426 mandatory=False)
427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
428 finally:
428 finally:
429 # consume the part content to not corrupt the stream.
429 # consume the part content to not corrupt the stream.
430 part.seek(0, 2)
430 part.seek(0, 2)
431
431
432
432
433 def decodecaps(blob):
433 def decodecaps(blob):
434 """decode a bundle2 caps bytes blob into a dictionary
434 """decode a bundle2 caps bytes blob into a dictionary
435
435
436 The blob is a list of capabilities (one per line)
436 The blob is a list of capabilities (one per line)
437 Capabilities may have values using a line of the form::
437 Capabilities may have values using a line of the form::
438
438
439 capability=value1,value2,value3
439 capability=value1,value2,value3
440
440
441 The values are always a list."""
441 The values are always a list."""
442 caps = {}
442 caps = {}
443 for line in blob.splitlines():
443 for line in blob.splitlines():
444 if not line:
444 if not line:
445 continue
445 continue
446 if '=' not in line:
446 if '=' not in line:
447 key, vals = line, ()
447 key, vals = line, ()
448 else:
448 else:
449 key, vals = line.split('=', 1)
449 key, vals = line.split('=', 1)
450 vals = vals.split(',')
450 vals = vals.split(',')
451 key = urllib.unquote(key)
451 key = urllib.unquote(key)
452 vals = [urllib.unquote(v) for v in vals]
452 vals = [urllib.unquote(v) for v in vals]
453 caps[key] = vals
453 caps[key] = vals
454 return caps
454 return caps
455
455
456 def encodecaps(caps):
456 def encodecaps(caps):
457 """encode a bundle2 caps dictionary into a bytes blob"""
457 """encode a bundle2 caps dictionary into a bytes blob"""
458 chunks = []
458 chunks = []
459 for ca in sorted(caps):
459 for ca in sorted(caps):
460 vals = caps[ca]
460 vals = caps[ca]
461 ca = urllib.quote(ca)
461 ca = urllib.quote(ca)
462 vals = [urllib.quote(v) for v in vals]
462 vals = [urllib.quote(v) for v in vals]
463 if vals:
463 if vals:
464 ca = "%s=%s" % (ca, ','.join(vals))
464 ca = "%s=%s" % (ca, ','.join(vals))
465 chunks.append(ca)
465 chunks.append(ca)
466 return '\n'.join(chunks)
466 return '\n'.join(chunks)
467
467
468 class bundle20(object):
468 class bundle20(object):
469 """represent an outgoing bundle2 container
469 """represent an outgoing bundle2 container
470
470
471 Use the `addparam` method to add stream level parameter. and `newpart` to
471 Use the `addparam` method to add stream level parameter. and `newpart` to
472 populate it. Then call `getchunks` to retrieve all the binary chunks of
472 populate it. Then call `getchunks` to retrieve all the binary chunks of
473 data that compose the bundle2 container."""
473 data that compose the bundle2 container."""
474
474
475 _magicstring = 'HG20'
475 _magicstring = 'HG20'
476
476
477 def __init__(self, ui, capabilities=()):
477 def __init__(self, ui, capabilities=()):
478 self.ui = ui
478 self.ui = ui
479 self._params = []
479 self._params = []
480 self._parts = []
480 self._parts = []
481 self.capabilities = dict(capabilities)
481 self.capabilities = dict(capabilities)
482 self._compressor = util.compressors[None]()
482 self._compressor = util.compressors[None]()
483
483
484 def setcompression(self, alg):
484 def setcompression(self, alg):
485 """setup core part compression to <alg>"""
485 """setup core part compression to <alg>"""
486 if alg is None:
486 if alg is None:
487 return
487 return
488 assert not any(n.lower() == 'Compression' for n, v in self._params)
488 assert not any(n.lower() == 'Compression' for n, v in self._params)
489 self.addparam('Compression', alg)
489 self.addparam('Compression', alg)
490 self._compressor = util.compressors[alg]()
490 self._compressor = util.compressors[alg]()
491
491
492 @property
492 @property
493 def nbparts(self):
493 def nbparts(self):
494 """total number of parts added to the bundler"""
494 """total number of parts added to the bundler"""
495 return len(self._parts)
495 return len(self._parts)
496
496
497 # methods used to defines the bundle2 content
497 # methods used to defines the bundle2 content
498 def addparam(self, name, value=None):
498 def addparam(self, name, value=None):
499 """add a stream level parameter"""
499 """add a stream level parameter"""
500 if not name:
500 if not name:
501 raise ValueError('empty parameter name')
501 raise ValueError('empty parameter name')
502 if name[0] not in string.letters:
502 if name[0] not in string.letters:
503 raise ValueError('non letter first character: %r' % name)
503 raise ValueError('non letter first character: %r' % name)
504 self._params.append((name, value))
504 self._params.append((name, value))
505
505
506 def addpart(self, part):
506 def addpart(self, part):
507 """add a new part to the bundle2 container
507 """add a new part to the bundle2 container
508
508
509 Parts contains the actual applicative payload."""
509 Parts contains the actual applicative payload."""
510 assert part.id is None
510 assert part.id is None
511 part.id = len(self._parts) # very cheap counter
511 part.id = len(self._parts) # very cheap counter
512 self._parts.append(part)
512 self._parts.append(part)
513
513
514 def newpart(self, typeid, *args, **kwargs):
514 def newpart(self, typeid, *args, **kwargs):
515 """create a new part and add it to the containers
515 """create a new part and add it to the containers
516
516
517 As the part is directly added to the containers. For now, this means
517 As the part is directly added to the containers. For now, this means
518 that any failure to properly initialize the part after calling
518 that any failure to properly initialize the part after calling
519 ``newpart`` should result in a failure of the whole bundling process.
519 ``newpart`` should result in a failure of the whole bundling process.
520
520
521 You can still fall back to manually create and add if you need better
521 You can still fall back to manually create and add if you need better
522 control."""
522 control."""
523 part = bundlepart(typeid, *args, **kwargs)
523 part = bundlepart(typeid, *args, **kwargs)
524 self.addpart(part)
524 self.addpart(part)
525 return part
525 return part
526
526
527 # methods used to generate the bundle2 stream
527 # methods used to generate the bundle2 stream
528 def getchunks(self):
528 def getchunks(self):
529 if self.ui.debugflag:
529 if self.ui.debugflag:
530 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
530 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
531 if self._params:
531 if self._params:
532 msg.append(' (%i params)' % len(self._params))
532 msg.append(' (%i params)' % len(self._params))
533 msg.append(' %i parts total\n' % len(self._parts))
533 msg.append(' %i parts total\n' % len(self._parts))
534 self.ui.debug(''.join(msg))
534 self.ui.debug(''.join(msg))
535 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
535 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
536 yield self._magicstring
536 yield self._magicstring
537 param = self._paramchunk()
537 param = self._paramchunk()
538 outdebug(self.ui, 'bundle parameter: %s' % param)
538 outdebug(self.ui, 'bundle parameter: %s' % param)
539 yield _pack(_fstreamparamsize, len(param))
539 yield _pack(_fstreamparamsize, len(param))
540 if param:
540 if param:
541 yield param
541 yield param
542 # starting compression
542 # starting compression
543 for chunk in self._getcorechunk():
543 for chunk in self._getcorechunk():
544 yield self._compressor.compress(chunk)
544 yield self._compressor.compress(chunk)
545 yield self._compressor.flush()
545 yield self._compressor.flush()
546
546
547 def _paramchunk(self):
547 def _paramchunk(self):
548 """return a encoded version of all stream parameters"""
548 """return a encoded version of all stream parameters"""
549 blocks = []
549 blocks = []
550 for par, value in self._params:
550 for par, value in self._params:
551 par = urllib.quote(par)
551 par = urllib.quote(par)
552 if value is not None:
552 if value is not None:
553 value = urllib.quote(value)
553 value = urllib.quote(value)
554 par = '%s=%s' % (par, value)
554 par = '%s=%s' % (par, value)
555 blocks.append(par)
555 blocks.append(par)
556 return ' '.join(blocks)
556 return ' '.join(blocks)
557
557
558 def _getcorechunk(self):
558 def _getcorechunk(self):
559 """yield chunk for the core part of the bundle
559 """yield chunk for the core part of the bundle
560
560
561 (all but headers and parameters)"""
561 (all but headers and parameters)"""
562 outdebug(self.ui, 'start of parts')
562 outdebug(self.ui, 'start of parts')
563 for part in self._parts:
563 for part in self._parts:
564 outdebug(self.ui, 'bundle part: "%s"' % part.type)
564 outdebug(self.ui, 'bundle part: "%s"' % part.type)
565 for chunk in part.getchunks(ui=self.ui):
565 for chunk in part.getchunks(ui=self.ui):
566 yield chunk
566 yield chunk
567 outdebug(self.ui, 'end of bundle')
567 outdebug(self.ui, 'end of bundle')
568 yield _pack(_fpartheadersize, 0)
568 yield _pack(_fpartheadersize, 0)
569
569
570
570
571 def salvageoutput(self):
571 def salvageoutput(self):
572 """return a list with a copy of all output parts in the bundle
572 """return a list with a copy of all output parts in the bundle
573
573
574 This is meant to be used during error handling to make sure we preserve
574 This is meant to be used during error handling to make sure we preserve
575 server output"""
575 server output"""
576 salvaged = []
576 salvaged = []
577 for part in self._parts:
577 for part in self._parts:
578 if part.type.startswith('output'):
578 if part.type.startswith('output'):
579 salvaged.append(part.copy())
579 salvaged.append(part.copy())
580 return salvaged
580 return salvaged
581
581
582
582
583 class unpackermixin(object):
583 class unpackermixin(object):
584 """A mixin to extract bytes and struct data from a stream"""
584 """A mixin to extract bytes and struct data from a stream"""
585
585
586 def __init__(self, fp):
586 def __init__(self, fp):
587 self._fp = fp
587 self._fp = fp
588 self._seekable = (util.safehasattr(fp, 'seek') and
588 self._seekable = (util.safehasattr(fp, 'seek') and
589 util.safehasattr(fp, 'tell'))
589 util.safehasattr(fp, 'tell'))
590
590
591 def _unpack(self, format):
591 def _unpack(self, format):
592 """unpack this struct format from the stream"""
592 """unpack this struct format from the stream"""
593 data = self._readexact(struct.calcsize(format))
593 data = self._readexact(struct.calcsize(format))
594 return _unpack(format, data)
594 return _unpack(format, data)
595
595
596 def _readexact(self, size):
596 def _readexact(self, size):
597 """read exactly <size> bytes from the stream"""
597 """read exactly <size> bytes from the stream"""
598 return changegroup.readexactly(self._fp, size)
598 return changegroup.readexactly(self._fp, size)
599
599
600 def seek(self, offset, whence=0):
600 def seek(self, offset, whence=0):
601 """move the underlying file pointer"""
601 """move the underlying file pointer"""
602 if self._seekable:
602 if self._seekable:
603 return self._fp.seek(offset, whence)
603 return self._fp.seek(offset, whence)
604 else:
604 else:
605 raise NotImplementedError(_('File pointer is not seekable'))
605 raise NotImplementedError(_('File pointer is not seekable'))
606
606
607 def tell(self):
607 def tell(self):
608 """return the file offset, or None if file is not seekable"""
608 """return the file offset, or None if file is not seekable"""
609 if self._seekable:
609 if self._seekable:
610 try:
610 try:
611 return self._fp.tell()
611 return self._fp.tell()
612 except IOError as e:
612 except IOError as e:
613 if e.errno == errno.ESPIPE:
613 if e.errno == errno.ESPIPE:
614 self._seekable = False
614 self._seekable = False
615 else:
615 else:
616 raise
616 raise
617 return None
617 return None
618
618
619 def close(self):
619 def close(self):
620 """close underlying file"""
620 """close underlying file"""
621 if util.safehasattr(self._fp, 'close'):
621 if util.safehasattr(self._fp, 'close'):
622 return self._fp.close()
622 return self._fp.close()
623
623
624 def getunbundler(ui, fp, magicstring=None):
624 def getunbundler(ui, fp, magicstring=None):
625 """return a valid unbundler object for a given magicstring"""
625 """return a valid unbundler object for a given magicstring"""
626 if magicstring is None:
626 if magicstring is None:
627 magicstring = changegroup.readexactly(fp, 4)
627 magicstring = changegroup.readexactly(fp, 4)
628 magic, version = magicstring[0:2], magicstring[2:4]
628 magic, version = magicstring[0:2], magicstring[2:4]
629 if magic != 'HG':
629 if magic != 'HG':
630 raise error.Abort(_('not a Mercurial bundle'))
630 raise error.Abort(_('not a Mercurial bundle'))
631 unbundlerclass = formatmap.get(version)
631 unbundlerclass = formatmap.get(version)
632 if unbundlerclass is None:
632 if unbundlerclass is None:
633 raise error.Abort(_('unknown bundle version %s') % version)
633 raise error.Abort(_('unknown bundle version %s') % version)
634 unbundler = unbundlerclass(ui, fp)
634 unbundler = unbundlerclass(ui, fp)
635 indebug(ui, 'start processing of %s stream' % magicstring)
635 indebug(ui, 'start processing of %s stream' % magicstring)
636 return unbundler
636 return unbundler
637
637
638 class unbundle20(unpackermixin):
638 class unbundle20(unpackermixin):
639 """interpret a bundle2 stream
639 """interpret a bundle2 stream
640
640
641 This class is fed with a binary stream and yields parts through its
641 This class is fed with a binary stream and yields parts through its
642 `iterparts` methods."""
642 `iterparts` methods."""
643
643
644 _magicstring = 'HG20'
644 _magicstring = 'HG20'
645
645
646 def __init__(self, ui, fp):
646 def __init__(self, ui, fp):
647 """If header is specified, we do not read it out of the stream."""
647 """If header is specified, we do not read it out of the stream."""
648 self.ui = ui
648 self.ui = ui
649 self._decompressor = util.decompressors[None]
649 self._decompressor = util.decompressors[None]
650 super(unbundle20, self).__init__(fp)
650 super(unbundle20, self).__init__(fp)
651
651
652 @util.propertycache
652 @util.propertycache
653 def params(self):
653 def params(self):
654 """dictionary of stream level parameters"""
654 """dictionary of stream level parameters"""
655 indebug(self.ui, 'reading bundle2 stream parameters')
655 indebug(self.ui, 'reading bundle2 stream parameters')
656 params = {}
656 params = {}
657 paramssize = self._unpack(_fstreamparamsize)[0]
657 paramssize = self._unpack(_fstreamparamsize)[0]
658 if paramssize < 0:
658 if paramssize < 0:
659 raise error.BundleValueError('negative bundle param size: %i'
659 raise error.BundleValueError('negative bundle param size: %i'
660 % paramssize)
660 % paramssize)
661 if paramssize:
661 if paramssize:
662 params = self._readexact(paramssize)
662 params = self._readexact(paramssize)
663 params = self._processallparams(params)
663 params = self._processallparams(params)
664 return params
664 return params
665
665
666 def _processallparams(self, paramsblock):
666 def _processallparams(self, paramsblock):
667 """"""
667 """"""
668 params = {}
668 params = {}
669 for p in paramsblock.split(' '):
669 for p in paramsblock.split(' '):
670 p = p.split('=', 1)
670 p = p.split('=', 1)
671 p = [urllib.unquote(i) for i in p]
671 p = [urllib.unquote(i) for i in p]
672 if len(p) < 2:
672 if len(p) < 2:
673 p.append(None)
673 p.append(None)
674 self._processparam(*p)
674 self._processparam(*p)
675 params[p[0]] = p[1]
675 params[p[0]] = p[1]
676 return params
676 return params
677
677
678
678
679 def _processparam(self, name, value):
679 def _processparam(self, name, value):
680 """process a parameter, applying its effect if needed
680 """process a parameter, applying its effect if needed
681
681
682 Parameter starting with a lower case letter are advisory and will be
682 Parameter starting with a lower case letter are advisory and will be
683 ignored when unknown. Those starting with an upper case letter are
683 ignored when unknown. Those starting with an upper case letter are
684 mandatory and will this function will raise a KeyError when unknown.
684 mandatory and will this function will raise a KeyError when unknown.
685
685
686 Note: no option are currently supported. Any input will be either
686 Note: no option are currently supported. Any input will be either
687 ignored or failing.
687 ignored or failing.
688 """
688 """
689 if not name:
689 if not name:
690 raise ValueError('empty parameter name')
690 raise ValueError('empty parameter name')
691 if name[0] not in string.letters:
691 if name[0] not in string.letters:
692 raise ValueError('non letter first character: %r' % name)
692 raise ValueError('non letter first character: %r' % name)
693 try:
693 try:
694 handler = b2streamparamsmap[name.lower()]
694 handler = b2streamparamsmap[name.lower()]
695 except KeyError:
695 except KeyError:
696 if name[0].islower():
696 if name[0].islower():
697 indebug(self.ui, "ignoring unknown parameter %r" % name)
697 indebug(self.ui, "ignoring unknown parameter %r" % name)
698 else:
698 else:
699 raise error.BundleUnknownFeatureError(params=(name,))
699 raise error.BundleUnknownFeatureError(params=(name,))
700 else:
700 else:
701 handler(self, name, value)
701 handler(self, name, value)
702
702
703 def _forwardchunks(self):
703 def _forwardchunks(self):
704 """utility to transfer a bundle2 as binary
704 """utility to transfer a bundle2 as binary
705
705
706 This is made necessary by the fact the 'getbundle' command over 'ssh'
706 This is made necessary by the fact the 'getbundle' command over 'ssh'
707 have no way to know then the reply end, relying on the bundle to be
707 have no way to know then the reply end, relying on the bundle to be
708 interpreted to know its end. This is terrible and we are sorry, but we
708 interpreted to know its end. This is terrible and we are sorry, but we
709 needed to move forward to get general delta enabled.
709 needed to move forward to get general delta enabled.
710 """
710 """
711 yield self._magicstring
711 yield self._magicstring
712 assert 'params' not in vars(self)
712 assert 'params' not in vars(self)
713 paramssize = self._unpack(_fstreamparamsize)[0]
713 paramssize = self._unpack(_fstreamparamsize)[0]
714 if paramssize < 0:
714 if paramssize < 0:
715 raise error.BundleValueError('negative bundle param size: %i'
715 raise error.BundleValueError('negative bundle param size: %i'
716 % paramssize)
716 % paramssize)
717 yield _pack(_fstreamparamsize, paramssize)
717 yield _pack(_fstreamparamsize, paramssize)
718 if paramssize:
718 if paramssize:
719 params = self._readexact(paramssize)
719 params = self._readexact(paramssize)
720 self._processallparams(params)
720 self._processallparams(params)
721 yield params
721 yield params
722 assert self._decompressor is util.decompressors[None]
722 assert self._decompressor is util.decompressors[None]
723 # From there, payload might need to be decompressed
723 # From there, payload might need to be decompressed
724 self._fp = self._decompressor(self._fp)
724 self._fp = self._decompressor(self._fp)
725 emptycount = 0
725 emptycount = 0
726 while emptycount < 2:
726 while emptycount < 2:
727 # so we can brainlessly loop
727 # so we can brainlessly loop
728 assert _fpartheadersize == _fpayloadsize
728 assert _fpartheadersize == _fpayloadsize
729 size = self._unpack(_fpartheadersize)[0]
729 size = self._unpack(_fpartheadersize)[0]
730 yield _pack(_fpartheadersize, size)
730 yield _pack(_fpartheadersize, size)
731 if size:
731 if size:
732 emptycount = 0
732 emptycount = 0
733 else:
733 else:
734 emptycount += 1
734 emptycount += 1
735 continue
735 continue
736 if size == flaginterrupt:
736 if size == flaginterrupt:
737 continue
737 continue
738 elif size < 0:
738 elif size < 0:
739 raise error.BundleValueError('negative chunk size: %i')
739 raise error.BundleValueError('negative chunk size: %i')
740 yield self._readexact(size)
740 yield self._readexact(size)
741
741
742
742
743 def iterparts(self):
743 def iterparts(self):
744 """yield all parts contained in the stream"""
744 """yield all parts contained in the stream"""
745 # make sure param have been loaded
745 # make sure param have been loaded
746 self.params
746 self.params
747 # From there, payload need to be decompressed
747 # From there, payload need to be decompressed
748 self._fp = self._decompressor(self._fp)
748 self._fp = self._decompressor(self._fp)
749 indebug(self.ui, 'start extraction of bundle2 parts')
749 indebug(self.ui, 'start extraction of bundle2 parts')
750 headerblock = self._readpartheader()
750 headerblock = self._readpartheader()
751 while headerblock is not None:
751 while headerblock is not None:
752 part = unbundlepart(self.ui, headerblock, self._fp)
752 part = unbundlepart(self.ui, headerblock, self._fp)
753 yield part
753 yield part
754 part.seek(0, 2)
754 part.seek(0, 2)
755 headerblock = self._readpartheader()
755 headerblock = self._readpartheader()
756 indebug(self.ui, 'end of bundle2 stream')
756 indebug(self.ui, 'end of bundle2 stream')
757
757
758 def _readpartheader(self):
758 def _readpartheader(self):
759 """reads a part header size and return the bytes blob
759 """reads a part header size and return the bytes blob
760
760
761 returns None if empty"""
761 returns None if empty"""
762 headersize = self._unpack(_fpartheadersize)[0]
762 headersize = self._unpack(_fpartheadersize)[0]
763 if headersize < 0:
763 if headersize < 0:
764 raise error.BundleValueError('negative part header size: %i'
764 raise error.BundleValueError('negative part header size: %i'
765 % headersize)
765 % headersize)
766 indebug(self.ui, 'part header size: %i' % headersize)
766 indebug(self.ui, 'part header size: %i' % headersize)
767 if headersize:
767 if headersize:
768 return self._readexact(headersize)
768 return self._readexact(headersize)
769 return None
769 return None
770
770
771 def compressed(self):
771 def compressed(self):
772 return False
772 return False
773
773
774 formatmap = {'20': unbundle20}
774 formatmap = {'20': unbundle20}
775
775
776 b2streamparamsmap = {}
776 b2streamparamsmap = {}
777
777
778 def b2streamparamhandler(name):
778 def b2streamparamhandler(name):
779 """register a handler for a stream level parameter"""
779 """register a handler for a stream level parameter"""
780 def decorator(func):
780 def decorator(func):
781 assert name not in formatmap
781 assert name not in formatmap
782 b2streamparamsmap[name] = func
782 b2streamparamsmap[name] = func
783 return func
783 return func
784 return decorator
784 return decorator
785
785
786 @b2streamparamhandler('compression')
786 @b2streamparamhandler('compression')
787 def processcompression(unbundler, param, value):
787 def processcompression(unbundler, param, value):
788 """read compression parameter and install payload decompression"""
788 """read compression parameter and install payload decompression"""
789 if value not in util.decompressors:
789 if value not in util.decompressors:
790 raise error.BundleUnknownFeatureError(params=(param,),
790 raise error.BundleUnknownFeatureError(params=(param,),
791 values=(value,))
791 values=(value,))
792 unbundler._decompressor = util.decompressors[value]
792 unbundler._decompressor = util.decompressors[value]
793
793
794 class bundlepart(object):
794 class bundlepart(object):
795 """A bundle2 part contains application level payload
795 """A bundle2 part contains application level payload
796
796
797 The part `type` is used to route the part to the application level
797 The part `type` is used to route the part to the application level
798 handler.
798 handler.
799
799
800 The part payload is contained in ``part.data``. It could be raw bytes or a
800 The part payload is contained in ``part.data``. It could be raw bytes or a
801 generator of byte chunks.
801 generator of byte chunks.
802
802
803 You can add parameters to the part using the ``addparam`` method.
803 You can add parameters to the part using the ``addparam`` method.
804 Parameters can be either mandatory (default) or advisory. Remote side
804 Parameters can be either mandatory (default) or advisory. Remote side
805 should be able to safely ignore the advisory ones.
805 should be able to safely ignore the advisory ones.
806
806
807 Both data and parameters cannot be modified after the generation has begun.
807 Both data and parameters cannot be modified after the generation has begun.
808 """
808 """
809
809
810 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
810 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
811 data='', mandatory=True):
811 data='', mandatory=True):
812 validateparttype(parttype)
812 validateparttype(parttype)
813 self.id = None
813 self.id = None
814 self.type = parttype
814 self.type = parttype
815 self._data = data
815 self._data = data
816 self._mandatoryparams = list(mandatoryparams)
816 self._mandatoryparams = list(mandatoryparams)
817 self._advisoryparams = list(advisoryparams)
817 self._advisoryparams = list(advisoryparams)
818 # checking for duplicated entries
818 # checking for duplicated entries
819 self._seenparams = set()
819 self._seenparams = set()
820 for pname, __ in self._mandatoryparams + self._advisoryparams:
820 for pname, __ in self._mandatoryparams + self._advisoryparams:
821 if pname in self._seenparams:
821 if pname in self._seenparams:
822 raise RuntimeError('duplicated params: %s' % pname)
822 raise RuntimeError('duplicated params: %s' % pname)
823 self._seenparams.add(pname)
823 self._seenparams.add(pname)
824 # status of the part's generation:
824 # status of the part's generation:
825 # - None: not started,
825 # - None: not started,
826 # - False: currently generated,
826 # - False: currently generated,
827 # - True: generation done.
827 # - True: generation done.
828 self._generated = None
828 self._generated = None
829 self.mandatory = mandatory
829 self.mandatory = mandatory
830
830
831 def copy(self):
831 def copy(self):
832 """return a copy of the part
832 """return a copy of the part
833
833
834 The new part have the very same content but no partid assigned yet.
834 The new part have the very same content but no partid assigned yet.
835 Parts with generated data cannot be copied."""
835 Parts with generated data cannot be copied."""
836 assert not util.safehasattr(self.data, 'next')
836 assert not util.safehasattr(self.data, 'next')
837 return self.__class__(self.type, self._mandatoryparams,
837 return self.__class__(self.type, self._mandatoryparams,
838 self._advisoryparams, self._data, self.mandatory)
838 self._advisoryparams, self._data, self.mandatory)
839
839
840 # methods used to defines the part content
840 # methods used to defines the part content
841 def __setdata(self, data):
841 def __setdata(self, data):
842 if self._generated is not None:
842 if self._generated is not None:
843 raise error.ReadOnlyPartError('part is being generated')
843 raise error.ReadOnlyPartError('part is being generated')
844 self._data = data
844 self._data = data
845 def __getdata(self):
845 def __getdata(self):
846 return self._data
846 return self._data
847 data = property(__getdata, __setdata)
847 data = property(__getdata, __setdata)
848
848
849 @property
849 @property
850 def mandatoryparams(self):
850 def mandatoryparams(self):
851 # make it an immutable tuple to force people through ``addparam``
851 # make it an immutable tuple to force people through ``addparam``
852 return tuple(self._mandatoryparams)
852 return tuple(self._mandatoryparams)
853
853
854 @property
854 @property
855 def advisoryparams(self):
855 def advisoryparams(self):
856 # make it an immutable tuple to force people through ``addparam``
856 # make it an immutable tuple to force people through ``addparam``
857 return tuple(self._advisoryparams)
857 return tuple(self._advisoryparams)
858
858
859 def addparam(self, name, value='', mandatory=True):
859 def addparam(self, name, value='', mandatory=True):
860 if self._generated is not None:
860 if self._generated is not None:
861 raise error.ReadOnlyPartError('part is being generated')
861 raise error.ReadOnlyPartError('part is being generated')
862 if name in self._seenparams:
862 if name in self._seenparams:
863 raise ValueError('duplicated params: %s' % name)
863 raise ValueError('duplicated params: %s' % name)
864 self._seenparams.add(name)
864 self._seenparams.add(name)
865 params = self._advisoryparams
865 params = self._advisoryparams
866 if mandatory:
866 if mandatory:
867 params = self._mandatoryparams
867 params = self._mandatoryparams
868 params.append((name, value))
868 params.append((name, value))
869
869
870 # methods used to generates the bundle2 stream
870 # methods used to generates the bundle2 stream
871 def getchunks(self, ui):
871 def getchunks(self, ui):
872 if self._generated is not None:
872 if self._generated is not None:
873 raise RuntimeError('part can only be consumed once')
873 raise RuntimeError('part can only be consumed once')
874 self._generated = False
874 self._generated = False
875
875
876 if ui.debugflag:
876 if ui.debugflag:
877 msg = ['bundle2-output-part: "%s"' % self.type]
877 msg = ['bundle2-output-part: "%s"' % self.type]
878 if not self.mandatory:
878 if not self.mandatory:
879 msg.append(' (advisory)')
879 msg.append(' (advisory)')
880 nbmp = len(self.mandatoryparams)
880 nbmp = len(self.mandatoryparams)
881 nbap = len(self.advisoryparams)
881 nbap = len(self.advisoryparams)
882 if nbmp or nbap:
882 if nbmp or nbap:
883 msg.append(' (params:')
883 msg.append(' (params:')
884 if nbmp:
884 if nbmp:
885 msg.append(' %i mandatory' % nbmp)
885 msg.append(' %i mandatory' % nbmp)
886 if nbap:
886 if nbap:
887 msg.append(' %i advisory' % nbmp)
887 msg.append(' %i advisory' % nbmp)
888 msg.append(')')
888 msg.append(')')
889 if not self.data:
889 if not self.data:
890 msg.append(' empty payload')
890 msg.append(' empty payload')
891 elif util.safehasattr(self.data, 'next'):
891 elif util.safehasattr(self.data, 'next'):
892 msg.append(' streamed payload')
892 msg.append(' streamed payload')
893 else:
893 else:
894 msg.append(' %i bytes payload' % len(self.data))
894 msg.append(' %i bytes payload' % len(self.data))
895 msg.append('\n')
895 msg.append('\n')
896 ui.debug(''.join(msg))
896 ui.debug(''.join(msg))
897
897
898 #### header
898 #### header
899 if self.mandatory:
899 if self.mandatory:
900 parttype = self.type.upper()
900 parttype = self.type.upper()
901 else:
901 else:
902 parttype = self.type.lower()
902 parttype = self.type.lower()
903 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
903 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
904 ## parttype
904 ## parttype
905 header = [_pack(_fparttypesize, len(parttype)),
905 header = [_pack(_fparttypesize, len(parttype)),
906 parttype, _pack(_fpartid, self.id),
906 parttype, _pack(_fpartid, self.id),
907 ]
907 ]
908 ## parameters
908 ## parameters
909 # count
909 # count
910 manpar = self.mandatoryparams
910 manpar = self.mandatoryparams
911 advpar = self.advisoryparams
911 advpar = self.advisoryparams
912 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
912 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
913 # size
913 # size
914 parsizes = []
914 parsizes = []
915 for key, value in manpar:
915 for key, value in manpar:
916 parsizes.append(len(key))
916 parsizes.append(len(key))
917 parsizes.append(len(value))
917 parsizes.append(len(value))
918 for key, value in advpar:
918 for key, value in advpar:
919 parsizes.append(len(key))
919 parsizes.append(len(key))
920 parsizes.append(len(value))
920 parsizes.append(len(value))
921 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
921 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
922 header.append(paramsizes)
922 header.append(paramsizes)
923 # key, value
923 # key, value
924 for key, value in manpar:
924 for key, value in manpar:
925 header.append(key)
925 header.append(key)
926 header.append(value)
926 header.append(value)
927 for key, value in advpar:
927 for key, value in advpar:
928 header.append(key)
928 header.append(key)
929 header.append(value)
929 header.append(value)
930 ## finalize header
930 ## finalize header
931 headerchunk = ''.join(header)
931 headerchunk = ''.join(header)
932 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
932 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
933 yield _pack(_fpartheadersize, len(headerchunk))
933 yield _pack(_fpartheadersize, len(headerchunk))
934 yield headerchunk
934 yield headerchunk
935 ## payload
935 ## payload
936 try:
936 try:
937 for chunk in self._payloadchunks():
937 for chunk in self._payloadchunks():
938 outdebug(ui, 'payload chunk size: %i' % len(chunk))
938 outdebug(ui, 'payload chunk size: %i' % len(chunk))
939 yield _pack(_fpayloadsize, len(chunk))
939 yield _pack(_fpayloadsize, len(chunk))
940 yield chunk
940 yield chunk
941 except GeneratorExit:
941 except GeneratorExit:
942 # GeneratorExit means that nobody is listening for our
942 # GeneratorExit means that nobody is listening for our
943 # results anyway, so just bail quickly rather than trying
943 # results anyway, so just bail quickly rather than trying
944 # to produce an error part.
944 # to produce an error part.
945 ui.debug('bundle2-generatorexit\n')
945 ui.debug('bundle2-generatorexit\n')
946 raise
946 raise
947 except BaseException as exc:
947 except BaseException as exc:
948 # backup exception data for later
948 # backup exception data for later
949 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
949 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
950 % exc)
950 % exc)
951 exc_info = sys.exc_info()
951 exc_info = sys.exc_info()
952 msg = 'unexpected error: %s' % exc
952 msg = 'unexpected error: %s' % exc
953 interpart = bundlepart('error:abort', [('message', msg)],
953 interpart = bundlepart('error:abort', [('message', msg)],
954 mandatory=False)
954 mandatory=False)
955 interpart.id = 0
955 interpart.id = 0
956 yield _pack(_fpayloadsize, -1)
956 yield _pack(_fpayloadsize, -1)
957 for chunk in interpart.getchunks(ui=ui):
957 for chunk in interpart.getchunks(ui=ui):
958 yield chunk
958 yield chunk
959 outdebug(ui, 'closing payload chunk')
959 outdebug(ui, 'closing payload chunk')
960 # abort current part payload
960 # abort current part payload
961 yield _pack(_fpayloadsize, 0)
961 yield _pack(_fpayloadsize, 0)
962 raise exc_info[0], exc_info[1], exc_info[2]
962 raise exc_info[0], exc_info[1], exc_info[2]
963 # end of payload
963 # end of payload
964 outdebug(ui, 'closing payload chunk')
964 outdebug(ui, 'closing payload chunk')
965 yield _pack(_fpayloadsize, 0)
965 yield _pack(_fpayloadsize, 0)
966 self._generated = True
966 self._generated = True
967
967
968 def _payloadchunks(self):
968 def _payloadchunks(self):
969 """yield chunks of a the part payload
969 """yield chunks of a the part payload
970
970
971 Exists to handle the different methods to provide data to a part."""
971 Exists to handle the different methods to provide data to a part."""
972 # we only support fixed size data now.
972 # we only support fixed size data now.
973 # This will be improved in the future.
973 # This will be improved in the future.
974 if util.safehasattr(self.data, 'next'):
974 if util.safehasattr(self.data, 'next'):
975 buff = util.chunkbuffer(self.data)
975 buff = util.chunkbuffer(self.data)
976 chunk = buff.read(preferedchunksize)
976 chunk = buff.read(preferedchunksize)
977 while chunk:
977 while chunk:
978 yield chunk
978 yield chunk
979 chunk = buff.read(preferedchunksize)
979 chunk = buff.read(preferedchunksize)
980 elif len(self.data):
980 elif len(self.data):
981 yield self.data
981 yield self.data
982
982
983
983
984 flaginterrupt = -1
984 flaginterrupt = -1
985
985
986 class interrupthandler(unpackermixin):
986 class interrupthandler(unpackermixin):
987 """read one part and process it with restricted capability
987 """read one part and process it with restricted capability
988
988
989 This allows to transmit exception raised on the producer size during part
989 This allows to transmit exception raised on the producer size during part
990 iteration while the consumer is reading a part.
990 iteration while the consumer is reading a part.
991
991
992 Part processed in this manner only have access to a ui object,"""
992 Part processed in this manner only have access to a ui object,"""
993
993
994 def __init__(self, ui, fp):
994 def __init__(self, ui, fp):
995 super(interrupthandler, self).__init__(fp)
995 super(interrupthandler, self).__init__(fp)
996 self.ui = ui
996 self.ui = ui
997
997
998 def _readpartheader(self):
998 def _readpartheader(self):
999 """reads a part header size and return the bytes blob
999 """reads a part header size and return the bytes blob
1000
1000
1001 returns None if empty"""
1001 returns None if empty"""
1002 headersize = self._unpack(_fpartheadersize)[0]
1002 headersize = self._unpack(_fpartheadersize)[0]
1003 if headersize < 0:
1003 if headersize < 0:
1004 raise error.BundleValueError('negative part header size: %i'
1004 raise error.BundleValueError('negative part header size: %i'
1005 % headersize)
1005 % headersize)
1006 indebug(self.ui, 'part header size: %i\n' % headersize)
1006 indebug(self.ui, 'part header size: %i\n' % headersize)
1007 if headersize:
1007 if headersize:
1008 return self._readexact(headersize)
1008 return self._readexact(headersize)
1009 return None
1009 return None
1010
1010
1011 def __call__(self):
1011 def __call__(self):
1012
1012
1013 self.ui.debug('bundle2-input-stream-interrupt:'
1013 self.ui.debug('bundle2-input-stream-interrupt:'
1014 ' opening out of band context\n')
1014 ' opening out of band context\n')
1015 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1015 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1016 headerblock = self._readpartheader()
1016 headerblock = self._readpartheader()
1017 if headerblock is None:
1017 if headerblock is None:
1018 indebug(self.ui, 'no part found during interruption.')
1018 indebug(self.ui, 'no part found during interruption.')
1019 return
1019 return
1020 part = unbundlepart(self.ui, headerblock, self._fp)
1020 part = unbundlepart(self.ui, headerblock, self._fp)
1021 op = interruptoperation(self.ui)
1021 op = interruptoperation(self.ui)
1022 _processpart(op, part)
1022 _processpart(op, part)
1023 self.ui.debug('bundle2-input-stream-interrupt:'
1023 self.ui.debug('bundle2-input-stream-interrupt:'
1024 ' closing out of band context\n')
1024 ' closing out of band context\n')
1025
1025
1026 class interruptoperation(object):
1026 class interruptoperation(object):
1027 """A limited operation to be use by part handler during interruption
1027 """A limited operation to be use by part handler during interruption
1028
1028
1029 It only have access to an ui object.
1029 It only have access to an ui object.
1030 """
1030 """
1031
1031
1032 def __init__(self, ui):
1032 def __init__(self, ui):
1033 self.ui = ui
1033 self.ui = ui
1034 self.reply = None
1034 self.reply = None
1035 self.captureoutput = False
1035 self.captureoutput = False
1036
1036
1037 @property
1037 @property
1038 def repo(self):
1038 def repo(self):
1039 raise RuntimeError('no repo access from stream interruption')
1039 raise RuntimeError('no repo access from stream interruption')
1040
1040
1041 def gettransaction(self):
1041 def gettransaction(self):
1042 raise TransactionUnavailable('no repo access from stream interruption')
1042 raise TransactionUnavailable('no repo access from stream interruption')
1043
1043
1044 class unbundlepart(unpackermixin):
1044 class unbundlepart(unpackermixin):
1045 """a bundle part read from a bundle"""
1045 """a bundle part read from a bundle"""
1046
1046
1047 def __init__(self, ui, header, fp):
1047 def __init__(self, ui, header, fp):
1048 super(unbundlepart, self).__init__(fp)
1048 super(unbundlepart, self).__init__(fp)
1049 self.ui = ui
1049 self.ui = ui
1050 # unbundle state attr
1050 # unbundle state attr
1051 self._headerdata = header
1051 self._headerdata = header
1052 self._headeroffset = 0
1052 self._headeroffset = 0
1053 self._initialized = False
1053 self._initialized = False
1054 self.consumed = False
1054 self.consumed = False
1055 # part data
1055 # part data
1056 self.id = None
1056 self.id = None
1057 self.type = None
1057 self.type = None
1058 self.mandatoryparams = None
1058 self.mandatoryparams = None
1059 self.advisoryparams = None
1059 self.advisoryparams = None
1060 self.params = None
1060 self.params = None
1061 self.mandatorykeys = ()
1061 self.mandatorykeys = ()
1062 self._payloadstream = None
1062 self._payloadstream = None
1063 self._readheader()
1063 self._readheader()
1064 self._mandatory = None
1064 self._mandatory = None
1065 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1065 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1066 self._pos = 0
1066 self._pos = 0
1067
1067
1068 def _fromheader(self, size):
1068 def _fromheader(self, size):
1069 """return the next <size> byte from the header"""
1069 """return the next <size> byte from the header"""
1070 offset = self._headeroffset
1070 offset = self._headeroffset
1071 data = self._headerdata[offset:(offset + size)]
1071 data = self._headerdata[offset:(offset + size)]
1072 self._headeroffset = offset + size
1072 self._headeroffset = offset + size
1073 return data
1073 return data
1074
1074
1075 def _unpackheader(self, format):
1075 def _unpackheader(self, format):
1076 """read given format from header
1076 """read given format from header
1077
1077
1078 This automatically compute the size of the format to read."""
1078 This automatically compute the size of the format to read."""
1079 data = self._fromheader(struct.calcsize(format))
1079 data = self._fromheader(struct.calcsize(format))
1080 return _unpack(format, data)
1080 return _unpack(format, data)
1081
1081
1082 def _initparams(self, mandatoryparams, advisoryparams):
1082 def _initparams(self, mandatoryparams, advisoryparams):
1083 """internal function to setup all logic related parameters"""
1083 """internal function to setup all logic related parameters"""
1084 # make it read only to prevent people touching it by mistake.
1084 # make it read only to prevent people touching it by mistake.
1085 self.mandatoryparams = tuple(mandatoryparams)
1085 self.mandatoryparams = tuple(mandatoryparams)
1086 self.advisoryparams = tuple(advisoryparams)
1086 self.advisoryparams = tuple(advisoryparams)
1087 # user friendly UI
1087 # user friendly UI
1088 self.params = dict(self.mandatoryparams)
1088 self.params = dict(self.mandatoryparams)
1089 self.params.update(dict(self.advisoryparams))
1089 self.params.update(dict(self.advisoryparams))
1090 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1090 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1091
1091
1092 def _payloadchunks(self, chunknum=0):
1092 def _payloadchunks(self, chunknum=0):
1093 '''seek to specified chunk and start yielding data'''
1093 '''seek to specified chunk and start yielding data'''
1094 if len(self._chunkindex) == 0:
1094 if len(self._chunkindex) == 0:
1095 assert chunknum == 0, 'Must start with chunk 0'
1095 assert chunknum == 0, 'Must start with chunk 0'
1096 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1096 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1097 else:
1097 else:
1098 assert chunknum < len(self._chunkindex), \
1098 assert chunknum < len(self._chunkindex), \
1099 'Unknown chunk %d' % chunknum
1099 'Unknown chunk %d' % chunknum
1100 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1100 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1101
1101
1102 pos = self._chunkindex[chunknum][0]
1102 pos = self._chunkindex[chunknum][0]
1103 payloadsize = self._unpack(_fpayloadsize)[0]
1103 payloadsize = self._unpack(_fpayloadsize)[0]
1104 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1104 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1105 while payloadsize:
1105 while payloadsize:
1106 if payloadsize == flaginterrupt:
1106 if payloadsize == flaginterrupt:
1107 # interruption detection, the handler will now read a
1107 # interruption detection, the handler will now read a
1108 # single part and process it.
1108 # single part and process it.
1109 interrupthandler(self.ui, self._fp)()
1109 interrupthandler(self.ui, self._fp)()
1110 elif payloadsize < 0:
1110 elif payloadsize < 0:
1111 msg = 'negative payload chunk size: %i' % payloadsize
1111 msg = 'negative payload chunk size: %i' % payloadsize
1112 raise error.BundleValueError(msg)
1112 raise error.BundleValueError(msg)
1113 else:
1113 else:
1114 result = self._readexact(payloadsize)
1114 result = self._readexact(payloadsize)
1115 chunknum += 1
1115 chunknum += 1
1116 pos += payloadsize
1116 pos += payloadsize
1117 if chunknum == len(self._chunkindex):
1117 if chunknum == len(self._chunkindex):
1118 self._chunkindex.append((pos,
1118 self._chunkindex.append((pos,
1119 super(unbundlepart, self).tell()))
1119 super(unbundlepart, self).tell()))
1120 yield result
1120 yield result
1121 payloadsize = self._unpack(_fpayloadsize)[0]
1121 payloadsize = self._unpack(_fpayloadsize)[0]
1122 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1122 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1123
1123
1124 def _findchunk(self, pos):
1124 def _findchunk(self, pos):
1125 '''for a given payload position, return a chunk number and offset'''
1125 '''for a given payload position, return a chunk number and offset'''
1126 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1126 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1127 if ppos == pos:
1127 if ppos == pos:
1128 return chunk, 0
1128 return chunk, 0
1129 elif ppos > pos:
1129 elif ppos > pos:
1130 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1130 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1131 raise ValueError('Unknown chunk')
1131 raise ValueError('Unknown chunk')
1132
1132
1133 def _readheader(self):
1133 def _readheader(self):
1134 """read the header and setup the object"""
1134 """read the header and setup the object"""
1135 typesize = self._unpackheader(_fparttypesize)[0]
1135 typesize = self._unpackheader(_fparttypesize)[0]
1136 self.type = self._fromheader(typesize)
1136 self.type = self._fromheader(typesize)
1137 indebug(self.ui, 'part type: "%s"' % self.type)
1137 indebug(self.ui, 'part type: "%s"' % self.type)
1138 self.id = self._unpackheader(_fpartid)[0]
1138 self.id = self._unpackheader(_fpartid)[0]
1139 indebug(self.ui, 'part id: "%s"' % self.id)
1139 indebug(self.ui, 'part id: "%s"' % self.id)
1140 # extract mandatory bit from type
1140 # extract mandatory bit from type
1141 self.mandatory = (self.type != self.type.lower())
1141 self.mandatory = (self.type != self.type.lower())
1142 self.type = self.type.lower()
1142 self.type = self.type.lower()
1143 ## reading parameters
1143 ## reading parameters
1144 # param count
1144 # param count
1145 mancount, advcount = self._unpackheader(_fpartparamcount)
1145 mancount, advcount = self._unpackheader(_fpartparamcount)
1146 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1146 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1147 # param size
1147 # param size
1148 fparamsizes = _makefpartparamsizes(mancount + advcount)
1148 fparamsizes = _makefpartparamsizes(mancount + advcount)
1149 paramsizes = self._unpackheader(fparamsizes)
1149 paramsizes = self._unpackheader(fparamsizes)
1150 # make it a list of couple again
1150 # make it a list of couple again
1151 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1151 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1152 # split mandatory from advisory
1152 # split mandatory from advisory
1153 mansizes = paramsizes[:mancount]
1153 mansizes = paramsizes[:mancount]
1154 advsizes = paramsizes[mancount:]
1154 advsizes = paramsizes[mancount:]
1155 # retrieve param value
1155 # retrieve param value
1156 manparams = []
1156 manparams = []
1157 for key, value in mansizes:
1157 for key, value in mansizes:
1158 manparams.append((self._fromheader(key), self._fromheader(value)))
1158 manparams.append((self._fromheader(key), self._fromheader(value)))
1159 advparams = []
1159 advparams = []
1160 for key, value in advsizes:
1160 for key, value in advsizes:
1161 advparams.append((self._fromheader(key), self._fromheader(value)))
1161 advparams.append((self._fromheader(key), self._fromheader(value)))
1162 self._initparams(manparams, advparams)
1162 self._initparams(manparams, advparams)
1163 ## part payload
1163 ## part payload
1164 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1164 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1165 # we read the data, tell it
1165 # we read the data, tell it
1166 self._initialized = True
1166 self._initialized = True
1167
1167
1168 def read(self, size=None):
1168 def read(self, size=None):
1169 """read payload data"""
1169 """read payload data"""
1170 if not self._initialized:
1170 if not self._initialized:
1171 self._readheader()
1171 self._readheader()
1172 if size is None:
1172 if size is None:
1173 data = self._payloadstream.read()
1173 data = self._payloadstream.read()
1174 else:
1174 else:
1175 data = self._payloadstream.read(size)
1175 data = self._payloadstream.read(size)
1176 self._pos += len(data)
1176 self._pos += len(data)
1177 if size is None or len(data) < size:
1177 if size is None or len(data) < size:
1178 if not self.consumed and self._pos:
1178 if not self.consumed and self._pos:
1179 self.ui.debug('bundle2-input-part: total payload size %i\n'
1179 self.ui.debug('bundle2-input-part: total payload size %i\n'
1180 % self._pos)
1180 % self._pos)
1181 self.consumed = True
1181 self.consumed = True
1182 return data
1182 return data
1183
1183
1184 def tell(self):
1184 def tell(self):
1185 return self._pos
1185 return self._pos
1186
1186
1187 def seek(self, offset, whence=0):
1187 def seek(self, offset, whence=0):
1188 if whence == 0:
1188 if whence == 0:
1189 newpos = offset
1189 newpos = offset
1190 elif whence == 1:
1190 elif whence == 1:
1191 newpos = self._pos + offset
1191 newpos = self._pos + offset
1192 elif whence == 2:
1192 elif whence == 2:
1193 if not self.consumed:
1193 if not self.consumed:
1194 self.read()
1194 self.read()
1195 newpos = self._chunkindex[-1][0] - offset
1195 newpos = self._chunkindex[-1][0] - offset
1196 else:
1196 else:
1197 raise ValueError('Unknown whence value: %r' % (whence,))
1197 raise ValueError('Unknown whence value: %r' % (whence,))
1198
1198
1199 if newpos > self._chunkindex[-1][0] and not self.consumed:
1199 if newpos > self._chunkindex[-1][0] and not self.consumed:
1200 self.read()
1200 self.read()
1201 if not 0 <= newpos <= self._chunkindex[-1][0]:
1201 if not 0 <= newpos <= self._chunkindex[-1][0]:
1202 raise ValueError('Offset out of range')
1202 raise ValueError('Offset out of range')
1203
1203
1204 if self._pos != newpos:
1204 if self._pos != newpos:
1205 chunk, internaloffset = self._findchunk(newpos)
1205 chunk, internaloffset = self._findchunk(newpos)
1206 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1206 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1207 adjust = self.read(internaloffset)
1207 adjust = self.read(internaloffset)
1208 if len(adjust) != internaloffset:
1208 if len(adjust) != internaloffset:
1209 raise error.Abort(_('Seek failed\n'))
1209 raise error.Abort(_('Seek failed\n'))
1210 self._pos = newpos
1210 self._pos = newpos
1211
1211
1212 # These are only the static capabilities.
1212 # These are only the static capabilities.
1213 # Check the 'getrepocaps' function for the rest.
1213 # Check the 'getrepocaps' function for the rest.
1214 capabilities = {'HG20': (),
1214 capabilities = {'HG20': (),
1215 'error': ('abort', 'unsupportedcontent', 'pushraced',
1215 'error': ('abort', 'unsupportedcontent', 'pushraced',
1216 'pushkey'),
1216 'pushkey'),
1217 'listkeys': (),
1217 'listkeys': (),
1218 'pushkey': (),
1218 'pushkey': (),
1219 'digests': tuple(sorted(util.DIGESTS.keys())),
1219 'digests': tuple(sorted(util.DIGESTS.keys())),
1220 'remote-changegroup': ('http', 'https'),
1220 'remote-changegroup': ('http', 'https'),
1221 'hgtagsfnodes': (),
1221 'hgtagsfnodes': (),
1222 }
1222 }
1223
1223
1224 def getrepocaps(repo, allowpushback=False):
1224 def getrepocaps(repo, allowpushback=False):
1225 """return the bundle2 capabilities for a given repo
1225 """return the bundle2 capabilities for a given repo
1226
1226
1227 Exists to allow extensions (like evolution) to mutate the capabilities.
1227 Exists to allow extensions (like evolution) to mutate the capabilities.
1228 """
1228 """
1229 caps = capabilities.copy()
1229 caps = capabilities.copy()
1230 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1230 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1231 if obsolete.isenabled(repo, obsolete.exchangeopt):
1231 if obsolete.isenabled(repo, obsolete.exchangeopt):
1232 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1232 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1233 caps['obsmarkers'] = supportedformat
1233 caps['obsmarkers'] = supportedformat
1234 if allowpushback:
1234 if allowpushback:
1235 caps['pushback'] = ()
1235 caps['pushback'] = ()
1236 return caps
1236 return caps
1237
1237
1238 def bundle2caps(remote):
1238 def bundle2caps(remote):
1239 """return the bundle capabilities of a peer as dict"""
1239 """return the bundle capabilities of a peer as dict"""
1240 raw = remote.capable('bundle2')
1240 raw = remote.capable('bundle2')
1241 if not raw and raw != '':
1241 if not raw and raw != '':
1242 return {}
1242 return {}
1243 capsblob = urllib.unquote(remote.capable('bundle2'))
1243 capsblob = urllib.unquote(remote.capable('bundle2'))
1244 return decodecaps(capsblob)
1244 return decodecaps(capsblob)
1245
1245
1246 def obsmarkersversion(caps):
1246 def obsmarkersversion(caps):
1247 """extract the list of supported obsmarkers versions from a bundle2caps dict
1247 """extract the list of supported obsmarkers versions from a bundle2caps dict
1248 """
1248 """
1249 obscaps = caps.get('obsmarkers', ())
1249 obscaps = caps.get('obsmarkers', ())
1250 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1250 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1251
1251
1252 @parthandler('changegroup', ('version', 'nbchanges'))
1252 @parthandler('changegroup', ('version', 'nbchanges'))
1253 def handlechangegroup(op, inpart):
1253 def handlechangegroup(op, inpart):
1254 """apply a changegroup part on the repo
1254 """apply a changegroup part on the repo
1255
1255
1256 This is a very early implementation that will massive rework before being
1256 This is a very early implementation that will massive rework before being
1257 inflicted to any end-user.
1257 inflicted to any end-user.
1258 """
1258 """
1259 # Make sure we trigger a transaction creation
1259 # Make sure we trigger a transaction creation
1260 #
1260 #
1261 # The addchangegroup function will get a transaction object by itself, but
1261 # The addchangegroup function will get a transaction object by itself, but
1262 # we need to make sure we trigger the creation of a transaction object used
1262 # we need to make sure we trigger the creation of a transaction object used
1263 # for the whole processing scope.
1263 # for the whole processing scope.
1264 op.gettransaction()
1264 op.gettransaction()
1265 unpackerversion = inpart.params.get('version', '01')
1265 unpackerversion = inpart.params.get('version', '01')
1266 # We should raise an appropriate exception here
1266 # We should raise an appropriate exception here
1267 unpacker = changegroup.packermap[unpackerversion][1]
1267 unpacker = changegroup.packermap[unpackerversion][1]
1268 cg = unpacker(inpart, None)
1268 cg = unpacker(inpart, None)
1269 # the source and url passed here are overwritten by the one contained in
1269 # the source and url passed here are overwritten by the one contained in
1270 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1270 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1271 nbchangesets = None
1271 nbchangesets = None
1272 if 'nbchanges' in inpart.params:
1272 if 'nbchanges' in inpart.params:
1273 nbchangesets = int(inpart.params.get('nbchanges'))
1273 nbchangesets = int(inpart.params.get('nbchanges'))
1274 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1274 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1275 expectedtotal=nbchangesets)
1276 op.records.add('changegroup', {'return': ret})
1275 op.records.add('changegroup', {'return': ret})
1277 if op.reply is not None:
1276 if op.reply is not None:
1278 # This is definitely not the final form of this
1277 # This is definitely not the final form of this
1279 # return. But one need to start somewhere.
1278 # return. But one need to start somewhere.
1280 part = op.reply.newpart('reply:changegroup', mandatory=False)
1279 part = op.reply.newpart('reply:changegroup', mandatory=False)
1281 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1280 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1282 part.addparam('return', '%i' % ret, mandatory=False)
1281 part.addparam('return', '%i' % ret, mandatory=False)
1283 assert not inpart.read()
1282 assert not inpart.read()
1284
1283
1285 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1284 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1286 ['digest:%s' % k for k in util.DIGESTS.keys()])
1285 ['digest:%s' % k for k in util.DIGESTS.keys()])
1287 @parthandler('remote-changegroup', _remotechangegroupparams)
1286 @parthandler('remote-changegroup', _remotechangegroupparams)
1288 def handleremotechangegroup(op, inpart):
1287 def handleremotechangegroup(op, inpart):
1289 """apply a bundle10 on the repo, given an url and validation information
1288 """apply a bundle10 on the repo, given an url and validation information
1290
1289
1291 All the information about the remote bundle to import are given as
1290 All the information about the remote bundle to import are given as
1292 parameters. The parameters include:
1291 parameters. The parameters include:
1293 - url: the url to the bundle10.
1292 - url: the url to the bundle10.
1294 - size: the bundle10 file size. It is used to validate what was
1293 - size: the bundle10 file size. It is used to validate what was
1295 retrieved by the client matches the server knowledge about the bundle.
1294 retrieved by the client matches the server knowledge about the bundle.
1296 - digests: a space separated list of the digest types provided as
1295 - digests: a space separated list of the digest types provided as
1297 parameters.
1296 parameters.
1298 - digest:<digest-type>: the hexadecimal representation of the digest with
1297 - digest:<digest-type>: the hexadecimal representation of the digest with
1299 that name. Like the size, it is used to validate what was retrieved by
1298 that name. Like the size, it is used to validate what was retrieved by
1300 the client matches what the server knows about the bundle.
1299 the client matches what the server knows about the bundle.
1301
1300
1302 When multiple digest types are given, all of them are checked.
1301 When multiple digest types are given, all of them are checked.
1303 """
1302 """
1304 try:
1303 try:
1305 raw_url = inpart.params['url']
1304 raw_url = inpart.params['url']
1306 except KeyError:
1305 except KeyError:
1307 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1306 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1308 parsed_url = util.url(raw_url)
1307 parsed_url = util.url(raw_url)
1309 if parsed_url.scheme not in capabilities['remote-changegroup']:
1308 if parsed_url.scheme not in capabilities['remote-changegroup']:
1310 raise error.Abort(_('remote-changegroup does not support %s urls') %
1309 raise error.Abort(_('remote-changegroup does not support %s urls') %
1311 parsed_url.scheme)
1310 parsed_url.scheme)
1312
1311
1313 try:
1312 try:
1314 size = int(inpart.params['size'])
1313 size = int(inpart.params['size'])
1315 except ValueError:
1314 except ValueError:
1316 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1315 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1317 % 'size')
1316 % 'size')
1318 except KeyError:
1317 except KeyError:
1319 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1318 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1320
1319
1321 digests = {}
1320 digests = {}
1322 for typ in inpart.params.get('digests', '').split():
1321 for typ in inpart.params.get('digests', '').split():
1323 param = 'digest:%s' % typ
1322 param = 'digest:%s' % typ
1324 try:
1323 try:
1325 value = inpart.params[param]
1324 value = inpart.params[param]
1326 except KeyError:
1325 except KeyError:
1327 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1326 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1328 param)
1327 param)
1329 digests[typ] = value
1328 digests[typ] = value
1330
1329
1331 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1330 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1332
1331
1333 # Make sure we trigger a transaction creation
1332 # Make sure we trigger a transaction creation
1334 #
1333 #
1335 # The addchangegroup function will get a transaction object by itself, but
1334 # The addchangegroup function will get a transaction object by itself, but
1336 # we need to make sure we trigger the creation of a transaction object used
1335 # we need to make sure we trigger the creation of a transaction object used
1337 # for the whole processing scope.
1336 # for the whole processing scope.
1338 op.gettransaction()
1337 op.gettransaction()
1339 from . import exchange
1338 from . import exchange
1340 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1339 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1341 if not isinstance(cg, changegroup.cg1unpacker):
1340 if not isinstance(cg, changegroup.cg1unpacker):
1342 raise error.Abort(_('%s: not a bundle version 1.0') %
1341 raise error.Abort(_('%s: not a bundle version 1.0') %
1343 util.hidepassword(raw_url))
1342 util.hidepassword(raw_url))
1344 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1343 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1345 op.records.add('changegroup', {'return': ret})
1344 op.records.add('changegroup', {'return': ret})
1346 if op.reply is not None:
1345 if op.reply is not None:
1347 # This is definitely not the final form of this
1346 # This is definitely not the final form of this
1348 # return. But one need to start somewhere.
1347 # return. But one need to start somewhere.
1349 part = op.reply.newpart('reply:changegroup')
1348 part = op.reply.newpart('reply:changegroup')
1350 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1349 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1351 part.addparam('return', '%i' % ret, mandatory=False)
1350 part.addparam('return', '%i' % ret, mandatory=False)
1352 try:
1351 try:
1353 real_part.validate()
1352 real_part.validate()
1354 except error.Abort as e:
1353 except error.Abort as e:
1355 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1354 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1356 (util.hidepassword(raw_url), str(e)))
1355 (util.hidepassword(raw_url), str(e)))
1357 assert not inpart.read()
1356 assert not inpart.read()
1358
1357
1359 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1358 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1360 def handlereplychangegroup(op, inpart):
1359 def handlereplychangegroup(op, inpart):
1361 ret = int(inpart.params['return'])
1360 ret = int(inpart.params['return'])
1362 replyto = int(inpart.params['in-reply-to'])
1361 replyto = int(inpart.params['in-reply-to'])
1363 op.records.add('changegroup', {'return': ret}, replyto)
1362 op.records.add('changegroup', {'return': ret}, replyto)
1364
1363
1365 @parthandler('check:heads')
1364 @parthandler('check:heads')
1366 def handlecheckheads(op, inpart):
1365 def handlecheckheads(op, inpart):
1367 """check that head of the repo did not change
1366 """check that head of the repo did not change
1368
1367
1369 This is used to detect a push race when using unbundle.
1368 This is used to detect a push race when using unbundle.
1370 This replaces the "heads" argument of unbundle."""
1369 This replaces the "heads" argument of unbundle."""
1371 h = inpart.read(20)
1370 h = inpart.read(20)
1372 heads = []
1371 heads = []
1373 while len(h) == 20:
1372 while len(h) == 20:
1374 heads.append(h)
1373 heads.append(h)
1375 h = inpart.read(20)
1374 h = inpart.read(20)
1376 assert not h
1375 assert not h
1377 # Trigger a transaction so that we are guaranteed to have the lock now.
1376 # Trigger a transaction so that we are guaranteed to have the lock now.
1378 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1377 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1379 op.gettransaction()
1378 op.gettransaction()
1380 if heads != op.repo.heads():
1379 if heads != op.repo.heads():
1381 raise error.PushRaced('repository changed while pushing - '
1380 raise error.PushRaced('repository changed while pushing - '
1382 'please try again')
1381 'please try again')
1383
1382
1384 @parthandler('output')
1383 @parthandler('output')
1385 def handleoutput(op, inpart):
1384 def handleoutput(op, inpart):
1386 """forward output captured on the server to the client"""
1385 """forward output captured on the server to the client"""
1387 for line in inpart.read().splitlines():
1386 for line in inpart.read().splitlines():
1388 op.ui.status(('remote: %s\n' % line))
1387 op.ui.status(('remote: %s\n' % line))
1389
1388
1390 @parthandler('replycaps')
1389 @parthandler('replycaps')
1391 def handlereplycaps(op, inpart):
1390 def handlereplycaps(op, inpart):
1392 """Notify that a reply bundle should be created
1391 """Notify that a reply bundle should be created
1393
1392
1394 The payload contains the capabilities information for the reply"""
1393 The payload contains the capabilities information for the reply"""
1395 caps = decodecaps(inpart.read())
1394 caps = decodecaps(inpart.read())
1396 if op.reply is None:
1395 if op.reply is None:
1397 op.reply = bundle20(op.ui, caps)
1396 op.reply = bundle20(op.ui, caps)
1398
1397
1399 @parthandler('error:abort', ('message', 'hint'))
1398 @parthandler('error:abort', ('message', 'hint'))
1400 def handleerrorabort(op, inpart):
1399 def handleerrorabort(op, inpart):
1401 """Used to transmit abort error over the wire"""
1400 """Used to transmit abort error over the wire"""
1402 raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1401 raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1403
1402
1404 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1403 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1405 'in-reply-to'))
1404 'in-reply-to'))
1406 def handleerrorpushkey(op, inpart):
1405 def handleerrorpushkey(op, inpart):
1407 """Used to transmit failure of a mandatory pushkey over the wire"""
1406 """Used to transmit failure of a mandatory pushkey over the wire"""
1408 kwargs = {}
1407 kwargs = {}
1409 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1408 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1410 value = inpart.params.get(name)
1409 value = inpart.params.get(name)
1411 if value is not None:
1410 if value is not None:
1412 kwargs[name] = value
1411 kwargs[name] = value
1413 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1412 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1414
1413
1415 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1414 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1416 def handleerrorunsupportedcontent(op, inpart):
1415 def handleerrorunsupportedcontent(op, inpart):
1417 """Used to transmit unknown content error over the wire"""
1416 """Used to transmit unknown content error over the wire"""
1418 kwargs = {}
1417 kwargs = {}
1419 parttype = inpart.params.get('parttype')
1418 parttype = inpart.params.get('parttype')
1420 if parttype is not None:
1419 if parttype is not None:
1421 kwargs['parttype'] = parttype
1420 kwargs['parttype'] = parttype
1422 params = inpart.params.get('params')
1421 params = inpart.params.get('params')
1423 if params is not None:
1422 if params is not None:
1424 kwargs['params'] = params.split('\0')
1423 kwargs['params'] = params.split('\0')
1425
1424
1426 raise error.BundleUnknownFeatureError(**kwargs)
1425 raise error.BundleUnknownFeatureError(**kwargs)
1427
1426
1428 @parthandler('error:pushraced', ('message',))
1427 @parthandler('error:pushraced', ('message',))
1429 def handleerrorpushraced(op, inpart):
1428 def handleerrorpushraced(op, inpart):
1430 """Used to transmit push race error over the wire"""
1429 """Used to transmit push race error over the wire"""
1431 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1430 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1432
1431
1433 @parthandler('listkeys', ('namespace',))
1432 @parthandler('listkeys', ('namespace',))
1434 def handlelistkeys(op, inpart):
1433 def handlelistkeys(op, inpart):
1435 """retrieve pushkey namespace content stored in a bundle2"""
1434 """retrieve pushkey namespace content stored in a bundle2"""
1436 namespace = inpart.params['namespace']
1435 namespace = inpart.params['namespace']
1437 r = pushkey.decodekeys(inpart.read())
1436 r = pushkey.decodekeys(inpart.read())
1438 op.records.add('listkeys', (namespace, r))
1437 op.records.add('listkeys', (namespace, r))
1439
1438
1440 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1439 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1441 def handlepushkey(op, inpart):
1440 def handlepushkey(op, inpart):
1442 """process a pushkey request"""
1441 """process a pushkey request"""
1443 dec = pushkey.decode
1442 dec = pushkey.decode
1444 namespace = dec(inpart.params['namespace'])
1443 namespace = dec(inpart.params['namespace'])
1445 key = dec(inpart.params['key'])
1444 key = dec(inpart.params['key'])
1446 old = dec(inpart.params['old'])
1445 old = dec(inpart.params['old'])
1447 new = dec(inpart.params['new'])
1446 new = dec(inpart.params['new'])
1448 # Grab the transaction to ensure that we have the lock before performing the
1447 # Grab the transaction to ensure that we have the lock before performing the
1449 # pushkey.
1448 # pushkey.
1450 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1449 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1451 op.gettransaction()
1450 op.gettransaction()
1452 ret = op.repo.pushkey(namespace, key, old, new)
1451 ret = op.repo.pushkey(namespace, key, old, new)
1453 record = {'namespace': namespace,
1452 record = {'namespace': namespace,
1454 'key': key,
1453 'key': key,
1455 'old': old,
1454 'old': old,
1456 'new': new}
1455 'new': new}
1457 op.records.add('pushkey', record)
1456 op.records.add('pushkey', record)
1458 if op.reply is not None:
1457 if op.reply is not None:
1459 rpart = op.reply.newpart('reply:pushkey')
1458 rpart = op.reply.newpart('reply:pushkey')
1460 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1459 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1461 rpart.addparam('return', '%i' % ret, mandatory=False)
1460 rpart.addparam('return', '%i' % ret, mandatory=False)
1462 if inpart.mandatory and not ret:
1461 if inpart.mandatory and not ret:
1463 kwargs = {}
1462 kwargs = {}
1464 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1463 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1465 if key in inpart.params:
1464 if key in inpart.params:
1466 kwargs[key] = inpart.params[key]
1465 kwargs[key] = inpart.params[key]
1467 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1466 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1468
1467
1469 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1468 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1470 def handlepushkeyreply(op, inpart):
1469 def handlepushkeyreply(op, inpart):
1471 """retrieve the result of a pushkey request"""
1470 """retrieve the result of a pushkey request"""
1472 ret = int(inpart.params['return'])
1471 ret = int(inpart.params['return'])
1473 partid = int(inpart.params['in-reply-to'])
1472 partid = int(inpart.params['in-reply-to'])
1474 op.records.add('pushkey', {'return': ret}, partid)
1473 op.records.add('pushkey', {'return': ret}, partid)
1475
1474
1476 @parthandler('obsmarkers')
1475 @parthandler('obsmarkers')
1477 def handleobsmarker(op, inpart):
1476 def handleobsmarker(op, inpart):
1478 """add a stream of obsmarkers to the repo"""
1477 """add a stream of obsmarkers to the repo"""
1479 tr = op.gettransaction()
1478 tr = op.gettransaction()
1480 markerdata = inpart.read()
1479 markerdata = inpart.read()
1481 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1480 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1482 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1481 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1483 % len(markerdata))
1482 % len(markerdata))
1484 # The mergemarkers call will crash if marker creation is not enabled.
1483 # The mergemarkers call will crash if marker creation is not enabled.
1485 # we want to avoid this if the part is advisory.
1484 # we want to avoid this if the part is advisory.
1486 if not inpart.mandatory and op.repo.obsstore.readonly:
1485 if not inpart.mandatory and op.repo.obsstore.readonly:
1487 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1486 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1488 return
1487 return
1489 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1488 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1490 if new:
1489 if new:
1491 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1490 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1492 op.records.add('obsmarkers', {'new': new})
1491 op.records.add('obsmarkers', {'new': new})
1493 if op.reply is not None:
1492 if op.reply is not None:
1494 rpart = op.reply.newpart('reply:obsmarkers')
1493 rpart = op.reply.newpart('reply:obsmarkers')
1495 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1494 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1496 rpart.addparam('new', '%i' % new, mandatory=False)
1495 rpart.addparam('new', '%i' % new, mandatory=False)
1497
1496
1498
1497
1499 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1498 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1500 def handleobsmarkerreply(op, inpart):
1499 def handleobsmarkerreply(op, inpart):
1501 """retrieve the result of a pushkey request"""
1500 """retrieve the result of a pushkey request"""
1502 ret = int(inpart.params['new'])
1501 ret = int(inpart.params['new'])
1503 partid = int(inpart.params['in-reply-to'])
1502 partid = int(inpart.params['in-reply-to'])
1504 op.records.add('obsmarkers', {'new': ret}, partid)
1503 op.records.add('obsmarkers', {'new': ret}, partid)
1505
1504
1506 @parthandler('hgtagsfnodes')
1505 @parthandler('hgtagsfnodes')
1507 def handlehgtagsfnodes(op, inpart):
1506 def handlehgtagsfnodes(op, inpart):
1508 """Applies .hgtags fnodes cache entries to the local repo.
1507 """Applies .hgtags fnodes cache entries to the local repo.
1509
1508
1510 Payload is pairs of 20 byte changeset nodes and filenodes.
1509 Payload is pairs of 20 byte changeset nodes and filenodes.
1511 """
1510 """
1512 # Grab the transaction so we ensure that we have the lock at this point.
1511 # Grab the transaction so we ensure that we have the lock at this point.
1513 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1512 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1514 op.gettransaction()
1513 op.gettransaction()
1515 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1514 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1516
1515
1517 count = 0
1516 count = 0
1518 while True:
1517 while True:
1519 node = inpart.read(20)
1518 node = inpart.read(20)
1520 fnode = inpart.read(20)
1519 fnode = inpart.read(20)
1521 if len(node) < 20 or len(fnode) < 20:
1520 if len(node) < 20 or len(fnode) < 20:
1522 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1521 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1523 break
1522 break
1524 cache.setfnode(node, fnode)
1523 cache.setfnode(node, fnode)
1525 count += 1
1524 count += 1
1526
1525
1527 cache.write()
1526 cache.write()
1528 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1527 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now