applybundle: take url as argument...
Pierre-Yves David - r26795:dff05b3f default
@@ -1,1534 +1,1536 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is structured as follows
17 The format is structured as follows
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 The binary format
24 The binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follows
32 Binary format is as follows
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are obviously forbidden.
46 Empty names are obviously forbidden.
47
47
48 Names MUST start with a letter. If the first letter is lower case, the
48 Names MUST start with a letter. If the first letter is lower case, the
49 parameter is advisory and can be safely ignored. However, when the first
49 parameter is advisory and can be safely ignored. However, when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allows easy human inspection of a bundle2 header in case of
57 - Textual data allows easy human inspection of a bundle2 header in case of
58 trouble.
58 trouble.
59
59
60 Any application level options MUST go into a bundle2 part instead.
60 Any application level options MUST go into a bundle2 part instead.
61
61
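As an illustration of the layout described above, a stream parameter block
could be produced by a helper along these lines (a hypothetical sketch, not
part of the original module; it mirrors what ``bundle20._paramchunk`` does
further down)::

    import struct
    import urllib

    def encodestreamparams(params):
        """params: list of (name, value-or-None) pairs"""
        blocks = []
        for name, value in params:
            name = urllib.quote(name)
            if value is not None:
                name = '%s=%s' % (name, urllib.quote(value))
            blocks.append(name)
        blob = ' '.join(blocks)
        # int32 byte count, then the urlquoted space separated parameters
        return struct.pack('>i', len(blob)) + blob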
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follows
65 Binary format is as follows
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route the part to an application level handler that
77 The part type is used to route the part to an application level handler that
78 can interpret the payload.
78 can interpret the payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows
84 The binary format of the header is as follows
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N pairs of bytes, where N is the total number of parameters. Each
105 N pairs of bytes, where N is the total number of parameters. Each
106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
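To make the structure above concrete, the parameter block could be decoded by
a helper like the following (an illustrative sketch only; the real decoding is
done by the ``unbundlepart`` class used later in this module)::

    import struct

    def readpartparams(data):
        mancount, advcount = struct.unpack('>BB', data[:2])
        total = mancount + advcount
        sizes = struct.unpack('>' + 'BB' * total, data[2:2 + 2 * total])
        offset = 2 + 2 * total
        params = []
        for i in range(total):
            keysize, valsize = sizes[2 * i], sizes[2 * i + 1]
            key = data[offset:offset + keysize]
            offset += keysize
            value = data[offset:offset + valsize]
            offset += valsize
            params.append((key, value))
        # mandatory parameters come first, then the advisory ones
        return params[:mancount], params[mancount:]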
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 `chunksize` says). The payload part is concluded by a zero size chunk.
123 `chunksize` says). The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
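For example, framing a payload as the series of chunks described above could
look like this (a hypothetical sketch; the format allows several chunks even
though, as noted, the current implementation emits at most one)::

    import struct

    def framepayload(data, chunksize=4096):
        chunks = []
        for start in range(0, len(data), chunksize):
            piece = data[start:start + chunksize]
            chunks.append(struct.pack('>i', len(piece)) + piece)
        chunks.append(struct.pack('>i', 0))  # zero size chunk ends the payload
        return ''.join(chunks)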
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handlers are registered
134 Each part is processed in order using a "part handler". Handlers are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the parts read after an abort are processed. In the
143 channel usable. But none of the parts read after an abort are processed. In the
144 future, dropping the stream may become an option for channels we do not care to
144 future, dropping the stream may become an option for channels we do not care to
145 preserve.
145 preserve.
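The dispatch rule can be summarized by the following sketch (a simplification
of what ``_processpart`` below actually does)::

    handler = parthandlermapping.get(parttype.lower())
    if handler is None:
        if parttype.lower() != parttype:
            # the type contains an upper case character: mandatory part
            raise error.BundleUnknownFeatureError(parttype=parttype)
        # advisory part with no known handler: silently skipped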
146 """
146 """
147
147
148 from __future__ import absolute_import
148 from __future__ import absolute_import
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155 import urllib
155 import urllib
156
156
157 from .i18n import _
157 from .i18n import _
158 from . import (
158 from . import (
159 changegroup,
159 changegroup,
160 error,
160 error,
161 obsolete,
161 obsolete,
162 pushkey,
162 pushkey,
163 tags,
163 tags,
164 url,
164 url,
165 util,
165 util,
166 )
166 )
167
167
168 _pack = struct.pack
168 _pack = struct.pack
169 _unpack = struct.unpack
169 _unpack = struct.unpack
170
170
171 _fstreamparamsize = '>i'
171 _fstreamparamsize = '>i'
172 _fpartheadersize = '>i'
172 _fpartheadersize = '>i'
173 _fparttypesize = '>B'
173 _fparttypesize = '>B'
174 _fpartid = '>I'
174 _fpartid = '>I'
175 _fpayloadsize = '>i'
175 _fpayloadsize = '>i'
176 _fpartparamcount = '>BB'
176 _fpartparamcount = '>BB'
177
177
178 preferedchunksize = 4096
178 preferedchunksize = 4096
179
179
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181
181
182 def outdebug(ui, message):
182 def outdebug(ui, message):
183 """debug regarding output stream (bundling)"""
183 """debug regarding output stream (bundling)"""
184 if ui.configbool('devel', 'bundle2.debug', False):
184 if ui.configbool('devel', 'bundle2.debug', False):
185 ui.debug('bundle2-output: %s\n' % message)
185 ui.debug('bundle2-output: %s\n' % message)
186
186
187 def indebug(ui, message):
187 def indebug(ui, message):
188 """debug on input stream (unbundling)"""
188 """debug on input stream (unbundling)"""
189 if ui.configbool('devel', 'bundle2.debug', False):
189 if ui.configbool('devel', 'bundle2.debug', False):
190 ui.debug('bundle2-input: %s\n' % message)
190 ui.debug('bundle2-input: %s\n' % message)
191
191
192 def validateparttype(parttype):
192 def validateparttype(parttype):
193 """raise ValueError if a parttype contains invalid character"""
193 """raise ValueError if a parttype contains invalid character"""
194 if _parttypeforbidden.search(parttype):
194 if _parttypeforbidden.search(parttype):
195 raise ValueError(parttype)
195 raise ValueError(parttype)
196
196
197 def _makefpartparamsizes(nbparams):
197 def _makefpartparamsizes(nbparams):
198 """return a struct format to read part parameter sizes
198 """return a struct format to read part parameter sizes
199
199
200 The number of parameters is variable so we need to build that format
200 The number of parameters is variable so we need to build that format
201 dynamically.
201 dynamically.
202 """
202 """
203 return '>'+('BB'*nbparams)
203 return '>'+('BB'*nbparams)
204
204
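# Illustrative usage (not part of this changeset): a part that advertises two
# parameters stores four size bytes, which could be read back with:
#
#   fmt = _makefpartparamsizes(2)          # -> '>BBBB'
#   k1size, v1size, k2size, v2size = _unpack(fmt, sizedata)
#
# where ``sizedata`` stands for the four raw size bytes taken from the header.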
205 parthandlermapping = {}
205 parthandlermapping = {}
206
206
207 def parthandler(parttype, params=()):
207 def parthandler(parttype, params=()):
208 """decorator that registers a function as a bundle2 part handler
208 """decorator that registers a function as a bundle2 part handler
209
209
210 eg::
210 eg::
211
211
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 def myparttypehandler(...):
213 def myparttypehandler(...):
214 '''process a part of type "my part".'''
214 '''process a part of type "my part".'''
215 ...
215 ...
216 """
216 """
217 validateparttype(parttype)
217 validateparttype(parttype)
218 def _decorator(func):
218 def _decorator(func):
219 lparttype = parttype.lower() # enforce lower case matching.
219 lparttype = parttype.lower() # enforce lower case matching.
220 assert lparttype not in parthandlermapping
220 assert lparttype not in parthandlermapping
221 parthandlermapping[lparttype] = func
221 parthandlermapping[lparttype] = func
222 func.params = frozenset(params)
222 func.params = frozenset(params)
223 return func
223 return func
224 return _decorator
224 return _decorator
225
225
226 class unbundlerecords(object):
226 class unbundlerecords(object):
227 """keep record of what happens during an unbundle
227 """keep record of what happens during an unbundle
228
228
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 category of record and obj is an arbitrary object.
230 category of record and obj is an arbitrary object.
231
231
232 `records['cat']` will return all entries of this category 'cat'.
232 `records['cat']` will return all entries of this category 'cat'.
233
233
234 Iterating on the object itself will yield `('category', obj)` tuples
234 Iterating on the object itself will yield `('category', obj)` tuples
235 for all entries.
235 for all entries.
236
236
237 All iterations happen in chronological order.
237 All iterations happen in chronological order.
238 """
238 """
239
239
240 def __init__(self):
240 def __init__(self):
241 self._categories = {}
241 self._categories = {}
242 self._sequences = []
242 self._sequences = []
243 self._replies = {}
243 self._replies = {}
244
244
245 def add(self, category, entry, inreplyto=None):
245 def add(self, category, entry, inreplyto=None):
246 """add a new record of a given category.
246 """add a new record of a given category.
247
247
248 The entry can then be retrieved in the list returned by
248 The entry can then be retrieved in the list returned by
249 self['category']."""
249 self['category']."""
250 self._categories.setdefault(category, []).append(entry)
250 self._categories.setdefault(category, []).append(entry)
251 self._sequences.append((category, entry))
251 self._sequences.append((category, entry))
252 if inreplyto is not None:
252 if inreplyto is not None:
253 self.getreplies(inreplyto).add(category, entry)
253 self.getreplies(inreplyto).add(category, entry)
254
254
255 def getreplies(self, partid):
255 def getreplies(self, partid):
256 """get the records that are replies to a specific part"""
256 """get the records that are replies to a specific part"""
257 return self._replies.setdefault(partid, unbundlerecords())
257 return self._replies.setdefault(partid, unbundlerecords())
258
258
259 def __getitem__(self, cat):
259 def __getitem__(self, cat):
260 return tuple(self._categories.get(cat, ()))
260 return tuple(self._categories.get(cat, ()))
261
261
262 def __iter__(self):
262 def __iter__(self):
263 return iter(self._sequences)
263 return iter(self._sequences)
264
264
265 def __len__(self):
265 def __len__(self):
266 return len(self._sequences)
266 return len(self._sequences)
267
267
268 def __nonzero__(self):
268 def __nonzero__(self):
269 return bool(self._sequences)
269 return bool(self._sequences)
270
270
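# Illustrative usage of unbundlerecords (not part of this changeset): a part
# handler can record its outcome and callers can read it back afterwards:
#
#   records = unbundlerecords()
#   records.add('changegroup', {'return': 1})
#   [r['return'] for r in records['changegroup']]    # -> [1]
#   list(records)    # -> [('changegroup', {'return': 1})] in insertion order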
271 class bundleoperation(object):
271 class bundleoperation(object):
272 """an object that represents a single bundling process
272 """an object that represents a single bundling process
273
273
274 Its purpose is to carry unbundle-related objects and states.
274 Its purpose is to carry unbundle-related objects and states.
275
275
276 A new object should be created at the beginning of each bundle processing.
276 A new object should be created at the beginning of each bundle processing.
277 The object is to be returned by the processing function.
277 The object is to be returned by the processing function.
278
278
279 The object has very little content now; it will ultimately contain:
279 The object has very little content now; it will ultimately contain:
280 * an access to the repo the bundle is applied to,
280 * an access to the repo the bundle is applied to,
281 * a ui object,
281 * a ui object,
282 * a way to retrieve a transaction to add changes to the repo,
282 * a way to retrieve a transaction to add changes to the repo,
283 * a way to record the result of processing each part,
283 * a way to record the result of processing each part,
284 * a way to construct a bundle response when applicable.
284 * a way to construct a bundle response when applicable.
285 """
285 """
286
286
287 def __init__(self, repo, transactiongetter, captureoutput=True):
287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 self.repo = repo
288 self.repo = repo
289 self.ui = repo.ui
289 self.ui = repo.ui
290 self.records = unbundlerecords()
290 self.records = unbundlerecords()
291 self.gettransaction = transactiongetter
291 self.gettransaction = transactiongetter
292 self.reply = None
292 self.reply = None
293 self.captureoutput = captureoutput
293 self.captureoutput = captureoutput
294
294
295 class TransactionUnavailable(RuntimeError):
295 class TransactionUnavailable(RuntimeError):
296 pass
296 pass
297
297
298 def _notransaction():
298 def _notransaction():
299 """default method to get a transaction while processing a bundle
299 """default method to get a transaction while processing a bundle
300
300
301 Raise an exception to highlight the fact that no transaction was expected
301 Raise an exception to highlight the fact that no transaction was expected
302 to be created"""
302 to be created"""
303 raise TransactionUnavailable()
303 raise TransactionUnavailable()
304
304
305 def applybundle(repo, unbundler, tr, source=None, op=None):
305 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
306 # transform me into unbundler.apply() as soon as the freeze is lifted
306 # transform me into unbundler.apply() as soon as the freeze is lifted
307 tr.hookargs['bundle2'] = '1'
307 tr.hookargs['bundle2'] = '1'
308 if source is not None and 'source' not in tr.hookargs:
308 if source is not None and 'source' not in tr.hookargs:
309 tr.hookargs['source'] = source
309 tr.hookargs['source'] = source
310 if url is not None and 'url' not in tr.hookargs:
311 tr.hookargs['url'] = url
310 return processbundle(repo, unbundler, lambda: tr, op=op)
312 return processbundle(repo, unbundler, lambda: tr, op=op)
311
313
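# Illustrative call site (not part of this changeset): a caller applying a
# pulled bundle can now forward both the source and the remote url, so that
# hooks receive them as HG_SOURCE and HG_URL:
#
#   tr = repo.transaction('unbundle')
#   try:
#       applybundle(repo, unbundler, tr, source='pull',
#                   url='ssh://example.com/repo')
#       tr.close()
#   finally:
#       tr.release()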
312 def processbundle(repo, unbundler, transactiongetter=None, op=None):
314 def processbundle(repo, unbundler, transactiongetter=None, op=None):
313 """This function process a bundle, apply effect to/from a repo
315 """This function process a bundle, apply effect to/from a repo
314
316
315 It iterates over each part then searches for and uses the proper handling
317 It iterates over each part then searches for and uses the proper handling
316 code to process the part. Parts are processed in order.
318 code to process the part. Parts are processed in order.
317
319
318 This is very early version of this function that will be strongly reworked
320 This is very early version of this function that will be strongly reworked
319 before final usage.
321 before final usage.
320
322
321 Unknown Mandatory part will abort the process.
323 Unknown Mandatory part will abort the process.
322
324
323 It is temporarily possible to provide a prebuilt bundleoperation to the
325 It is temporarily possible to provide a prebuilt bundleoperation to the
324 function. This is used to ensure output is properly propagated in case of
326 function. This is used to ensure output is properly propagated in case of
325 an error during the unbundling. This output capturing part will likely be
327 an error during the unbundling. This output capturing part will likely be
326 reworked and this ability will probably go away in the process.
328 reworked and this ability will probably go away in the process.
327 """
329 """
328 if op is None:
330 if op is None:
329 if transactiongetter is None:
331 if transactiongetter is None:
330 transactiongetter = _notransaction
332 transactiongetter = _notransaction
331 op = bundleoperation(repo, transactiongetter)
333 op = bundleoperation(repo, transactiongetter)
332 # todo:
334 # todo:
333 # - replace this with an init function soon.
335 # - replace this with an init function soon.
334 # - exception catching
336 # - exception catching
335 unbundler.params
337 unbundler.params
336 if repo.ui.debugflag:
338 if repo.ui.debugflag:
337 msg = ['bundle2-input-bundle:']
339 msg = ['bundle2-input-bundle:']
338 if unbundler.params:
340 if unbundler.params:
339 msg.append(' %i params' % len(unbundler.params))
341 msg.append(' %i params' % len(unbundler.params))
340 if op.gettransaction is None:
342 if op.gettransaction is None:
341 msg.append(' no-transaction')
343 msg.append(' no-transaction')
342 else:
344 else:
343 msg.append(' with-transaction')
345 msg.append(' with-transaction')
344 msg.append('\n')
346 msg.append('\n')
345 repo.ui.debug(''.join(msg))
347 repo.ui.debug(''.join(msg))
346 iterparts = enumerate(unbundler.iterparts())
348 iterparts = enumerate(unbundler.iterparts())
347 part = None
349 part = None
348 nbpart = 0
350 nbpart = 0
349 try:
351 try:
350 for nbpart, part in iterparts:
352 for nbpart, part in iterparts:
351 _processpart(op, part)
353 _processpart(op, part)
352 except BaseException as exc:
354 except BaseException as exc:
353 for nbpart, part in iterparts:
355 for nbpart, part in iterparts:
354 # consume the bundle content
356 # consume the bundle content
355 part.seek(0, 2)
357 part.seek(0, 2)
356 # Small hack to let caller code distinguish exceptions from bundle2
358 # Small hack to let caller code distinguish exceptions from bundle2
357 # processing from processing the old format. This is mostly
359 # processing from processing the old format. This is mostly
358 # needed to handle different return codes to unbundle according to the
360 # needed to handle different return codes to unbundle according to the
359 # type of bundle. We should probably clean up or drop this return code
361 # type of bundle. We should probably clean up or drop this return code
360 # craziness in a future version.
362 # craziness in a future version.
361 exc.duringunbundle2 = True
363 exc.duringunbundle2 = True
362 salvaged = []
364 salvaged = []
363 replycaps = None
365 replycaps = None
364 if op.reply is not None:
366 if op.reply is not None:
365 salvaged = op.reply.salvageoutput()
367 salvaged = op.reply.salvageoutput()
366 replycaps = op.reply.capabilities
368 replycaps = op.reply.capabilities
367 exc._replycaps = replycaps
369 exc._replycaps = replycaps
368 exc._bundle2salvagedoutput = salvaged
370 exc._bundle2salvagedoutput = salvaged
369 raise
371 raise
370 finally:
372 finally:
371 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
372
374
373 return op
375 return op
374
376
375 def _processpart(op, part):
377 def _processpart(op, part):
376 """process a single part from a bundle
378 """process a single part from a bundle
377
379
378 The part is guaranteed to have been fully consumed when the function exits
380 The part is guaranteed to have been fully consumed when the function exits
379 (even if an exception is raised)."""
381 (even if an exception is raised)."""
380 status = 'unknown' # used by debug output
382 status = 'unknown' # used by debug output
381 try:
383 try:
382 try:
384 try:
383 handler = parthandlermapping.get(part.type)
385 handler = parthandlermapping.get(part.type)
384 if handler is None:
386 if handler is None:
385 status = 'unsupported-type'
387 status = 'unsupported-type'
386 raise error.BundleUnknownFeatureError(parttype=part.type)
388 raise error.BundleUnknownFeatureError(parttype=part.type)
387 indebug(op.ui, 'found a handler for part %r' % part.type)
389 indebug(op.ui, 'found a handler for part %r' % part.type)
388 unknownparams = part.mandatorykeys - handler.params
390 unknownparams = part.mandatorykeys - handler.params
389 if unknownparams:
391 if unknownparams:
390 unknownparams = list(unknownparams)
392 unknownparams = list(unknownparams)
391 unknownparams.sort()
393 unknownparams.sort()
392 status = 'unsupported-params (%s)' % unknownparams
394 status = 'unsupported-params (%s)' % unknownparams
393 raise error.BundleUnknownFeatureError(parttype=part.type,
395 raise error.BundleUnknownFeatureError(parttype=part.type,
394 params=unknownparams)
396 params=unknownparams)
395 status = 'supported'
397 status = 'supported'
396 except error.BundleUnknownFeatureError as exc:
398 except error.BundleUnknownFeatureError as exc:
397 if part.mandatory: # mandatory parts
399 if part.mandatory: # mandatory parts
398 raise
400 raise
399 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
401 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
400 return # skip to part processing
402 return # skip to part processing
401 finally:
403 finally:
402 if op.ui.debugflag:
404 if op.ui.debugflag:
403 msg = ['bundle2-input-part: "%s"' % part.type]
405 msg = ['bundle2-input-part: "%s"' % part.type]
404 if not part.mandatory:
406 if not part.mandatory:
405 msg.append(' (advisory)')
407 msg.append(' (advisory)')
406 nbmp = len(part.mandatorykeys)
408 nbmp = len(part.mandatorykeys)
407 nbap = len(part.params) - nbmp
409 nbap = len(part.params) - nbmp
408 if nbmp or nbap:
410 if nbmp or nbap:
409 msg.append(' (params:')
411 msg.append(' (params:')
410 if nbmp:
412 if nbmp:
411 msg.append(' %i mandatory' % nbmp)
413 msg.append(' %i mandatory' % nbmp)
412 if nbap:
414 if nbap:
413 msg.append(' %i advisory' % nbap)
415 msg.append(' %i advisory' % nbap)
414 msg.append(')')
416 msg.append(')')
415 msg.append(' %s\n' % status)
417 msg.append(' %s\n' % status)
416 op.ui.debug(''.join(msg))
418 op.ui.debug(''.join(msg))
417
419
418 # handler is called outside the above try block so that we don't
420 # handler is called outside the above try block so that we don't
419 # risk catching KeyErrors from anything other than the
421 # risk catching KeyErrors from anything other than the
420 # parthandlermapping lookup (any KeyError raised by handler()
422 # parthandlermapping lookup (any KeyError raised by handler()
421 # itself represents a defect of a different variety).
423 # itself represents a defect of a different variety).
422 output = None
424 output = None
423 if op.captureoutput and op.reply is not None:
425 if op.captureoutput and op.reply is not None:
424 op.ui.pushbuffer(error=True, subproc=True)
426 op.ui.pushbuffer(error=True, subproc=True)
425 output = ''
427 output = ''
426 try:
428 try:
427 handler(op, part)
429 handler(op, part)
428 finally:
430 finally:
429 if output is not None:
431 if output is not None:
430 output = op.ui.popbuffer()
432 output = op.ui.popbuffer()
431 if output:
433 if output:
432 outpart = op.reply.newpart('output', data=output,
434 outpart = op.reply.newpart('output', data=output,
433 mandatory=False)
435 mandatory=False)
434 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
436 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
435 finally:
437 finally:
436 # consume the part content to not corrupt the stream.
438 # consume the part content to not corrupt the stream.
437 part.seek(0, 2)
439 part.seek(0, 2)
438
440
439
441
440 def decodecaps(blob):
442 def decodecaps(blob):
441 """decode a bundle2 caps bytes blob into a dictionary
443 """decode a bundle2 caps bytes blob into a dictionary
442
444
443 The blob is a list of capabilities (one per line)
445 The blob is a list of capabilities (one per line)
444 Capabilities may have values using a line of the form::
446 Capabilities may have values using a line of the form::
445
447
446 capability=value1,value2,value3
448 capability=value1,value2,value3
447
449
448 The values are always a list."""
450 The values are always a list."""
449 caps = {}
451 caps = {}
450 for line in blob.splitlines():
452 for line in blob.splitlines():
451 if not line:
453 if not line:
452 continue
454 continue
453 if '=' not in line:
455 if '=' not in line:
454 key, vals = line, ()
456 key, vals = line, ()
455 else:
457 else:
456 key, vals = line.split('=', 1)
458 key, vals = line.split('=', 1)
457 vals = vals.split(',')
459 vals = vals.split(',')
458 key = urllib.unquote(key)
460 key = urllib.unquote(key)
459 vals = [urllib.unquote(v) for v in vals]
461 vals = [urllib.unquote(v) for v in vals]
460 caps[key] = vals
462 caps[key] = vals
461 return caps
463 return caps
462
464
463 def encodecaps(caps):
465 def encodecaps(caps):
464 """encode a bundle2 caps dictionary into a bytes blob"""
466 """encode a bundle2 caps dictionary into a bytes blob"""
465 chunks = []
467 chunks = []
466 for ca in sorted(caps):
468 for ca in sorted(caps):
467 vals = caps[ca]
469 vals = caps[ca]
468 ca = urllib.quote(ca)
470 ca = urllib.quote(ca)
469 vals = [urllib.quote(v) for v in vals]
471 vals = [urllib.quote(v) for v in vals]
470 if vals:
472 if vals:
471 ca = "%s=%s" % (ca, ','.join(vals))
473 ca = "%s=%s" % (ca, ','.join(vals))
472 chunks.append(ca)
474 chunks.append(ca)
473 return '\n'.join(chunks)
475 return '\n'.join(chunks)
474
476
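# Illustrative round trip (not part of this changeset): a capabilities
# dictionary survives encoding and decoding unchanged:
#
#   caps = {'HG20': [], 'changegroup': ['01', '02']}
#   blob = encodecaps(caps)    # -> 'HG20\nchangegroup=01,02'
#   decodecaps(blob) == caps   # -> True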
475 class bundle20(object):
477 class bundle20(object):
476 """represent an outgoing bundle2 container
478 """represent an outgoing bundle2 container
477
479
478 Use the `addparam` method to add stream level parameter. and `newpart` to
480 Use the `addparam` method to add stream level parameter. and `newpart` to
479 populate it. Then call `getchunks` to retrieve all the binary chunks of
481 populate it. Then call `getchunks` to retrieve all the binary chunks of
480 data that compose the bundle2 container."""
482 data that compose the bundle2 container."""
481
483
482 _magicstring = 'HG20'
484 _magicstring = 'HG20'
483
485
484 def __init__(self, ui, capabilities=()):
486 def __init__(self, ui, capabilities=()):
485 self.ui = ui
487 self.ui = ui
486 self._params = []
488 self._params = []
487 self._parts = []
489 self._parts = []
488 self.capabilities = dict(capabilities)
490 self.capabilities = dict(capabilities)
489 self._compressor = util.compressors[None]()
491 self._compressor = util.compressors[None]()
490
492
491 def setcompression(self, alg):
493 def setcompression(self, alg):
492 """setup core part compression to <alg>"""
494 """setup core part compression to <alg>"""
493 if alg is None:
495 if alg is None:
494 return
496 return
495 assert not any(n.lower() == 'compression' for n, v in self._params)
497 assert not any(n.lower() == 'compression' for n, v in self._params)
496 self.addparam('Compression', alg)
498 self.addparam('Compression', alg)
497 self._compressor = util.compressors[alg]()
499 self._compressor = util.compressors[alg]()
498
500
499 @property
501 @property
500 def nbparts(self):
502 def nbparts(self):
501 """total number of parts added to the bundler"""
503 """total number of parts added to the bundler"""
502 return len(self._parts)
504 return len(self._parts)
503
505
504 # methods used to define the bundle2 content
506 # methods used to define the bundle2 content
505 def addparam(self, name, value=None):
507 def addparam(self, name, value=None):
506 """add a stream level parameter"""
508 """add a stream level parameter"""
507 if not name:
509 if not name:
508 raise ValueError('empty parameter name')
510 raise ValueError('empty parameter name')
509 if name[0] not in string.letters:
511 if name[0] not in string.letters:
510 raise ValueError('non letter first character: %r' % name)
512 raise ValueError('non letter first character: %r' % name)
511 self._params.append((name, value))
513 self._params.append((name, value))
512
514
513 def addpart(self, part):
515 def addpart(self, part):
514 """add a new part to the bundle2 container
516 """add a new part to the bundle2 container
515
517
516 Parts contain the actual application level payload.
518 Parts contain the actual application level payload.
517 assert part.id is None
519 assert part.id is None
518 part.id = len(self._parts) # very cheap counter
520 part.id = len(self._parts) # very cheap counter
519 self._parts.append(part)
521 self._parts.append(part)
520
522
521 def newpart(self, typeid, *args, **kwargs):
523 def newpart(self, typeid, *args, **kwargs):
522 """create a new part and add it to the containers
524 """create a new part and add it to the containers
523
525
524 The part is directly added to the containers. For now, this means
526 The part is directly added to the containers. For now, this means
525 that any failure to properly initialize the part after calling
527 that any failure to properly initialize the part after calling
526 ``newpart`` should result in a failure of the whole bundling process.
528 ``newpart`` should result in a failure of the whole bundling process.
527
529
528 You can still fall back to manually create and add if you need better
530 You can still fall back to manually create and add if you need better
529 control."""
531 control."""
530 part = bundlepart(typeid, *args, **kwargs)
532 part = bundlepart(typeid, *args, **kwargs)
531 self.addpart(part)
533 self.addpart(part)
532 return part
534 return part
533
535
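# Illustrative use of the bundler (not part of this changeset), given a ``ui``
# object: build a bundle with a single advisory part and serialize it:
#
#   bundler = bundle20(ui)
#   bundler.newpart('output', data='hello', mandatory=False)
#   blob = ''.join(bundler.getchunks())    # 'HG20', parameters, parts, marker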
534 # methods used to generate the bundle2 stream
536 # methods used to generate the bundle2 stream
535 def getchunks(self):
537 def getchunks(self):
536 if self.ui.debugflag:
538 if self.ui.debugflag:
537 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
539 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
538 if self._params:
540 if self._params:
539 msg.append(' (%i params)' % len(self._params))
541 msg.append(' (%i params)' % len(self._params))
540 msg.append(' %i parts total\n' % len(self._parts))
542 msg.append(' %i parts total\n' % len(self._parts))
541 self.ui.debug(''.join(msg))
543 self.ui.debug(''.join(msg))
542 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
544 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
543 yield self._magicstring
545 yield self._magicstring
544 param = self._paramchunk()
546 param = self._paramchunk()
545 outdebug(self.ui, 'bundle parameter: %s' % param)
547 outdebug(self.ui, 'bundle parameter: %s' % param)
546 yield _pack(_fstreamparamsize, len(param))
548 yield _pack(_fstreamparamsize, len(param))
547 if param:
549 if param:
548 yield param
550 yield param
549 # starting compression
551 # starting compression
550 for chunk in self._getcorechunk():
552 for chunk in self._getcorechunk():
551 yield self._compressor.compress(chunk)
553 yield self._compressor.compress(chunk)
552 yield self._compressor.flush()
554 yield self._compressor.flush()
553
555
554 def _paramchunk(self):
556 def _paramchunk(self):
555 """return an encoded version of all stream parameters"""
557 """return an encoded version of all stream parameters"""
556 blocks = []
558 blocks = []
557 for par, value in self._params:
559 for par, value in self._params:
558 par = urllib.quote(par)
560 par = urllib.quote(par)
559 if value is not None:
561 if value is not None:
560 value = urllib.quote(value)
562 value = urllib.quote(value)
561 par = '%s=%s' % (par, value)
563 par = '%s=%s' % (par, value)
562 blocks.append(par)
564 blocks.append(par)
563 return ' '.join(blocks)
565 return ' '.join(blocks)
564
566
565 def _getcorechunk(self):
567 def _getcorechunk(self):
566 """yield chunks for the core part of the bundle
568 """yield chunks for the core part of the bundle
567
569
568 (all but headers and parameters)"""
570 (all but headers and parameters)"""
569 outdebug(self.ui, 'start of parts')
571 outdebug(self.ui, 'start of parts')
570 for part in self._parts:
572 for part in self._parts:
571 outdebug(self.ui, 'bundle part: "%s"' % part.type)
573 outdebug(self.ui, 'bundle part: "%s"' % part.type)
572 for chunk in part.getchunks(ui=self.ui):
574 for chunk in part.getchunks(ui=self.ui):
573 yield chunk
575 yield chunk
574 outdebug(self.ui, 'end of bundle')
576 outdebug(self.ui, 'end of bundle')
575 yield _pack(_fpartheadersize, 0)
577 yield _pack(_fpartheadersize, 0)
576
578
577
579
578 def salvageoutput(self):
580 def salvageoutput(self):
579 """return a list with a copy of all output parts in the bundle
581 """return a list with a copy of all output parts in the bundle
580
582
581 This is meant to be used during error handling to make sure we preserve
583 This is meant to be used during error handling to make sure we preserve
582 server output"""
584 server output"""
583 salvaged = []
585 salvaged = []
584 for part in self._parts:
586 for part in self._parts:
585 if part.type.startswith('output'):
587 if part.type.startswith('output'):
586 salvaged.append(part.copy())
588 salvaged.append(part.copy())
587 return salvaged
589 return salvaged
588
590
589
591
590 class unpackermixin(object):
592 class unpackermixin(object):
591 """A mixin to extract bytes and struct data from a stream"""
593 """A mixin to extract bytes and struct data from a stream"""
592
594
593 def __init__(self, fp):
595 def __init__(self, fp):
594 self._fp = fp
596 self._fp = fp
595 self._seekable = (util.safehasattr(fp, 'seek') and
597 self._seekable = (util.safehasattr(fp, 'seek') and
596 util.safehasattr(fp, 'tell'))
598 util.safehasattr(fp, 'tell'))
597
599
598 def _unpack(self, format):
600 def _unpack(self, format):
599 """unpack this struct format from the stream"""
601 """unpack this struct format from the stream"""
600 data = self._readexact(struct.calcsize(format))
602 data = self._readexact(struct.calcsize(format))
601 return _unpack(format, data)
603 return _unpack(format, data)
602
604
603 def _readexact(self, size):
605 def _readexact(self, size):
604 """read exactly <size> bytes from the stream"""
606 """read exactly <size> bytes from the stream"""
605 return changegroup.readexactly(self._fp, size)
607 return changegroup.readexactly(self._fp, size)
606
608
607 def seek(self, offset, whence=0):
609 def seek(self, offset, whence=0):
608 """move the underlying file pointer"""
610 """move the underlying file pointer"""
609 if self._seekable:
611 if self._seekable:
610 return self._fp.seek(offset, whence)
612 return self._fp.seek(offset, whence)
611 else:
613 else:
612 raise NotImplementedError(_('File pointer is not seekable'))
614 raise NotImplementedError(_('File pointer is not seekable'))
613
615
614 def tell(self):
616 def tell(self):
615 """return the file offset, or None if file is not seekable"""
617 """return the file offset, or None if file is not seekable"""
616 if self._seekable:
618 if self._seekable:
617 try:
619 try:
618 return self._fp.tell()
620 return self._fp.tell()
619 except IOError as e:
621 except IOError as e:
620 if e.errno == errno.ESPIPE:
622 if e.errno == errno.ESPIPE:
621 self._seekable = False
623 self._seekable = False
622 else:
624 else:
623 raise
625 raise
624 return None
626 return None
625
627
626 def close(self):
628 def close(self):
627 """close underlying file"""
629 """close underlying file"""
628 if util.safehasattr(self._fp, 'close'):
630 if util.safehasattr(self._fp, 'close'):
629 return self._fp.close()
631 return self._fp.close()
630
632
631 def getunbundler(ui, fp, magicstring=None):
633 def getunbundler(ui, fp, magicstring=None):
632 """return a valid unbundler object for a given magicstring"""
634 """return a valid unbundler object for a given magicstring"""
633 if magicstring is None:
635 if magicstring is None:
634 magicstring = changegroup.readexactly(fp, 4)
636 magicstring = changegroup.readexactly(fp, 4)
635 magic, version = magicstring[0:2], magicstring[2:4]
637 magic, version = magicstring[0:2], magicstring[2:4]
636 if magic != 'HG':
638 if magic != 'HG':
637 raise error.Abort(_('not a Mercurial bundle'))
639 raise error.Abort(_('not a Mercurial bundle'))
638 unbundlerclass = formatmap.get(version)
640 unbundlerclass = formatmap.get(version)
639 if unbundlerclass is None:
641 if unbundlerclass is None:
640 raise error.Abort(_('unknown bundle version %s') % version)
642 raise error.Abort(_('unknown bundle version %s') % version)
641 unbundler = unbundlerclass(ui, fp)
643 unbundler = unbundlerclass(ui, fp)
642 indebug(ui, 'start processing of %s stream' % magicstring)
644 indebug(ui, 'start processing of %s stream' % magicstring)
643 return unbundler
645 return unbundler
644
646
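# Illustrative usage (not part of this changeset), given a ``ui`` object: read
# a bundle2 file from disk and walk its parts:
#
#   fp = open('bundle.hg2', 'rb')          # hypothetical on-disk bundle
#   unbundler = getunbundler(ui, fp)       # reads and checks the magic string
#   for part in unbundler.iterparts():
#       ui.write('%s\n' % part.type)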
645 class unbundle20(unpackermixin):
647 class unbundle20(unpackermixin):
646 """interpret a bundle2 stream
648 """interpret a bundle2 stream
647
649
648 This class is fed with a binary stream and yields parts through its
650 This class is fed with a binary stream and yields parts through its
649 `iterparts` methods."""
651 `iterparts` methods."""
650
652
651 _magicstring = 'HG20'
653 _magicstring = 'HG20'
652
654
653 def __init__(self, ui, fp):
655 def __init__(self, ui, fp):
654 """If header is specified, we do not read it out of the stream."""
656 """If header is specified, we do not read it out of the stream."""
655 self.ui = ui
657 self.ui = ui
656 self._decompressor = util.decompressors[None]
658 self._decompressor = util.decompressors[None]
657 super(unbundle20, self).__init__(fp)
659 super(unbundle20, self).__init__(fp)
658
660
659 @util.propertycache
661 @util.propertycache
660 def params(self):
662 def params(self):
661 """dictionary of stream level parameters"""
663 """dictionary of stream level parameters"""
662 indebug(self.ui, 'reading bundle2 stream parameters')
664 indebug(self.ui, 'reading bundle2 stream parameters')
663 params = {}
665 params = {}
664 paramssize = self._unpack(_fstreamparamsize)[0]
666 paramssize = self._unpack(_fstreamparamsize)[0]
665 if paramssize < 0:
667 if paramssize < 0:
666 raise error.BundleValueError('negative bundle param size: %i'
668 raise error.BundleValueError('negative bundle param size: %i'
667 % paramssize)
669 % paramssize)
668 if paramssize:
670 if paramssize:
669 params = self._readexact(paramssize)
671 params = self._readexact(paramssize)
670 params = self._processallparams(params)
672 params = self._processallparams(params)
671 return params
673 return params
672
674
673 def _processallparams(self, paramsblock):
675 def _processallparams(self, paramsblock):
674 """process a space separated blob of parameters, returning them as a dict"""
676 """process a space separated blob of parameters, returning them as a dict"""
675 params = {}
677 params = {}
676 for p in paramsblock.split(' '):
678 for p in paramsblock.split(' '):
677 p = p.split('=', 1)
679 p = p.split('=', 1)
678 p = [urllib.unquote(i) for i in p]
680 p = [urllib.unquote(i) for i in p]
679 if len(p) < 2:
681 if len(p) < 2:
680 p.append(None)
682 p.append(None)
681 self._processparam(*p)
683 self._processparam(*p)
682 params[p[0]] = p[1]
684 params[p[0]] = p[1]
683 return params
685 return params
684
686
685
687
686 def _processparam(self, name, value):
688 def _processparam(self, name, value):
687 """process a parameter, applying its effect if needed
689 """process a parameter, applying its effect if needed
688
690
689 Parameters starting with a lower case letter are advisory and will be
691 Parameters starting with a lower case letter are advisory and will be
690 ignored when unknown. Those starting with an upper case letter are
692 ignored when unknown. Those starting with an upper case letter are
691 mandatory and this function will raise a KeyError when unknown.
693 mandatory and this function will raise a KeyError when unknown.
692
694
693 Note: no options are currently supported. Any input will either be
695 Note: no options are currently supported. Any input will either be
694 ignored or fail.
696 ignored or fail.
695 """
697 """
696 if not name:
698 if not name:
697 raise ValueError('empty parameter name')
699 raise ValueError('empty parameter name')
698 if name[0] not in string.letters:
700 if name[0] not in string.letters:
699 raise ValueError('non letter first character: %r' % name)
701 raise ValueError('non letter first character: %r' % name)
700 try:
702 try:
701 handler = b2streamparamsmap[name.lower()]
703 handler = b2streamparamsmap[name.lower()]
702 except KeyError:
704 except KeyError:
703 if name[0].islower():
705 if name[0].islower():
704 indebug(self.ui, "ignoring unknown parameter %r" % name)
706 indebug(self.ui, "ignoring unknown parameter %r" % name)
705 else:
707 else:
706 raise error.BundleUnknownFeatureError(params=(name,))
708 raise error.BundleUnknownFeatureError(params=(name,))
707 else:
709 else:
708 handler(self, name, value)
710 handler(self, name, value)
709
711
710 def _forwardchunks(self):
712 def _forwardchunks(self):
711 """utility to transfer a bundle2 as binary
713 """utility to transfer a bundle2 as binary
712
714
713 This is made necessary by the fact that the 'getbundle' command over 'ssh'
715 This is made necessary by the fact that the 'getbundle' command over 'ssh'
714 has no way to know when the reply ends, relying on the bundle to be
716 has no way to know when the reply ends, relying on the bundle to be
715 interpreted to know its end. This is terrible and we are sorry, but we
717 interpreted to know its end. This is terrible and we are sorry, but we
716 needed to move forward to get general delta enabled.
718 needed to move forward to get general delta enabled.
717 """
719 """
718 yield self._magicstring
720 yield self._magicstring
719 assert 'params' not in vars(self)
721 assert 'params' not in vars(self)
720 paramssize = self._unpack(_fstreamparamsize)[0]
722 paramssize = self._unpack(_fstreamparamsize)[0]
721 if paramssize < 0:
723 if paramssize < 0:
722 raise error.BundleValueError('negative bundle param size: %i'
724 raise error.BundleValueError('negative bundle param size: %i'
723 % paramssize)
725 % paramssize)
724 yield _pack(_fstreamparamsize, paramssize)
726 yield _pack(_fstreamparamsize, paramssize)
725 if paramssize:
727 if paramssize:
726 params = self._readexact(paramssize)
728 params = self._readexact(paramssize)
727 self._processallparams(params)
729 self._processallparams(params)
728 yield params
730 yield params
729 assert self._decompressor is util.decompressors[None]
731 assert self._decompressor is util.decompressors[None]
730 # From there, payload might need to be decompressed
732 # From there, payload might need to be decompressed
731 self._fp = self._decompressor(self._fp)
733 self._fp = self._decompressor(self._fp)
732 emptycount = 0
734 emptycount = 0
733 while emptycount < 2:
735 while emptycount < 2:
734 # so we can brainlessly loop
736 # so we can brainlessly loop
735 assert _fpartheadersize == _fpayloadsize
737 assert _fpartheadersize == _fpayloadsize
736 size = self._unpack(_fpartheadersize)[0]
738 size = self._unpack(_fpartheadersize)[0]
737 yield _pack(_fpartheadersize, size)
739 yield _pack(_fpartheadersize, size)
738 if size:
740 if size:
739 emptycount = 0
741 emptycount = 0
740 else:
742 else:
741 emptycount += 1
743 emptycount += 1
742 continue
744 continue
743 if size == flaginterrupt:
745 if size == flaginterrupt:
744 continue
746 continue
745 elif size < 0:
747 elif size < 0:
746 raise error.BundleValueError('negative chunk size: %i')
748 raise error.BundleValueError('negative chunk size: %i')
747 yield self._readexact(size)
749 yield self._readexact(size)
748
750
749
751
750 def iterparts(self):
752 def iterparts(self):
751 """yield all parts contained in the stream"""
753 """yield all parts contained in the stream"""
752 # make sure params have been loaded
754 # make sure params have been loaded
753 self.params
755 self.params
754 # From there, payload need to be decompressed
756 # From there, payload need to be decompressed
755 self._fp = self._decompressor(self._fp)
757 self._fp = self._decompressor(self._fp)
756 indebug(self.ui, 'start extraction of bundle2 parts')
758 indebug(self.ui, 'start extraction of bundle2 parts')
757 headerblock = self._readpartheader()
759 headerblock = self._readpartheader()
758 while headerblock is not None:
760 while headerblock is not None:
759 part = unbundlepart(self.ui, headerblock, self._fp)
761 part = unbundlepart(self.ui, headerblock, self._fp)
760 yield part
762 yield part
761 part.seek(0, 2)
763 part.seek(0, 2)
762 headerblock = self._readpartheader()
764 headerblock = self._readpartheader()
763 indebug(self.ui, 'end of bundle2 stream')
765 indebug(self.ui, 'end of bundle2 stream')
764
766
765 def _readpartheader(self):
767 def _readpartheader(self):
766 """reads a part header size and return the bytes blob
768 """reads a part header size and return the bytes blob
767
769
768 returns None if empty"""
770 returns None if empty"""
769 headersize = self._unpack(_fpartheadersize)[0]
771 headersize = self._unpack(_fpartheadersize)[0]
770 if headersize < 0:
772 if headersize < 0:
771 raise error.BundleValueError('negative part header size: %i'
773 raise error.BundleValueError('negative part header size: %i'
772 % headersize)
774 % headersize)
773 indebug(self.ui, 'part header size: %i' % headersize)
775 indebug(self.ui, 'part header size: %i' % headersize)
774 if headersize:
776 if headersize:
775 return self._readexact(headersize)
777 return self._readexact(headersize)
776 return None
778 return None
777
779
778 def compressed(self):
780 def compressed(self):
779 return False
781 return False
780
782
781 formatmap = {'20': unbundle20}
783 formatmap = {'20': unbundle20}
782
784
783 b2streamparamsmap = {}
785 b2streamparamsmap = {}
784
786
785 def b2streamparamhandler(name):
787 def b2streamparamhandler(name):
786 """register a handler for a stream level parameter"""
788 """register a handler for a stream level parameter"""
787 def decorator(func):
789 def decorator(func):
788 assert name not in formatmap
790 assert name not in formatmap
789 b2streamparamsmap[name] = func
791 b2streamparamsmap[name] = func
790 return func
792 return func
791 return decorator
793 return decorator
792
794
793 @b2streamparamhandler('compression')
795 @b2streamparamhandler('compression')
794 def processcompression(unbundler, param, value):
796 def processcompression(unbundler, param, value):
795 """read compression parameter and install payload decompression"""
797 """read compression parameter and install payload decompression"""
796 if value not in util.decompressors:
798 if value not in util.decompressors:
797 raise error.BundleUnknownFeatureError(params=(param,),
799 raise error.BundleUnknownFeatureError(params=(param,),
798 values=(value,))
800 values=(value,))
799 unbundler._decompressor = util.decompressors[value]
801 unbundler._decompressor = util.decompressors[value]
800
802
801 class bundlepart(object):
803 class bundlepart(object):
802 """A bundle2 part contains application level payload
804 """A bundle2 part contains application level payload
803
805
804 The part `type` is used to route the part to the application level
806 The part `type` is used to route the part to the application level
805 handler.
807 handler.
806
808
807 The part payload is contained in ``part.data``. It could be raw bytes or a
809 The part payload is contained in ``part.data``. It could be raw bytes or a
808 generator of byte chunks.
810 generator of byte chunks.
809
811
810 You can add parameters to the part using the ``addparam`` method.
812 You can add parameters to the part using the ``addparam`` method.
811 Parameters can be either mandatory (default) or advisory. Remote side
813 Parameters can be either mandatory (default) or advisory. Remote side
812 should be able to safely ignore the advisory ones.
814 should be able to safely ignore the advisory ones.
813
815
814 Both data and parameters cannot be modified after the generation has begun.
816 Both data and parameters cannot be modified after the generation has begun.
815 """
817 """
816
818
817 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
819 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
818 data='', mandatory=True):
820 data='', mandatory=True):
819 validateparttype(parttype)
821 validateparttype(parttype)
820 self.id = None
822 self.id = None
821 self.type = parttype
823 self.type = parttype
822 self._data = data
824 self._data = data
823 self._mandatoryparams = list(mandatoryparams)
825 self._mandatoryparams = list(mandatoryparams)
824 self._advisoryparams = list(advisoryparams)
826 self._advisoryparams = list(advisoryparams)
825 # checking for duplicated entries
827 # checking for duplicated entries
826 self._seenparams = set()
828 self._seenparams = set()
827 for pname, __ in self._mandatoryparams + self._advisoryparams:
829 for pname, __ in self._mandatoryparams + self._advisoryparams:
828 if pname in self._seenparams:
830 if pname in self._seenparams:
829 raise RuntimeError('duplicated params: %s' % pname)
831 raise RuntimeError('duplicated params: %s' % pname)
830 self._seenparams.add(pname)
832 self._seenparams.add(pname)
831 # status of the part's generation:
833 # status of the part's generation:
832 # - None: not started,
834 # - None: not started,
833 # - False: currently generated,
835 # - False: currently generated,
834 # - True: generation done.
836 # - True: generation done.
835 self._generated = None
837 self._generated = None
836 self.mandatory = mandatory
838 self.mandatory = mandatory
837
839
838 def copy(self):
840 def copy(self):
839 """return a copy of the part
841 """return a copy of the part
840
842
841 The new part have the very same content but no partid assigned yet.
843 The new part have the very same content but no partid assigned yet.
842 Parts with generated data cannot be copied."""
844 Parts with generated data cannot be copied."""
843 assert not util.safehasattr(self.data, 'next')
845 assert not util.safehasattr(self.data, 'next')
844 return self.__class__(self.type, self._mandatoryparams,
846 return self.__class__(self.type, self._mandatoryparams,
845 self._advisoryparams, self._data, self.mandatory)
847 self._advisoryparams, self._data, self.mandatory)
846
848
847 # methods used to define the part content
849 # methods used to define the part content
848 def __setdata(self, data):
850 def __setdata(self, data):
849 if self._generated is not None:
851 if self._generated is not None:
850 raise error.ReadOnlyPartError('part is being generated')
852 raise error.ReadOnlyPartError('part is being generated')
851 self._data = data
853 self._data = data
852 def __getdata(self):
854 def __getdata(self):
853 return self._data
855 return self._data
854 data = property(__getdata, __setdata)
856 data = property(__getdata, __setdata)
855
857
856 @property
858 @property
857 def mandatoryparams(self):
859 def mandatoryparams(self):
858 # make it an immutable tuple to force people through ``addparam``
860 # make it an immutable tuple to force people through ``addparam``
859 return tuple(self._mandatoryparams)
861 return tuple(self._mandatoryparams)
860
862
861 @property
863 @property
862 def advisoryparams(self):
864 def advisoryparams(self):
863 # make it an immutable tuple to force people through ``addparam``
865 # make it an immutable tuple to force people through ``addparam``
864 return tuple(self._advisoryparams)
866 return tuple(self._advisoryparams)
865
867
866 def addparam(self, name, value='', mandatory=True):
868 def addparam(self, name, value='', mandatory=True):
867 if self._generated is not None:
869 if self._generated is not None:
868 raise error.ReadOnlyPartError('part is being generated')
870 raise error.ReadOnlyPartError('part is being generated')
869 if name in self._seenparams:
871 if name in self._seenparams:
870 raise ValueError('duplicated params: %s' % name)
872 raise ValueError('duplicated params: %s' % name)
871 self._seenparams.add(name)
873 self._seenparams.add(name)
872 params = self._advisoryparams
874 params = self._advisoryparams
873 if mandatory:
875 if mandatory:
874 params = self._mandatoryparams
876 params = self._mandatoryparams
875 params.append((name, value))
877 params.append((name, value))
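
    # Illustrative sketch (not part of the original module): a part is usually
    # obtained from a bundler and then decorated with parameters; ``bundler``
    # is assumed to be a bundle20 instance created elsewhere.
    #
    #     part = bundler.newpart('output')
    #     part.addparam('in-reply-to', '4', mandatory=False)
    #
    # Parameters added with mandatory=True must be understood by the receiving
    # side; advisory ones may be silently ignored.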

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True
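
    # Sketch of the wire layout produced by getchunks above (field widths come
    # from the _f* struct formats defined earlier in this module):
    #
    #     _fpartheadersize  size of the header chunk that follows
    #     header chunk      _fparttypesize, the part type (upper case if
    #                       mandatory, lower case if advisory), _fpartid,
    #                       _fpartparamcount, the parameter key/value sizes,
    #                       then the raw keys and values
    #     payload           a sequence of (_fpayloadsize, chunk) pairs,
    #                       terminated by a zero-sized chunk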

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data
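
    # A hedged usage note: ``data`` may be a plain string or any iterator of
    # chunks (anything exposing ``next``), for example:
    #
    #     def _gen():
    #         yield 'first chunk'
    #         yield 'second chunk'
    #     part = bundlepart('output', data=_gen())
    #
    # ``_gen`` is invented for illustration; streamed data can only be
    # consumed once, hence the checks in copy() and getchunks().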


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')
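
# Wire-level summary of the interruption protocol handled above: while a part
# payload is streamed, the sender may emit a chunk size equal to
# ``flaginterrupt`` (-1) followed by a complete out-of-band part (typically
# ``error:abort``). The reader hands that part to ``interrupthandler``, which
# processes it with the restricted ``interruptoperation`` below, then resumes
# reading the interrupted payload.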

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')
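
    # Illustrative content of ``_chunkindex`` after two chunks have been read
    # (offsets invented for the example): [(0, 158), (4096, 4262), (8192, 8366)]
    # i.e. payload offset 4096 resumes at file offset 4262, which lets seek()
    # below jump back to an already-read chunk without re-parsing the stream.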

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos
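
    # Hedged usage sketch of the file-like API above (``part`` standing for an
    # unbundlepart instance obtained while iterating an unbundle20):
    #
    #     header = part.read(20)   # first 20 bytes of the payload
    #     part.seek(0)             # rewind through the chunk index
    #     rest = part.read()       # consume the remaining payload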

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps
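
# Illustrative (non-exhaustive) shape of the mapping returned by getrepocaps;
# exact values depend on the repository and the extensions it enables:
#
#     {'HG20': (), 'changegroup': ('01', '02'),
#      'digests': ('md5', 'sha1', ...), 'listkeys': (), 'pushkey': (),
#      'error': ('abort', 'unsupportedcontent', 'pushraced', 'pushkey'),
#      'remote-changegroup': ('http', 'https'), 'hgtagsfnodes': ()}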

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
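
# For example, obsmarkersversion({'obsmarkers': ('V0', 'V1')}) returns [0, 1].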

@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

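# The handlers in the rest of this module are all registered through the
# ``parthandler`` decorator defined earlier. A hedged sketch of adding a new
# handler from an extension (the part name and parameter are invented):
#
#     @parthandler('myext:example', ('someparam',))
#     def handleexample(op, inpart):
#         value = inpart.params['someparam']
#         op.records.add('example', {'value': value})
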
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
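
# Hedged example of the parameters a 'remote-changegroup' part handled above
# might carry (all values invented for illustration):
#
#     url          = 'https://hg.example.org/bundles/abc.hg'
#     size         = '581120'
#     digests      = 'sha1'
#     digest:sha1  = '<40 hexadecimal characters>'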

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
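
# The payload of the 'check:heads' part handled above is simply the
# concatenation of the 20-byte binary node ids of the heads the client saw,
# so a repository with two heads yields a 40-byte payload.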

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing the
    # pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
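
# Hedged example of the parameters of a 'pushkey' part pushing a bookmark
# (values invented; they are decoded with ``pushkey.decode`` above):
#
#     namespace = 'bookmarks'
#     key       = 'feature-branch'
#     old       = ''
#     new       = '<40 hexadecimal characters of the target node>'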

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)