##// END OF EJS Templates
bundle2: use absolute_import
Gregory Szorc -
r25919:8221fefa default
parent child Browse files
Show More
@@ -1,1410 +1,1416 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import
149
148 import errno
150 import errno
149 import sys
151 import re
150 import util
151 import struct
152 import urllib
153 import string
152 import string
154 import obsolete
153 import struct
155 import pushkey
154 import sys
156 import url
155 import urllib
157 import re
158
156
159 import changegroup, error, tags
157 from .i18n import _
160 from i18n import _
158 from . import (
159 changegroup,
160 error,
161 obsolete,
162 pushkey,
163 tags,
164 url,
165 util,
166 )
161
167
162 _pack = struct.pack
168 _pack = struct.pack
163 _unpack = struct.unpack
169 _unpack = struct.unpack
164
170
165 _fstreamparamsize = '>i'
171 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
172 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
173 _fparttypesize = '>B'
168 _fpartid = '>I'
174 _fpartid = '>I'
169 _fpayloadsize = '>i'
175 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
176 _fpartparamcount = '>BB'
171
177
172 preferedchunksize = 4096
178 preferedchunksize = 4096
173
179
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
181
176 def outdebug(ui, message):
182 def outdebug(ui, message):
177 """debug regarding output stream (bundling)"""
183 """debug regarding output stream (bundling)"""
178 if ui.configbool('devel', 'bundle2.debug', False):
184 if ui.configbool('devel', 'bundle2.debug', False):
179 ui.debug('bundle2-output: %s\n' % message)
185 ui.debug('bundle2-output: %s\n' % message)
180
186
181 def indebug(ui, message):
187 def indebug(ui, message):
182 """debug on input stream (unbundling)"""
188 """debug on input stream (unbundling)"""
183 if ui.configbool('devel', 'bundle2.debug', False):
189 if ui.configbool('devel', 'bundle2.debug', False):
184 ui.debug('bundle2-input: %s\n' % message)
190 ui.debug('bundle2-input: %s\n' % message)
185
191
186 def validateparttype(parttype):
192 def validateparttype(parttype):
187 """raise ValueError if a parttype contains invalid character"""
193 """raise ValueError if a parttype contains invalid character"""
188 if _parttypeforbidden.search(parttype):
194 if _parttypeforbidden.search(parttype):
189 raise ValueError(parttype)
195 raise ValueError(parttype)
190
196
191 def _makefpartparamsizes(nbparams):
197 def _makefpartparamsizes(nbparams):
192 """return a struct format to read part parameter sizes
198 """return a struct format to read part parameter sizes
193
199
194 The number parameters is variable so we need to build that format
200 The number parameters is variable so we need to build that format
195 dynamically.
201 dynamically.
196 """
202 """
197 return '>'+('BB'*nbparams)
203 return '>'+('BB'*nbparams)
198
204
199 parthandlermapping = {}
205 parthandlermapping = {}
200
206
201 def parthandler(parttype, params=()):
207 def parthandler(parttype, params=()):
202 """decorator that register a function as a bundle2 part handler
208 """decorator that register a function as a bundle2 part handler
203
209
204 eg::
210 eg::
205
211
206 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
207 def myparttypehandler(...):
213 def myparttypehandler(...):
208 '''process a part of type "my part".'''
214 '''process a part of type "my part".'''
209 ...
215 ...
210 """
216 """
211 validateparttype(parttype)
217 validateparttype(parttype)
212 def _decorator(func):
218 def _decorator(func):
213 lparttype = parttype.lower() # enforce lower case matching.
219 lparttype = parttype.lower() # enforce lower case matching.
214 assert lparttype not in parthandlermapping
220 assert lparttype not in parthandlermapping
215 parthandlermapping[lparttype] = func
221 parthandlermapping[lparttype] = func
216 func.params = frozenset(params)
222 func.params = frozenset(params)
217 return func
223 return func
218 return _decorator
224 return _decorator
219
225
220 class unbundlerecords(object):
226 class unbundlerecords(object):
221 """keep record of what happens during and unbundle
227 """keep record of what happens during and unbundle
222
228
223 New records are added using `records.add('cat', obj)`. Where 'cat' is a
229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
224 category of record and obj is an arbitrary object.
230 category of record and obj is an arbitrary object.
225
231
226 `records['cat']` will return all entries of this category 'cat'.
232 `records['cat']` will return all entries of this category 'cat'.
227
233
228 Iterating on the object itself will yield `('category', obj)` tuples
234 Iterating on the object itself will yield `('category', obj)` tuples
229 for all entries.
235 for all entries.
230
236
231 All iterations happens in chronological order.
237 All iterations happens in chronological order.
232 """
238 """
233
239
234 def __init__(self):
240 def __init__(self):
235 self._categories = {}
241 self._categories = {}
236 self._sequences = []
242 self._sequences = []
237 self._replies = {}
243 self._replies = {}
238
244
239 def add(self, category, entry, inreplyto=None):
245 def add(self, category, entry, inreplyto=None):
240 """add a new record of a given category.
246 """add a new record of a given category.
241
247
242 The entry can then be retrieved in the list returned by
248 The entry can then be retrieved in the list returned by
243 self['category']."""
249 self['category']."""
244 self._categories.setdefault(category, []).append(entry)
250 self._categories.setdefault(category, []).append(entry)
245 self._sequences.append((category, entry))
251 self._sequences.append((category, entry))
246 if inreplyto is not None:
252 if inreplyto is not None:
247 self.getreplies(inreplyto).add(category, entry)
253 self.getreplies(inreplyto).add(category, entry)
248
254
249 def getreplies(self, partid):
255 def getreplies(self, partid):
250 """get the records that are replies to a specific part"""
256 """get the records that are replies to a specific part"""
251 return self._replies.setdefault(partid, unbundlerecords())
257 return self._replies.setdefault(partid, unbundlerecords())
252
258
253 def __getitem__(self, cat):
259 def __getitem__(self, cat):
254 return tuple(self._categories.get(cat, ()))
260 return tuple(self._categories.get(cat, ()))
255
261
256 def __iter__(self):
262 def __iter__(self):
257 return iter(self._sequences)
263 return iter(self._sequences)
258
264
259 def __len__(self):
265 def __len__(self):
260 return len(self._sequences)
266 return len(self._sequences)
261
267
262 def __nonzero__(self):
268 def __nonzero__(self):
263 return bool(self._sequences)
269 return bool(self._sequences)
264
270
265 class bundleoperation(object):
271 class bundleoperation(object):
266 """an object that represents a single bundling process
272 """an object that represents a single bundling process
267
273
268 Its purpose is to carry unbundle-related objects and states.
274 Its purpose is to carry unbundle-related objects and states.
269
275
270 A new object should be created at the beginning of each bundle processing.
276 A new object should be created at the beginning of each bundle processing.
271 The object is to be returned by the processing function.
277 The object is to be returned by the processing function.
272
278
273 The object has very little content now it will ultimately contain:
279 The object has very little content now it will ultimately contain:
274 * an access to the repo the bundle is applied to,
280 * an access to the repo the bundle is applied to,
275 * a ui object,
281 * a ui object,
276 * a way to retrieve a transaction to add changes to the repo,
282 * a way to retrieve a transaction to add changes to the repo,
277 * a way to record the result of processing each part,
283 * a way to record the result of processing each part,
278 * a way to construct a bundle response when applicable.
284 * a way to construct a bundle response when applicable.
279 """
285 """
280
286
281 def __init__(self, repo, transactiongetter, captureoutput=True):
287 def __init__(self, repo, transactiongetter, captureoutput=True):
282 self.repo = repo
288 self.repo = repo
283 self.ui = repo.ui
289 self.ui = repo.ui
284 self.records = unbundlerecords()
290 self.records = unbundlerecords()
285 self.gettransaction = transactiongetter
291 self.gettransaction = transactiongetter
286 self.reply = None
292 self.reply = None
287 self.captureoutput = captureoutput
293 self.captureoutput = captureoutput
288
294
289 class TransactionUnavailable(RuntimeError):
295 class TransactionUnavailable(RuntimeError):
290 pass
296 pass
291
297
292 def _notransaction():
298 def _notransaction():
293 """default method to get a transaction while processing a bundle
299 """default method to get a transaction while processing a bundle
294
300
295 Raise an exception to highlight the fact that no transaction was expected
301 Raise an exception to highlight the fact that no transaction was expected
296 to be created"""
302 to be created"""
297 raise TransactionUnavailable()
303 raise TransactionUnavailable()
298
304
299 def processbundle(repo, unbundler, transactiongetter=None, op=None):
305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
300 """This function process a bundle, apply effect to/from a repo
306 """This function process a bundle, apply effect to/from a repo
301
307
302 It iterates over each part then searches for and uses the proper handling
308 It iterates over each part then searches for and uses the proper handling
303 code to process the part. Parts are processed in order.
309 code to process the part. Parts are processed in order.
304
310
305 This is very early version of this function that will be strongly reworked
311 This is very early version of this function that will be strongly reworked
306 before final usage.
312 before final usage.
307
313
308 Unknown Mandatory part will abort the process.
314 Unknown Mandatory part will abort the process.
309
315
310 It is temporarily possible to provide a prebuilt bundleoperation to the
316 It is temporarily possible to provide a prebuilt bundleoperation to the
311 function. This is used to ensure output is properly propagated in case of
317 function. This is used to ensure output is properly propagated in case of
312 an error during the unbundling. This output capturing part will likely be
318 an error during the unbundling. This output capturing part will likely be
313 reworked and this ability will probably go away in the process.
319 reworked and this ability will probably go away in the process.
314 """
320 """
315 if op is None:
321 if op is None:
316 if transactiongetter is None:
322 if transactiongetter is None:
317 transactiongetter = _notransaction
323 transactiongetter = _notransaction
318 op = bundleoperation(repo, transactiongetter)
324 op = bundleoperation(repo, transactiongetter)
319 # todo:
325 # todo:
320 # - replace this is a init function soon.
326 # - replace this is a init function soon.
321 # - exception catching
327 # - exception catching
322 unbundler.params
328 unbundler.params
323 if repo.ui.debugflag:
329 if repo.ui.debugflag:
324 msg = ['bundle2-input-bundle:']
330 msg = ['bundle2-input-bundle:']
325 if unbundler.params:
331 if unbundler.params:
326 msg.append(' %i params')
332 msg.append(' %i params')
327 if op.gettransaction is None:
333 if op.gettransaction is None:
328 msg.append(' no-transaction')
334 msg.append(' no-transaction')
329 else:
335 else:
330 msg.append(' with-transaction')
336 msg.append(' with-transaction')
331 msg.append('\n')
337 msg.append('\n')
332 repo.ui.debug(''.join(msg))
338 repo.ui.debug(''.join(msg))
333 iterparts = enumerate(unbundler.iterparts())
339 iterparts = enumerate(unbundler.iterparts())
334 part = None
340 part = None
335 nbpart = 0
341 nbpart = 0
336 try:
342 try:
337 for nbpart, part in iterparts:
343 for nbpart, part in iterparts:
338 _processpart(op, part)
344 _processpart(op, part)
339 except BaseException as exc:
345 except BaseException as exc:
340 for nbpart, part in iterparts:
346 for nbpart, part in iterparts:
341 # consume the bundle content
347 # consume the bundle content
342 part.seek(0, 2)
348 part.seek(0, 2)
343 # Small hack to let caller code distinguish exceptions from bundle2
349 # Small hack to let caller code distinguish exceptions from bundle2
344 # processing from processing the old format. This is mostly
350 # processing from processing the old format. This is mostly
345 # needed to handle different return codes to unbundle according to the
351 # needed to handle different return codes to unbundle according to the
346 # type of bundle. We should probably clean up or drop this return code
352 # type of bundle. We should probably clean up or drop this return code
347 # craziness in a future version.
353 # craziness in a future version.
348 exc.duringunbundle2 = True
354 exc.duringunbundle2 = True
349 salvaged = []
355 salvaged = []
350 replycaps = None
356 replycaps = None
351 if op.reply is not None:
357 if op.reply is not None:
352 salvaged = op.reply.salvageoutput()
358 salvaged = op.reply.salvageoutput()
353 replycaps = op.reply.capabilities
359 replycaps = op.reply.capabilities
354 exc._replycaps = replycaps
360 exc._replycaps = replycaps
355 exc._bundle2salvagedoutput = salvaged
361 exc._bundle2salvagedoutput = salvaged
356 raise
362 raise
357 finally:
363 finally:
358 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
359
365
360 return op
366 return op
361
367
362 def _processpart(op, part):
368 def _processpart(op, part):
363 """process a single part from a bundle
369 """process a single part from a bundle
364
370
365 The part is guaranteed to have been fully consumed when the function exits
371 The part is guaranteed to have been fully consumed when the function exits
366 (even if an exception is raised)."""
372 (even if an exception is raised)."""
367 status = 'unknown' # used by debug output
373 status = 'unknown' # used by debug output
368 try:
374 try:
369 try:
375 try:
370 handler = parthandlermapping.get(part.type)
376 handler = parthandlermapping.get(part.type)
371 if handler is None:
377 if handler is None:
372 status = 'unsupported-type'
378 status = 'unsupported-type'
373 raise error.UnsupportedPartError(parttype=part.type)
379 raise error.UnsupportedPartError(parttype=part.type)
374 indebug(op.ui, 'found a handler for part %r' % part.type)
380 indebug(op.ui, 'found a handler for part %r' % part.type)
375 unknownparams = part.mandatorykeys - handler.params
381 unknownparams = part.mandatorykeys - handler.params
376 if unknownparams:
382 if unknownparams:
377 unknownparams = list(unknownparams)
383 unknownparams = list(unknownparams)
378 unknownparams.sort()
384 unknownparams.sort()
379 status = 'unsupported-params (%s)' % unknownparams
385 status = 'unsupported-params (%s)' % unknownparams
380 raise error.UnsupportedPartError(parttype=part.type,
386 raise error.UnsupportedPartError(parttype=part.type,
381 params=unknownparams)
387 params=unknownparams)
382 status = 'supported'
388 status = 'supported'
383 except error.UnsupportedPartError as exc:
389 except error.UnsupportedPartError as exc:
384 if part.mandatory: # mandatory parts
390 if part.mandatory: # mandatory parts
385 raise
391 raise
386 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
387 return # skip to part processing
393 return # skip to part processing
388 finally:
394 finally:
389 if op.ui.debugflag:
395 if op.ui.debugflag:
390 msg = ['bundle2-input-part: "%s"' % part.type]
396 msg = ['bundle2-input-part: "%s"' % part.type]
391 if not part.mandatory:
397 if not part.mandatory:
392 msg.append(' (advisory)')
398 msg.append(' (advisory)')
393 nbmp = len(part.mandatorykeys)
399 nbmp = len(part.mandatorykeys)
394 nbap = len(part.params) - nbmp
400 nbap = len(part.params) - nbmp
395 if nbmp or nbap:
401 if nbmp or nbap:
396 msg.append(' (params:')
402 msg.append(' (params:')
397 if nbmp:
403 if nbmp:
398 msg.append(' %i mandatory' % nbmp)
404 msg.append(' %i mandatory' % nbmp)
399 if nbap:
405 if nbap:
400 msg.append(' %i advisory' % nbmp)
406 msg.append(' %i advisory' % nbmp)
401 msg.append(')')
407 msg.append(')')
402 msg.append(' %s\n' % status)
408 msg.append(' %s\n' % status)
403 op.ui.debug(''.join(msg))
409 op.ui.debug(''.join(msg))
404
410
405 # handler is called outside the above try block so that we don't
411 # handler is called outside the above try block so that we don't
406 # risk catching KeyErrors from anything other than the
412 # risk catching KeyErrors from anything other than the
407 # parthandlermapping lookup (any KeyError raised by handler()
413 # parthandlermapping lookup (any KeyError raised by handler()
408 # itself represents a defect of a different variety).
414 # itself represents a defect of a different variety).
409 output = None
415 output = None
410 if op.captureoutput and op.reply is not None:
416 if op.captureoutput and op.reply is not None:
411 op.ui.pushbuffer(error=True, subproc=True)
417 op.ui.pushbuffer(error=True, subproc=True)
412 output = ''
418 output = ''
413 try:
419 try:
414 handler(op, part)
420 handler(op, part)
415 finally:
421 finally:
416 if output is not None:
422 if output is not None:
417 output = op.ui.popbuffer()
423 output = op.ui.popbuffer()
418 if output:
424 if output:
419 outpart = op.reply.newpart('output', data=output,
425 outpart = op.reply.newpart('output', data=output,
420 mandatory=False)
426 mandatory=False)
421 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
422 finally:
428 finally:
423 # consume the part content to not corrupt the stream.
429 # consume the part content to not corrupt the stream.
424 part.seek(0, 2)
430 part.seek(0, 2)
425
431
426
432
427 def decodecaps(blob):
433 def decodecaps(blob):
428 """decode a bundle2 caps bytes blob into a dictionary
434 """decode a bundle2 caps bytes blob into a dictionary
429
435
430 The blob is a list of capabilities (one per line)
436 The blob is a list of capabilities (one per line)
431 Capabilities may have values using a line of the form::
437 Capabilities may have values using a line of the form::
432
438
433 capability=value1,value2,value3
439 capability=value1,value2,value3
434
440
435 The values are always a list."""
441 The values are always a list."""
436 caps = {}
442 caps = {}
437 for line in blob.splitlines():
443 for line in blob.splitlines():
438 if not line:
444 if not line:
439 continue
445 continue
440 if '=' not in line:
446 if '=' not in line:
441 key, vals = line, ()
447 key, vals = line, ()
442 else:
448 else:
443 key, vals = line.split('=', 1)
449 key, vals = line.split('=', 1)
444 vals = vals.split(',')
450 vals = vals.split(',')
445 key = urllib.unquote(key)
451 key = urllib.unquote(key)
446 vals = [urllib.unquote(v) for v in vals]
452 vals = [urllib.unquote(v) for v in vals]
447 caps[key] = vals
453 caps[key] = vals
448 return caps
454 return caps
449
455
450 def encodecaps(caps):
456 def encodecaps(caps):
451 """encode a bundle2 caps dictionary into a bytes blob"""
457 """encode a bundle2 caps dictionary into a bytes blob"""
452 chunks = []
458 chunks = []
453 for ca in sorted(caps):
459 for ca in sorted(caps):
454 vals = caps[ca]
460 vals = caps[ca]
455 ca = urllib.quote(ca)
461 ca = urllib.quote(ca)
456 vals = [urllib.quote(v) for v in vals]
462 vals = [urllib.quote(v) for v in vals]
457 if vals:
463 if vals:
458 ca = "%s=%s" % (ca, ','.join(vals))
464 ca = "%s=%s" % (ca, ','.join(vals))
459 chunks.append(ca)
465 chunks.append(ca)
460 return '\n'.join(chunks)
466 return '\n'.join(chunks)
461
467
462 class bundle20(object):
468 class bundle20(object):
463 """represent an outgoing bundle2 container
469 """represent an outgoing bundle2 container
464
470
465 Use the `addparam` method to add stream level parameter. and `newpart` to
471 Use the `addparam` method to add stream level parameter. and `newpart` to
466 populate it. Then call `getchunks` to retrieve all the binary chunks of
472 populate it. Then call `getchunks` to retrieve all the binary chunks of
467 data that compose the bundle2 container."""
473 data that compose the bundle2 container."""
468
474
469 _magicstring = 'HG20'
475 _magicstring = 'HG20'
470
476
471 def __init__(self, ui, capabilities=()):
477 def __init__(self, ui, capabilities=()):
472 self.ui = ui
478 self.ui = ui
473 self._params = []
479 self._params = []
474 self._parts = []
480 self._parts = []
475 self.capabilities = dict(capabilities)
481 self.capabilities = dict(capabilities)
476
482
477 @property
483 @property
478 def nbparts(self):
484 def nbparts(self):
479 """total number of parts added to the bundler"""
485 """total number of parts added to the bundler"""
480 return len(self._parts)
486 return len(self._parts)
481
487
482 # methods used to defines the bundle2 content
488 # methods used to defines the bundle2 content
483 def addparam(self, name, value=None):
489 def addparam(self, name, value=None):
484 """add a stream level parameter"""
490 """add a stream level parameter"""
485 if not name:
491 if not name:
486 raise ValueError('empty parameter name')
492 raise ValueError('empty parameter name')
487 if name[0] not in string.letters:
493 if name[0] not in string.letters:
488 raise ValueError('non letter first character: %r' % name)
494 raise ValueError('non letter first character: %r' % name)
489 self._params.append((name, value))
495 self._params.append((name, value))
490
496
491 def addpart(self, part):
497 def addpart(self, part):
492 """add a new part to the bundle2 container
498 """add a new part to the bundle2 container
493
499
494 Parts contains the actual applicative payload."""
500 Parts contains the actual applicative payload."""
495 assert part.id is None
501 assert part.id is None
496 part.id = len(self._parts) # very cheap counter
502 part.id = len(self._parts) # very cheap counter
497 self._parts.append(part)
503 self._parts.append(part)
498
504
499 def newpart(self, typeid, *args, **kwargs):
505 def newpart(self, typeid, *args, **kwargs):
500 """create a new part and add it to the containers
506 """create a new part and add it to the containers
501
507
502 As the part is directly added to the containers. For now, this means
508 As the part is directly added to the containers. For now, this means
503 that any failure to properly initialize the part after calling
509 that any failure to properly initialize the part after calling
504 ``newpart`` should result in a failure of the whole bundling process.
510 ``newpart`` should result in a failure of the whole bundling process.
505
511
506 You can still fall back to manually create and add if you need better
512 You can still fall back to manually create and add if you need better
507 control."""
513 control."""
508 part = bundlepart(typeid, *args, **kwargs)
514 part = bundlepart(typeid, *args, **kwargs)
509 self.addpart(part)
515 self.addpart(part)
510 return part
516 return part
511
517
512 # methods used to generate the bundle2 stream
518 # methods used to generate the bundle2 stream
513 def getchunks(self):
519 def getchunks(self):
514 if self.ui.debugflag:
520 if self.ui.debugflag:
515 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
521 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
516 if self._params:
522 if self._params:
517 msg.append(' (%i params)' % len(self._params))
523 msg.append(' (%i params)' % len(self._params))
518 msg.append(' %i parts total\n' % len(self._parts))
524 msg.append(' %i parts total\n' % len(self._parts))
519 self.ui.debug(''.join(msg))
525 self.ui.debug(''.join(msg))
520 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
526 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
521 yield self._magicstring
527 yield self._magicstring
522 param = self._paramchunk()
528 param = self._paramchunk()
523 outdebug(self.ui, 'bundle parameter: %s' % param)
529 outdebug(self.ui, 'bundle parameter: %s' % param)
524 yield _pack(_fstreamparamsize, len(param))
530 yield _pack(_fstreamparamsize, len(param))
525 if param:
531 if param:
526 yield param
532 yield param
527
533
528 outdebug(self.ui, 'start of parts')
534 outdebug(self.ui, 'start of parts')
529 for part in self._parts:
535 for part in self._parts:
530 outdebug(self.ui, 'bundle part: "%s"' % part.type)
536 outdebug(self.ui, 'bundle part: "%s"' % part.type)
531 for chunk in part.getchunks(ui=self.ui):
537 for chunk in part.getchunks(ui=self.ui):
532 yield chunk
538 yield chunk
533 outdebug(self.ui, 'end of bundle')
539 outdebug(self.ui, 'end of bundle')
534 yield _pack(_fpartheadersize, 0)
540 yield _pack(_fpartheadersize, 0)
535
541
536 def _paramchunk(self):
542 def _paramchunk(self):
537 """return a encoded version of all stream parameters"""
543 """return a encoded version of all stream parameters"""
538 blocks = []
544 blocks = []
539 for par, value in self._params:
545 for par, value in self._params:
540 par = urllib.quote(par)
546 par = urllib.quote(par)
541 if value is not None:
547 if value is not None:
542 value = urllib.quote(value)
548 value = urllib.quote(value)
543 par = '%s=%s' % (par, value)
549 par = '%s=%s' % (par, value)
544 blocks.append(par)
550 blocks.append(par)
545 return ' '.join(blocks)
551 return ' '.join(blocks)
546
552
547 def salvageoutput(self):
553 def salvageoutput(self):
548 """return a list with a copy of all output parts in the bundle
554 """return a list with a copy of all output parts in the bundle
549
555
550 This is meant to be used during error handling to make sure we preserve
556 This is meant to be used during error handling to make sure we preserve
551 server output"""
557 server output"""
552 salvaged = []
558 salvaged = []
553 for part in self._parts:
559 for part in self._parts:
554 if part.type.startswith('output'):
560 if part.type.startswith('output'):
555 salvaged.append(part.copy())
561 salvaged.append(part.copy())
556 return salvaged
562 return salvaged
557
563
558
564
559 class unpackermixin(object):
565 class unpackermixin(object):
560 """A mixin to extract bytes and struct data from a stream"""
566 """A mixin to extract bytes and struct data from a stream"""
561
567
562 def __init__(self, fp):
568 def __init__(self, fp):
563 self._fp = fp
569 self._fp = fp
564 self._seekable = (util.safehasattr(fp, 'seek') and
570 self._seekable = (util.safehasattr(fp, 'seek') and
565 util.safehasattr(fp, 'tell'))
571 util.safehasattr(fp, 'tell'))
566
572
567 def _unpack(self, format):
573 def _unpack(self, format):
568 """unpack this struct format from the stream"""
574 """unpack this struct format from the stream"""
569 data = self._readexact(struct.calcsize(format))
575 data = self._readexact(struct.calcsize(format))
570 return _unpack(format, data)
576 return _unpack(format, data)
571
577
572 def _readexact(self, size):
578 def _readexact(self, size):
573 """read exactly <size> bytes from the stream"""
579 """read exactly <size> bytes from the stream"""
574 return changegroup.readexactly(self._fp, size)
580 return changegroup.readexactly(self._fp, size)
575
581
576 def seek(self, offset, whence=0):
582 def seek(self, offset, whence=0):
577 """move the underlying file pointer"""
583 """move the underlying file pointer"""
578 if self._seekable:
584 if self._seekable:
579 return self._fp.seek(offset, whence)
585 return self._fp.seek(offset, whence)
580 else:
586 else:
581 raise NotImplementedError(_('File pointer is not seekable'))
587 raise NotImplementedError(_('File pointer is not seekable'))
582
588
583 def tell(self):
589 def tell(self):
584 """return the file offset, or None if file is not seekable"""
590 """return the file offset, or None if file is not seekable"""
585 if self._seekable:
591 if self._seekable:
586 try:
592 try:
587 return self._fp.tell()
593 return self._fp.tell()
588 except IOError as e:
594 except IOError as e:
589 if e.errno == errno.ESPIPE:
595 if e.errno == errno.ESPIPE:
590 self._seekable = False
596 self._seekable = False
591 else:
597 else:
592 raise
598 raise
593 return None
599 return None
594
600
595 def close(self):
601 def close(self):
596 """close underlying file"""
602 """close underlying file"""
597 if util.safehasattr(self._fp, 'close'):
603 if util.safehasattr(self._fp, 'close'):
598 return self._fp.close()
604 return self._fp.close()
599
605
600 def getunbundler(ui, fp, magicstring=None):
606 def getunbundler(ui, fp, magicstring=None):
601 """return a valid unbundler object for a given magicstring"""
607 """return a valid unbundler object for a given magicstring"""
602 if magicstring is None:
608 if magicstring is None:
603 magicstring = changegroup.readexactly(fp, 4)
609 magicstring = changegroup.readexactly(fp, 4)
604 magic, version = magicstring[0:2], magicstring[2:4]
610 magic, version = magicstring[0:2], magicstring[2:4]
605 if magic != 'HG':
611 if magic != 'HG':
606 raise util.Abort(_('not a Mercurial bundle'))
612 raise util.Abort(_('not a Mercurial bundle'))
607 unbundlerclass = formatmap.get(version)
613 unbundlerclass = formatmap.get(version)
608 if unbundlerclass is None:
614 if unbundlerclass is None:
609 raise util.Abort(_('unknown bundle version %s') % version)
615 raise util.Abort(_('unknown bundle version %s') % version)
610 unbundler = unbundlerclass(ui, fp)
616 unbundler = unbundlerclass(ui, fp)
611 indebug(ui, 'start processing of %s stream' % magicstring)
617 indebug(ui, 'start processing of %s stream' % magicstring)
612 return unbundler
618 return unbundler
613
619
614 class unbundle20(unpackermixin):
620 class unbundle20(unpackermixin):
615 """interpret a bundle2 stream
621 """interpret a bundle2 stream
616
622
617 This class is fed with a binary stream and yields parts through its
623 This class is fed with a binary stream and yields parts through its
618 `iterparts` methods."""
624 `iterparts` methods."""
619
625
620 def __init__(self, ui, fp):
626 def __init__(self, ui, fp):
621 """If header is specified, we do not read it out of the stream."""
627 """If header is specified, we do not read it out of the stream."""
622 self.ui = ui
628 self.ui = ui
623 super(unbundle20, self).__init__(fp)
629 super(unbundle20, self).__init__(fp)
624
630
625 @util.propertycache
631 @util.propertycache
626 def params(self):
632 def params(self):
627 """dictionary of stream level parameters"""
633 """dictionary of stream level parameters"""
628 indebug(self.ui, 'reading bundle2 stream parameters')
634 indebug(self.ui, 'reading bundle2 stream parameters')
629 params = {}
635 params = {}
630 paramssize = self._unpack(_fstreamparamsize)[0]
636 paramssize = self._unpack(_fstreamparamsize)[0]
631 if paramssize < 0:
637 if paramssize < 0:
632 raise error.BundleValueError('negative bundle param size: %i'
638 raise error.BundleValueError('negative bundle param size: %i'
633 % paramssize)
639 % paramssize)
634 if paramssize:
640 if paramssize:
635 for p in self._readexact(paramssize).split(' '):
641 for p in self._readexact(paramssize).split(' '):
636 p = p.split('=', 1)
642 p = p.split('=', 1)
637 p = [urllib.unquote(i) for i in p]
643 p = [urllib.unquote(i) for i in p]
638 if len(p) < 2:
644 if len(p) < 2:
639 p.append(None)
645 p.append(None)
640 self._processparam(*p)
646 self._processparam(*p)
641 params[p[0]] = p[1]
647 params[p[0]] = p[1]
642 return params
648 return params
643
649
644 def _processparam(self, name, value):
650 def _processparam(self, name, value):
645 """process a parameter, applying its effect if needed
651 """process a parameter, applying its effect if needed
646
652
647 Parameter starting with a lower case letter are advisory and will be
653 Parameter starting with a lower case letter are advisory and will be
648 ignored when unknown. Those starting with an upper case letter are
654 ignored when unknown. Those starting with an upper case letter are
649 mandatory and will this function will raise a KeyError when unknown.
655 mandatory and will this function will raise a KeyError when unknown.
650
656
651 Note: no option are currently supported. Any input will be either
657 Note: no option are currently supported. Any input will be either
652 ignored or failing.
658 ignored or failing.
653 """
659 """
654 if not name:
660 if not name:
655 raise ValueError('empty parameter name')
661 raise ValueError('empty parameter name')
656 if name[0] not in string.letters:
662 if name[0] not in string.letters:
657 raise ValueError('non letter first character: %r' % name)
663 raise ValueError('non letter first character: %r' % name)
658 # Some logic will be later added here to try to process the option for
664 # Some logic will be later added here to try to process the option for
659 # a dict of known parameter.
665 # a dict of known parameter.
660 if name[0].islower():
666 if name[0].islower():
661 indebug(self.ui, "ignoring unknown parameter %r" % name)
667 indebug(self.ui, "ignoring unknown parameter %r" % name)
662 else:
668 else:
663 raise error.UnsupportedPartError(params=(name,))
669 raise error.UnsupportedPartError(params=(name,))
664
670
665
671
666 def iterparts(self):
672 def iterparts(self):
667 """yield all parts contained in the stream"""
673 """yield all parts contained in the stream"""
668 # make sure param have been loaded
674 # make sure param have been loaded
669 self.params
675 self.params
670 indebug(self.ui, 'start extraction of bundle2 parts')
676 indebug(self.ui, 'start extraction of bundle2 parts')
671 headerblock = self._readpartheader()
677 headerblock = self._readpartheader()
672 while headerblock is not None:
678 while headerblock is not None:
673 part = unbundlepart(self.ui, headerblock, self._fp)
679 part = unbundlepart(self.ui, headerblock, self._fp)
674 yield part
680 yield part
675 part.seek(0, 2)
681 part.seek(0, 2)
676 headerblock = self._readpartheader()
682 headerblock = self._readpartheader()
677 indebug(self.ui, 'end of bundle2 stream')
683 indebug(self.ui, 'end of bundle2 stream')
678
684
679 def _readpartheader(self):
685 def _readpartheader(self):
680 """reads a part header size and return the bytes blob
686 """reads a part header size and return the bytes blob
681
687
682 returns None if empty"""
688 returns None if empty"""
683 headersize = self._unpack(_fpartheadersize)[0]
689 headersize = self._unpack(_fpartheadersize)[0]
684 if headersize < 0:
690 if headersize < 0:
685 raise error.BundleValueError('negative part header size: %i'
691 raise error.BundleValueError('negative part header size: %i'
686 % headersize)
692 % headersize)
687 indebug(self.ui, 'part header size: %i' % headersize)
693 indebug(self.ui, 'part header size: %i' % headersize)
688 if headersize:
694 if headersize:
689 return self._readexact(headersize)
695 return self._readexact(headersize)
690 return None
696 return None
691
697
692 def compressed(self):
698 def compressed(self):
693 return False
699 return False
694
700
695 formatmap = {'20': unbundle20}
701 formatmap = {'20': unbundle20}
696
702
697 class bundlepart(object):
703 class bundlepart(object):
698 """A bundle2 part contains application level payload
704 """A bundle2 part contains application level payload
699
705
700 The part `type` is used to route the part to the application level
706 The part `type` is used to route the part to the application level
701 handler.
707 handler.
702
708
703 The part payload is contained in ``part.data``. It could be raw bytes or a
709 The part payload is contained in ``part.data``. It could be raw bytes or a
704 generator of byte chunks.
710 generator of byte chunks.
705
711
706 You can add parameters to the part using the ``addparam`` method.
712 You can add parameters to the part using the ``addparam`` method.
707 Parameters can be either mandatory (default) or advisory. Remote side
713 Parameters can be either mandatory (default) or advisory. Remote side
708 should be able to safely ignore the advisory ones.
714 should be able to safely ignore the advisory ones.
709
715
710 Both data and parameters cannot be modified after the generation has begun.
716 Both data and parameters cannot be modified after the generation has begun.
711 """
717 """
712
718
713 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
719 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
714 data='', mandatory=True):
720 data='', mandatory=True):
715 validateparttype(parttype)
721 validateparttype(parttype)
716 self.id = None
722 self.id = None
717 self.type = parttype
723 self.type = parttype
718 self._data = data
724 self._data = data
719 self._mandatoryparams = list(mandatoryparams)
725 self._mandatoryparams = list(mandatoryparams)
720 self._advisoryparams = list(advisoryparams)
726 self._advisoryparams = list(advisoryparams)
721 # checking for duplicated entries
727 # checking for duplicated entries
722 self._seenparams = set()
728 self._seenparams = set()
723 for pname, __ in self._mandatoryparams + self._advisoryparams:
729 for pname, __ in self._mandatoryparams + self._advisoryparams:
724 if pname in self._seenparams:
730 if pname in self._seenparams:
725 raise RuntimeError('duplicated params: %s' % pname)
731 raise RuntimeError('duplicated params: %s' % pname)
726 self._seenparams.add(pname)
732 self._seenparams.add(pname)
727 # status of the part's generation:
733 # status of the part's generation:
728 # - None: not started,
734 # - None: not started,
729 # - False: currently generated,
735 # - False: currently generated,
730 # - True: generation done.
736 # - True: generation done.
731 self._generated = None
737 self._generated = None
732 self.mandatory = mandatory
738 self.mandatory = mandatory
733
739
734 def copy(self):
740 def copy(self):
735 """return a copy of the part
741 """return a copy of the part
736
742
737 The new part have the very same content but no partid assigned yet.
743 The new part have the very same content but no partid assigned yet.
738 Parts with generated data cannot be copied."""
744 Parts with generated data cannot be copied."""
739 assert not util.safehasattr(self.data, 'next')
745 assert not util.safehasattr(self.data, 'next')
740 return self.__class__(self.type, self._mandatoryparams,
746 return self.__class__(self.type, self._mandatoryparams,
741 self._advisoryparams, self._data, self.mandatory)
747 self._advisoryparams, self._data, self.mandatory)
742
748
743 # methods used to defines the part content
749 # methods used to defines the part content
744 def __setdata(self, data):
750 def __setdata(self, data):
745 if self._generated is not None:
751 if self._generated is not None:
746 raise error.ReadOnlyPartError('part is being generated')
752 raise error.ReadOnlyPartError('part is being generated')
747 self._data = data
753 self._data = data
748 def __getdata(self):
754 def __getdata(self):
749 return self._data
755 return self._data
750 data = property(__getdata, __setdata)
756 data = property(__getdata, __setdata)
751
757
752 @property
758 @property
753 def mandatoryparams(self):
759 def mandatoryparams(self):
754 # make it an immutable tuple to force people through ``addparam``
760 # make it an immutable tuple to force people through ``addparam``
755 return tuple(self._mandatoryparams)
761 return tuple(self._mandatoryparams)
756
762
757 @property
763 @property
758 def advisoryparams(self):
764 def advisoryparams(self):
759 # make it an immutable tuple to force people through ``addparam``
765 # make it an immutable tuple to force people through ``addparam``
760 return tuple(self._advisoryparams)
766 return tuple(self._advisoryparams)
761
767
762 def addparam(self, name, value='', mandatory=True):
768 def addparam(self, name, value='', mandatory=True):
763 if self._generated is not None:
769 if self._generated is not None:
764 raise error.ReadOnlyPartError('part is being generated')
770 raise error.ReadOnlyPartError('part is being generated')
765 if name in self._seenparams:
771 if name in self._seenparams:
766 raise ValueError('duplicated params: %s' % name)
772 raise ValueError('duplicated params: %s' % name)
767 self._seenparams.add(name)
773 self._seenparams.add(name)
768 params = self._advisoryparams
774 params = self._advisoryparams
769 if mandatory:
775 if mandatory:
770 params = self._mandatoryparams
776 params = self._mandatoryparams
771 params.append((name, value))
777 params.append((name, value))
772
778
773 # methods used to generates the bundle2 stream
779 # methods used to generates the bundle2 stream
774 def getchunks(self, ui):
780 def getchunks(self, ui):
775 if self._generated is not None:
781 if self._generated is not None:
776 raise RuntimeError('part can only be consumed once')
782 raise RuntimeError('part can only be consumed once')
777 self._generated = False
783 self._generated = False
778
784
779 if ui.debugflag:
785 if ui.debugflag:
780 msg = ['bundle2-output-part: "%s"' % self.type]
786 msg = ['bundle2-output-part: "%s"' % self.type]
781 if not self.mandatory:
787 if not self.mandatory:
782 msg.append(' (advisory)')
788 msg.append(' (advisory)')
783 nbmp = len(self.mandatoryparams)
789 nbmp = len(self.mandatoryparams)
784 nbap = len(self.advisoryparams)
790 nbap = len(self.advisoryparams)
785 if nbmp or nbap:
791 if nbmp or nbap:
786 msg.append(' (params:')
792 msg.append(' (params:')
787 if nbmp:
793 if nbmp:
788 msg.append(' %i mandatory' % nbmp)
794 msg.append(' %i mandatory' % nbmp)
789 if nbap:
795 if nbap:
790 msg.append(' %i advisory' % nbmp)
796 msg.append(' %i advisory' % nbmp)
791 msg.append(')')
797 msg.append(')')
792 if not self.data:
798 if not self.data:
793 msg.append(' empty payload')
799 msg.append(' empty payload')
794 elif util.safehasattr(self.data, 'next'):
800 elif util.safehasattr(self.data, 'next'):
795 msg.append(' streamed payload')
801 msg.append(' streamed payload')
796 else:
802 else:
797 msg.append(' %i bytes payload' % len(self.data))
803 msg.append(' %i bytes payload' % len(self.data))
798 msg.append('\n')
804 msg.append('\n')
799 ui.debug(''.join(msg))
805 ui.debug(''.join(msg))
800
806
801 #### header
807 #### header
802 if self.mandatory:
808 if self.mandatory:
803 parttype = self.type.upper()
809 parttype = self.type.upper()
804 else:
810 else:
805 parttype = self.type.lower()
811 parttype = self.type.lower()
806 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
812 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
807 ## parttype
813 ## parttype
808 header = [_pack(_fparttypesize, len(parttype)),
814 header = [_pack(_fparttypesize, len(parttype)),
809 parttype, _pack(_fpartid, self.id),
815 parttype, _pack(_fpartid, self.id),
810 ]
816 ]
811 ## parameters
817 ## parameters
812 # count
818 # count
813 manpar = self.mandatoryparams
819 manpar = self.mandatoryparams
814 advpar = self.advisoryparams
820 advpar = self.advisoryparams
815 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
821 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
816 # size
822 # size
817 parsizes = []
823 parsizes = []
818 for key, value in manpar:
824 for key, value in manpar:
819 parsizes.append(len(key))
825 parsizes.append(len(key))
820 parsizes.append(len(value))
826 parsizes.append(len(value))
821 for key, value in advpar:
827 for key, value in advpar:
822 parsizes.append(len(key))
828 parsizes.append(len(key))
823 parsizes.append(len(value))
829 parsizes.append(len(value))
824 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
830 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
825 header.append(paramsizes)
831 header.append(paramsizes)
826 # key, value
832 # key, value
827 for key, value in manpar:
833 for key, value in manpar:
828 header.append(key)
834 header.append(key)
829 header.append(value)
835 header.append(value)
830 for key, value in advpar:
836 for key, value in advpar:
831 header.append(key)
837 header.append(key)
832 header.append(value)
838 header.append(value)
833 ## finalize header
839 ## finalize header
834 headerchunk = ''.join(header)
840 headerchunk = ''.join(header)
835 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
841 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
836 yield _pack(_fpartheadersize, len(headerchunk))
842 yield _pack(_fpartheadersize, len(headerchunk))
837 yield headerchunk
843 yield headerchunk
838 ## payload
844 ## payload
839 try:
845 try:
840 for chunk in self._payloadchunks():
846 for chunk in self._payloadchunks():
841 outdebug(ui, 'payload chunk size: %i' % len(chunk))
847 outdebug(ui, 'payload chunk size: %i' % len(chunk))
842 yield _pack(_fpayloadsize, len(chunk))
848 yield _pack(_fpayloadsize, len(chunk))
843 yield chunk
849 yield chunk
844 except BaseException as exc:
850 except BaseException as exc:
845 # backup exception data for later
851 # backup exception data for later
846 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
852 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
847 % exc)
853 % exc)
848 exc_info = sys.exc_info()
854 exc_info = sys.exc_info()
849 msg = 'unexpected error: %s' % exc
855 msg = 'unexpected error: %s' % exc
850 interpart = bundlepart('error:abort', [('message', msg)],
856 interpart = bundlepart('error:abort', [('message', msg)],
851 mandatory=False)
857 mandatory=False)
852 interpart.id = 0
858 interpart.id = 0
853 yield _pack(_fpayloadsize, -1)
859 yield _pack(_fpayloadsize, -1)
854 for chunk in interpart.getchunks(ui=ui):
860 for chunk in interpart.getchunks(ui=ui):
855 yield chunk
861 yield chunk
856 outdebug(ui, 'closing payload chunk')
862 outdebug(ui, 'closing payload chunk')
857 # abort current part payload
863 # abort current part payload
858 yield _pack(_fpayloadsize, 0)
864 yield _pack(_fpayloadsize, 0)
859 raise exc_info[0], exc_info[1], exc_info[2]
865 raise exc_info[0], exc_info[1], exc_info[2]
860 # end of payload
866 # end of payload
861 outdebug(ui, 'closing payload chunk')
867 outdebug(ui, 'closing payload chunk')
862 yield _pack(_fpayloadsize, 0)
868 yield _pack(_fpayloadsize, 0)
863 self._generated = True
869 self._generated = True
864
870
865 def _payloadchunks(self):
871 def _payloadchunks(self):
866 """yield chunks of a the part payload
872 """yield chunks of a the part payload
867
873
868 Exists to handle the different methods to provide data to a part."""
874 Exists to handle the different methods to provide data to a part."""
869 # we only support fixed size data now.
875 # we only support fixed size data now.
870 # This will be improved in the future.
876 # This will be improved in the future.
871 if util.safehasattr(self.data, 'next'):
877 if util.safehasattr(self.data, 'next'):
872 buff = util.chunkbuffer(self.data)
878 buff = util.chunkbuffer(self.data)
873 chunk = buff.read(preferedchunksize)
879 chunk = buff.read(preferedchunksize)
874 while chunk:
880 while chunk:
875 yield chunk
881 yield chunk
876 chunk = buff.read(preferedchunksize)
882 chunk = buff.read(preferedchunksize)
877 elif len(self.data):
883 elif len(self.data):
878 yield self.data
884 yield self.data
879
885
880
886
881 flaginterrupt = -1
887 flaginterrupt = -1
882
888
883 class interrupthandler(unpackermixin):
889 class interrupthandler(unpackermixin):
884 """read one part and process it with restricted capability
890 """read one part and process it with restricted capability
885
891
886 This allows to transmit exception raised on the producer size during part
892 This allows to transmit exception raised on the producer size during part
887 iteration while the consumer is reading a part.
893 iteration while the consumer is reading a part.
888
894
889 Part processed in this manner only have access to a ui object,"""
895 Part processed in this manner only have access to a ui object,"""
890
896
891 def __init__(self, ui, fp):
897 def __init__(self, ui, fp):
892 super(interrupthandler, self).__init__(fp)
898 super(interrupthandler, self).__init__(fp)
893 self.ui = ui
899 self.ui = ui
894
900
895 def _readpartheader(self):
901 def _readpartheader(self):
896 """reads a part header size and return the bytes blob
902 """reads a part header size and return the bytes blob
897
903
898 returns None if empty"""
904 returns None if empty"""
899 headersize = self._unpack(_fpartheadersize)[0]
905 headersize = self._unpack(_fpartheadersize)[0]
900 if headersize < 0:
906 if headersize < 0:
901 raise error.BundleValueError('negative part header size: %i'
907 raise error.BundleValueError('negative part header size: %i'
902 % headersize)
908 % headersize)
903 indebug(self.ui, 'part header size: %i\n' % headersize)
909 indebug(self.ui, 'part header size: %i\n' % headersize)
904 if headersize:
910 if headersize:
905 return self._readexact(headersize)
911 return self._readexact(headersize)
906 return None
912 return None
907
913
908 def __call__(self):
914 def __call__(self):
909
915
910 self.ui.debug('bundle2-input-stream-interrupt:'
916 self.ui.debug('bundle2-input-stream-interrupt:'
911 ' opening out of band context\n')
917 ' opening out of band context\n')
912 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
918 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
913 headerblock = self._readpartheader()
919 headerblock = self._readpartheader()
914 if headerblock is None:
920 if headerblock is None:
915 indebug(self.ui, 'no part found during interruption.')
921 indebug(self.ui, 'no part found during interruption.')
916 return
922 return
917 part = unbundlepart(self.ui, headerblock, self._fp)
923 part = unbundlepart(self.ui, headerblock, self._fp)
918 op = interruptoperation(self.ui)
924 op = interruptoperation(self.ui)
919 _processpart(op, part)
925 _processpart(op, part)
920 self.ui.debug('bundle2-input-stream-interrupt:'
926 self.ui.debug('bundle2-input-stream-interrupt:'
921 ' closing out of band context\n')
927 ' closing out of band context\n')
922
928
923 class interruptoperation(object):
929 class interruptoperation(object):
924 """A limited operation to be use by part handler during interruption
930 """A limited operation to be use by part handler during interruption
925
931
926 It only have access to an ui object.
932 It only have access to an ui object.
927 """
933 """
928
934
929 def __init__(self, ui):
935 def __init__(self, ui):
930 self.ui = ui
936 self.ui = ui
931 self.reply = None
937 self.reply = None
932 self.captureoutput = False
938 self.captureoutput = False
933
939
934 @property
940 @property
935 def repo(self):
941 def repo(self):
936 raise RuntimeError('no repo access from stream interruption')
942 raise RuntimeError('no repo access from stream interruption')
937
943
938 def gettransaction(self):
944 def gettransaction(self):
939 raise TransactionUnavailable('no repo access from stream interruption')
945 raise TransactionUnavailable('no repo access from stream interruption')
940
946
941 class unbundlepart(unpackermixin):
947 class unbundlepart(unpackermixin):
942 """a bundle part read from a bundle"""
948 """a bundle part read from a bundle"""
943
949
944 def __init__(self, ui, header, fp):
950 def __init__(self, ui, header, fp):
945 super(unbundlepart, self).__init__(fp)
951 super(unbundlepart, self).__init__(fp)
946 self.ui = ui
952 self.ui = ui
947 # unbundle state attr
953 # unbundle state attr
948 self._headerdata = header
954 self._headerdata = header
949 self._headeroffset = 0
955 self._headeroffset = 0
950 self._initialized = False
956 self._initialized = False
951 self.consumed = False
957 self.consumed = False
952 # part data
958 # part data
953 self.id = None
959 self.id = None
954 self.type = None
960 self.type = None
955 self.mandatoryparams = None
961 self.mandatoryparams = None
956 self.advisoryparams = None
962 self.advisoryparams = None
957 self.params = None
963 self.params = None
958 self.mandatorykeys = ()
964 self.mandatorykeys = ()
959 self._payloadstream = None
965 self._payloadstream = None
960 self._readheader()
966 self._readheader()
961 self._mandatory = None
967 self._mandatory = None
962 self._chunkindex = [] #(payload, file) position tuples for chunk starts
968 self._chunkindex = [] #(payload, file) position tuples for chunk starts
963 self._pos = 0
969 self._pos = 0
964
970
965 def _fromheader(self, size):
971 def _fromheader(self, size):
966 """return the next <size> byte from the header"""
972 """return the next <size> byte from the header"""
967 offset = self._headeroffset
973 offset = self._headeroffset
968 data = self._headerdata[offset:(offset + size)]
974 data = self._headerdata[offset:(offset + size)]
969 self._headeroffset = offset + size
975 self._headeroffset = offset + size
970 return data
976 return data
971
977
972 def _unpackheader(self, format):
978 def _unpackheader(self, format):
973 """read given format from header
979 """read given format from header
974
980
975 This automatically compute the size of the format to read."""
981 This automatically compute the size of the format to read."""
976 data = self._fromheader(struct.calcsize(format))
982 data = self._fromheader(struct.calcsize(format))
977 return _unpack(format, data)
983 return _unpack(format, data)
978
984
979 def _initparams(self, mandatoryparams, advisoryparams):
985 def _initparams(self, mandatoryparams, advisoryparams):
980 """internal function to setup all logic related parameters"""
986 """internal function to setup all logic related parameters"""
981 # make it read only to prevent people touching it by mistake.
987 # make it read only to prevent people touching it by mistake.
982 self.mandatoryparams = tuple(mandatoryparams)
988 self.mandatoryparams = tuple(mandatoryparams)
983 self.advisoryparams = tuple(advisoryparams)
989 self.advisoryparams = tuple(advisoryparams)
984 # user friendly UI
990 # user friendly UI
985 self.params = dict(self.mandatoryparams)
991 self.params = dict(self.mandatoryparams)
986 self.params.update(dict(self.advisoryparams))
992 self.params.update(dict(self.advisoryparams))
987 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
993 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
988
994
989 def _payloadchunks(self, chunknum=0):
995 def _payloadchunks(self, chunknum=0):
990 '''seek to specified chunk and start yielding data'''
996 '''seek to specified chunk and start yielding data'''
991 if len(self._chunkindex) == 0:
997 if len(self._chunkindex) == 0:
992 assert chunknum == 0, 'Must start with chunk 0'
998 assert chunknum == 0, 'Must start with chunk 0'
993 self._chunkindex.append((0, super(unbundlepart, self).tell()))
999 self._chunkindex.append((0, super(unbundlepart, self).tell()))
994 else:
1000 else:
995 assert chunknum < len(self._chunkindex), \
1001 assert chunknum < len(self._chunkindex), \
996 'Unknown chunk %d' % chunknum
1002 'Unknown chunk %d' % chunknum
997 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1003 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
998
1004
999 pos = self._chunkindex[chunknum][0]
1005 pos = self._chunkindex[chunknum][0]
1000 payloadsize = self._unpack(_fpayloadsize)[0]
1006 payloadsize = self._unpack(_fpayloadsize)[0]
1001 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1007 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1002 while payloadsize:
1008 while payloadsize:
1003 if payloadsize == flaginterrupt:
1009 if payloadsize == flaginterrupt:
1004 # interruption detection, the handler will now read a
1010 # interruption detection, the handler will now read a
1005 # single part and process it.
1011 # single part and process it.
1006 interrupthandler(self.ui, self._fp)()
1012 interrupthandler(self.ui, self._fp)()
1007 elif payloadsize < 0:
1013 elif payloadsize < 0:
1008 msg = 'negative payload chunk size: %i' % payloadsize
1014 msg = 'negative payload chunk size: %i' % payloadsize
1009 raise error.BundleValueError(msg)
1015 raise error.BundleValueError(msg)
1010 else:
1016 else:
1011 result = self._readexact(payloadsize)
1017 result = self._readexact(payloadsize)
1012 chunknum += 1
1018 chunknum += 1
1013 pos += payloadsize
1019 pos += payloadsize
1014 if chunknum == len(self._chunkindex):
1020 if chunknum == len(self._chunkindex):
1015 self._chunkindex.append((pos,
1021 self._chunkindex.append((pos,
1016 super(unbundlepart, self).tell()))
1022 super(unbundlepart, self).tell()))
1017 yield result
1023 yield result
1018 payloadsize = self._unpack(_fpayloadsize)[0]
1024 payloadsize = self._unpack(_fpayloadsize)[0]
1019 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1025 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1020
1026
1021 def _findchunk(self, pos):
1027 def _findchunk(self, pos):
1022 '''for a given payload position, return a chunk number and offset'''
1028 '''for a given payload position, return a chunk number and offset'''
1023 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1029 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1024 if ppos == pos:
1030 if ppos == pos:
1025 return chunk, 0
1031 return chunk, 0
1026 elif ppos > pos:
1032 elif ppos > pos:
1027 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1033 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1028 raise ValueError('Unknown chunk')
1034 raise ValueError('Unknown chunk')
1029
1035
1030 def _readheader(self):
1036 def _readheader(self):
1031 """read the header and setup the object"""
1037 """read the header and setup the object"""
1032 typesize = self._unpackheader(_fparttypesize)[0]
1038 typesize = self._unpackheader(_fparttypesize)[0]
1033 self.type = self._fromheader(typesize)
1039 self.type = self._fromheader(typesize)
1034 indebug(self.ui, 'part type: "%s"' % self.type)
1040 indebug(self.ui, 'part type: "%s"' % self.type)
1035 self.id = self._unpackheader(_fpartid)[0]
1041 self.id = self._unpackheader(_fpartid)[0]
1036 indebug(self.ui, 'part id: "%s"' % self.id)
1042 indebug(self.ui, 'part id: "%s"' % self.id)
1037 # extract mandatory bit from type
1043 # extract mandatory bit from type
1038 self.mandatory = (self.type != self.type.lower())
1044 self.mandatory = (self.type != self.type.lower())
1039 self.type = self.type.lower()
1045 self.type = self.type.lower()
1040 ## reading parameters
1046 ## reading parameters
1041 # param count
1047 # param count
1042 mancount, advcount = self._unpackheader(_fpartparamcount)
1048 mancount, advcount = self._unpackheader(_fpartparamcount)
1043 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1049 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1044 # param size
1050 # param size
1045 fparamsizes = _makefpartparamsizes(mancount + advcount)
1051 fparamsizes = _makefpartparamsizes(mancount + advcount)
1046 paramsizes = self._unpackheader(fparamsizes)
1052 paramsizes = self._unpackheader(fparamsizes)
1047 # make it a list of couple again
1053 # make it a list of couple again
1048 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1054 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1049 # split mandatory from advisory
1055 # split mandatory from advisory
1050 mansizes = paramsizes[:mancount]
1056 mansizes = paramsizes[:mancount]
1051 advsizes = paramsizes[mancount:]
1057 advsizes = paramsizes[mancount:]
1052 # retrieve param value
1058 # retrieve param value
1053 manparams = []
1059 manparams = []
1054 for key, value in mansizes:
1060 for key, value in mansizes:
1055 manparams.append((self._fromheader(key), self._fromheader(value)))
1061 manparams.append((self._fromheader(key), self._fromheader(value)))
1056 advparams = []
1062 advparams = []
1057 for key, value in advsizes:
1063 for key, value in advsizes:
1058 advparams.append((self._fromheader(key), self._fromheader(value)))
1064 advparams.append((self._fromheader(key), self._fromheader(value)))
1059 self._initparams(manparams, advparams)
1065 self._initparams(manparams, advparams)
1060 ## part payload
1066 ## part payload
1061 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1067 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1062 # we read the data, tell it
1068 # we read the data, tell it
1063 self._initialized = True
1069 self._initialized = True
1064
1070
1065 def read(self, size=None):
1071 def read(self, size=None):
1066 """read payload data"""
1072 """read payload data"""
1067 if not self._initialized:
1073 if not self._initialized:
1068 self._readheader()
1074 self._readheader()
1069 if size is None:
1075 if size is None:
1070 data = self._payloadstream.read()
1076 data = self._payloadstream.read()
1071 else:
1077 else:
1072 data = self._payloadstream.read(size)
1078 data = self._payloadstream.read(size)
1073 self._pos += len(data)
1079 self._pos += len(data)
1074 if size is None or len(data) < size:
1080 if size is None or len(data) < size:
1075 if not self.consumed and self._pos:
1081 if not self.consumed and self._pos:
1076 self.ui.debug('bundle2-input-part: total payload size %i\n'
1082 self.ui.debug('bundle2-input-part: total payload size %i\n'
1077 % self._pos)
1083 % self._pos)
1078 self.consumed = True
1084 self.consumed = True
1079 return data
1085 return data
1080
1086
1081 def tell(self):
1087 def tell(self):
1082 return self._pos
1088 return self._pos
1083
1089
1084 def seek(self, offset, whence=0):
1090 def seek(self, offset, whence=0):
1085 if whence == 0:
1091 if whence == 0:
1086 newpos = offset
1092 newpos = offset
1087 elif whence == 1:
1093 elif whence == 1:
1088 newpos = self._pos + offset
1094 newpos = self._pos + offset
1089 elif whence == 2:
1095 elif whence == 2:
1090 if not self.consumed:
1096 if not self.consumed:
1091 self.read()
1097 self.read()
1092 newpos = self._chunkindex[-1][0] - offset
1098 newpos = self._chunkindex[-1][0] - offset
1093 else:
1099 else:
1094 raise ValueError('Unknown whence value: %r' % (whence,))
1100 raise ValueError('Unknown whence value: %r' % (whence,))
1095
1101
1096 if newpos > self._chunkindex[-1][0] and not self.consumed:
1102 if newpos > self._chunkindex[-1][0] and not self.consumed:
1097 self.read()
1103 self.read()
1098 if not 0 <= newpos <= self._chunkindex[-1][0]:
1104 if not 0 <= newpos <= self._chunkindex[-1][0]:
1099 raise ValueError('Offset out of range')
1105 raise ValueError('Offset out of range')
1100
1106
1101 if self._pos != newpos:
1107 if self._pos != newpos:
1102 chunk, internaloffset = self._findchunk(newpos)
1108 chunk, internaloffset = self._findchunk(newpos)
1103 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1109 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1104 adjust = self.read(internaloffset)
1110 adjust = self.read(internaloffset)
1105 if len(adjust) != internaloffset:
1111 if len(adjust) != internaloffset:
1106 raise util.Abort(_('Seek failed\n'))
1112 raise util.Abort(_('Seek failed\n'))
1107 self._pos = newpos
1113 self._pos = newpos
1108
1114
1109 # These are only the static capabilities.
1115 # These are only the static capabilities.
1110 # Check the 'getrepocaps' function for the rest.
1116 # Check the 'getrepocaps' function for the rest.
1111 capabilities = {'HG20': (),
1117 capabilities = {'HG20': (),
1112 'error': ('abort', 'unsupportedcontent', 'pushraced',
1118 'error': ('abort', 'unsupportedcontent', 'pushraced',
1113 'pushkey'),
1119 'pushkey'),
1114 'listkeys': (),
1120 'listkeys': (),
1115 'pushkey': (),
1121 'pushkey': (),
1116 'digests': tuple(sorted(util.DIGESTS.keys())),
1122 'digests': tuple(sorted(util.DIGESTS.keys())),
1117 'remote-changegroup': ('http', 'https'),
1123 'remote-changegroup': ('http', 'https'),
1118 'hgtagsfnodes': (),
1124 'hgtagsfnodes': (),
1119 }
1125 }
1120
1126
1121 def getrepocaps(repo, allowpushback=False):
1127 def getrepocaps(repo, allowpushback=False):
1122 """return the bundle2 capabilities for a given repo
1128 """return the bundle2 capabilities for a given repo
1123
1129
1124 Exists to allow extensions (like evolution) to mutate the capabilities.
1130 Exists to allow extensions (like evolution) to mutate the capabilities.
1125 """
1131 """
1126 caps = capabilities.copy()
1132 caps = capabilities.copy()
1127 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1133 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1128 if obsolete.isenabled(repo, obsolete.exchangeopt):
1134 if obsolete.isenabled(repo, obsolete.exchangeopt):
1129 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1135 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1130 caps['obsmarkers'] = supportedformat
1136 caps['obsmarkers'] = supportedformat
1131 if allowpushback:
1137 if allowpushback:
1132 caps['pushback'] = ()
1138 caps['pushback'] = ()
1133 return caps
1139 return caps
1134
1140
1135 def bundle2caps(remote):
1141 def bundle2caps(remote):
1136 """return the bundle capabilities of a peer as dict"""
1142 """return the bundle capabilities of a peer as dict"""
1137 raw = remote.capable('bundle2')
1143 raw = remote.capable('bundle2')
1138 if not raw and raw != '':
1144 if not raw and raw != '':
1139 return {}
1145 return {}
1140 capsblob = urllib.unquote(remote.capable('bundle2'))
1146 capsblob = urllib.unquote(remote.capable('bundle2'))
1141 return decodecaps(capsblob)
1147 return decodecaps(capsblob)
1142
1148
1143 def obsmarkersversion(caps):
1149 def obsmarkersversion(caps):
1144 """extract the list of supported obsmarkers versions from a bundle2caps dict
1150 """extract the list of supported obsmarkers versions from a bundle2caps dict
1145 """
1151 """
1146 obscaps = caps.get('obsmarkers', ())
1152 obscaps = caps.get('obsmarkers', ())
1147 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1153 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1148
1154
1149 @parthandler('changegroup', ('version', 'nbchanges'))
1155 @parthandler('changegroup', ('version', 'nbchanges'))
1150 def handlechangegroup(op, inpart):
1156 def handlechangegroup(op, inpart):
1151 """apply a changegroup part on the repo
1157 """apply a changegroup part on the repo
1152
1158
1153 This is a very early implementation that will massive rework before being
1159 This is a very early implementation that will massive rework before being
1154 inflicted to any end-user.
1160 inflicted to any end-user.
1155 """
1161 """
1156 # Make sure we trigger a transaction creation
1162 # Make sure we trigger a transaction creation
1157 #
1163 #
1158 # The addchangegroup function will get a transaction object by itself, but
1164 # The addchangegroup function will get a transaction object by itself, but
1159 # we need to make sure we trigger the creation of a transaction object used
1165 # we need to make sure we trigger the creation of a transaction object used
1160 # for the whole processing scope.
1166 # for the whole processing scope.
1161 op.gettransaction()
1167 op.gettransaction()
1162 unpackerversion = inpart.params.get('version', '01')
1168 unpackerversion = inpart.params.get('version', '01')
1163 # We should raise an appropriate exception here
1169 # We should raise an appropriate exception here
1164 unpacker = changegroup.packermap[unpackerversion][1]
1170 unpacker = changegroup.packermap[unpackerversion][1]
1165 cg = unpacker(inpart, 'UN')
1171 cg = unpacker(inpart, 'UN')
1166 # the source and url passed here are overwritten by the one contained in
1172 # the source and url passed here are overwritten by the one contained in
1167 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1173 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1168 nbchangesets = None
1174 nbchangesets = None
1169 if 'nbchanges' in inpart.params:
1175 if 'nbchanges' in inpart.params:
1170 nbchangesets = int(inpart.params.get('nbchanges'))
1176 nbchangesets = int(inpart.params.get('nbchanges'))
1171 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1177 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1172 expectedtotal=nbchangesets)
1178 expectedtotal=nbchangesets)
1173 op.records.add('changegroup', {'return': ret})
1179 op.records.add('changegroup', {'return': ret})
1174 if op.reply is not None:
1180 if op.reply is not None:
1175 # This is definitely not the final form of this
1181 # This is definitely not the final form of this
1176 # return. But one need to start somewhere.
1182 # return. But one need to start somewhere.
1177 part = op.reply.newpart('reply:changegroup', mandatory=False)
1183 part = op.reply.newpart('reply:changegroup', mandatory=False)
1178 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1184 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1179 part.addparam('return', '%i' % ret, mandatory=False)
1185 part.addparam('return', '%i' % ret, mandatory=False)
1180 assert not inpart.read()
1186 assert not inpart.read()
1181
1187
1182 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1188 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1183 ['digest:%s' % k for k in util.DIGESTS.keys()])
1189 ['digest:%s' % k for k in util.DIGESTS.keys()])
1184 @parthandler('remote-changegroup', _remotechangegroupparams)
1190 @parthandler('remote-changegroup', _remotechangegroupparams)
1185 def handleremotechangegroup(op, inpart):
1191 def handleremotechangegroup(op, inpart):
1186 """apply a bundle10 on the repo, given an url and validation information
1192 """apply a bundle10 on the repo, given an url and validation information
1187
1193
1188 All the information about the remote bundle to import are given as
1194 All the information about the remote bundle to import are given as
1189 parameters. The parameters include:
1195 parameters. The parameters include:
1190 - url: the url to the bundle10.
1196 - url: the url to the bundle10.
1191 - size: the bundle10 file size. It is used to validate what was
1197 - size: the bundle10 file size. It is used to validate what was
1192 retrieved by the client matches the server knowledge about the bundle.
1198 retrieved by the client matches the server knowledge about the bundle.
1193 - digests: a space separated list of the digest types provided as
1199 - digests: a space separated list of the digest types provided as
1194 parameters.
1200 parameters.
1195 - digest:<digest-type>: the hexadecimal representation of the digest with
1201 - digest:<digest-type>: the hexadecimal representation of the digest with
1196 that name. Like the size, it is used to validate what was retrieved by
1202 that name. Like the size, it is used to validate what was retrieved by
1197 the client matches what the server knows about the bundle.
1203 the client matches what the server knows about the bundle.
1198
1204
1199 When multiple digest types are given, all of them are checked.
1205 When multiple digest types are given, all of them are checked.
1200 """
1206 """
1201 try:
1207 try:
1202 raw_url = inpart.params['url']
1208 raw_url = inpart.params['url']
1203 except KeyError:
1209 except KeyError:
1204 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1210 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1205 parsed_url = util.url(raw_url)
1211 parsed_url = util.url(raw_url)
1206 if parsed_url.scheme not in capabilities['remote-changegroup']:
1212 if parsed_url.scheme not in capabilities['remote-changegroup']:
1207 raise util.Abort(_('remote-changegroup does not support %s urls') %
1213 raise util.Abort(_('remote-changegroup does not support %s urls') %
1208 parsed_url.scheme)
1214 parsed_url.scheme)
1209
1215
1210 try:
1216 try:
1211 size = int(inpart.params['size'])
1217 size = int(inpart.params['size'])
1212 except ValueError:
1218 except ValueError:
1213 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1219 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1214 % 'size')
1220 % 'size')
1215 except KeyError:
1221 except KeyError:
1216 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1222 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1217
1223
1218 digests = {}
1224 digests = {}
1219 for typ in inpart.params.get('digests', '').split():
1225 for typ in inpart.params.get('digests', '').split():
1220 param = 'digest:%s' % typ
1226 param = 'digest:%s' % typ
1221 try:
1227 try:
1222 value = inpart.params[param]
1228 value = inpart.params[param]
1223 except KeyError:
1229 except KeyError:
1224 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1230 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1225 param)
1231 param)
1226 digests[typ] = value
1232 digests[typ] = value
1227
1233
1228 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1234 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1229
1235
1230 # Make sure we trigger a transaction creation
1236 # Make sure we trigger a transaction creation
1231 #
1237 #
1232 # The addchangegroup function will get a transaction object by itself, but
1238 # The addchangegroup function will get a transaction object by itself, but
1233 # we need to make sure we trigger the creation of a transaction object used
1239 # we need to make sure we trigger the creation of a transaction object used
1234 # for the whole processing scope.
1240 # for the whole processing scope.
1235 op.gettransaction()
1241 op.gettransaction()
1236 import exchange
1242 from . import exchange
1237 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1243 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1238 if not isinstance(cg, changegroup.cg1unpacker):
1244 if not isinstance(cg, changegroup.cg1unpacker):
1239 raise util.Abort(_('%s: not a bundle version 1.0') %
1245 raise util.Abort(_('%s: not a bundle version 1.0') %
1240 util.hidepassword(raw_url))
1246 util.hidepassword(raw_url))
1241 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1247 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1242 op.records.add('changegroup', {'return': ret})
1248 op.records.add('changegroup', {'return': ret})
1243 if op.reply is not None:
1249 if op.reply is not None:
1244 # This is definitely not the final form of this
1250 # This is definitely not the final form of this
1245 # return. But one need to start somewhere.
1251 # return. But one need to start somewhere.
1246 part = op.reply.newpart('reply:changegroup')
1252 part = op.reply.newpart('reply:changegroup')
1247 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1253 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1248 part.addparam('return', '%i' % ret, mandatory=False)
1254 part.addparam('return', '%i' % ret, mandatory=False)
1249 try:
1255 try:
1250 real_part.validate()
1256 real_part.validate()
1251 except util.Abort as e:
1257 except util.Abort as e:
1252 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1258 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1253 (util.hidepassword(raw_url), str(e)))
1259 (util.hidepassword(raw_url), str(e)))
1254 assert not inpart.read()
1260 assert not inpart.read()
1255
1261
1256 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1262 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1257 def handlereplychangegroup(op, inpart):
1263 def handlereplychangegroup(op, inpart):
1258 ret = int(inpart.params['return'])
1264 ret = int(inpart.params['return'])
1259 replyto = int(inpart.params['in-reply-to'])
1265 replyto = int(inpart.params['in-reply-to'])
1260 op.records.add('changegroup', {'return': ret}, replyto)
1266 op.records.add('changegroup', {'return': ret}, replyto)
1261
1267
1262 @parthandler('check:heads')
1268 @parthandler('check:heads')
1263 def handlecheckheads(op, inpart):
1269 def handlecheckheads(op, inpart):
1264 """check that head of the repo did not change
1270 """check that head of the repo did not change
1265
1271
1266 This is used to detect a push race when using unbundle.
1272 This is used to detect a push race when using unbundle.
1267 This replaces the "heads" argument of unbundle."""
1273 This replaces the "heads" argument of unbundle."""
1268 h = inpart.read(20)
1274 h = inpart.read(20)
1269 heads = []
1275 heads = []
1270 while len(h) == 20:
1276 while len(h) == 20:
1271 heads.append(h)
1277 heads.append(h)
1272 h = inpart.read(20)
1278 h = inpart.read(20)
1273 assert not h
1279 assert not h
1274 if heads != op.repo.heads():
1280 if heads != op.repo.heads():
1275 raise error.PushRaced('repository changed while pushing - '
1281 raise error.PushRaced('repository changed while pushing - '
1276 'please try again')
1282 'please try again')
1277
1283
1278 @parthandler('output')
1284 @parthandler('output')
1279 def handleoutput(op, inpart):
1285 def handleoutput(op, inpart):
1280 """forward output captured on the server to the client"""
1286 """forward output captured on the server to the client"""
1281 for line in inpart.read().splitlines():
1287 for line in inpart.read().splitlines():
1282 op.ui.status(('remote: %s\n' % line))
1288 op.ui.status(('remote: %s\n' % line))
1283
1289
1284 @parthandler('replycaps')
1290 @parthandler('replycaps')
1285 def handlereplycaps(op, inpart):
1291 def handlereplycaps(op, inpart):
1286 """Notify that a reply bundle should be created
1292 """Notify that a reply bundle should be created
1287
1293
1288 The payload contains the capabilities information for the reply"""
1294 The payload contains the capabilities information for the reply"""
1289 caps = decodecaps(inpart.read())
1295 caps = decodecaps(inpart.read())
1290 if op.reply is None:
1296 if op.reply is None:
1291 op.reply = bundle20(op.ui, caps)
1297 op.reply = bundle20(op.ui, caps)
1292
1298
1293 @parthandler('error:abort', ('message', 'hint'))
1299 @parthandler('error:abort', ('message', 'hint'))
1294 def handleerrorabort(op, inpart):
1300 def handleerrorabort(op, inpart):
1295 """Used to transmit abort error over the wire"""
1301 """Used to transmit abort error over the wire"""
1296 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1302 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1297
1303
1298 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1304 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1299 'in-reply-to'))
1305 'in-reply-to'))
1300 def handleerrorpushkey(op, inpart):
1306 def handleerrorpushkey(op, inpart):
1301 """Used to transmit failure of a mandatory pushkey over the wire"""
1307 """Used to transmit failure of a mandatory pushkey over the wire"""
1302 kwargs = {}
1308 kwargs = {}
1303 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1309 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1304 value = inpart.params.get(name)
1310 value = inpart.params.get(name)
1305 if value is not None:
1311 if value is not None:
1306 kwargs[name] = value
1312 kwargs[name] = value
1307 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1313 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1308
1314
1309 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1315 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1310 def handleerrorunsupportedcontent(op, inpart):
1316 def handleerrorunsupportedcontent(op, inpart):
1311 """Used to transmit unknown content error over the wire"""
1317 """Used to transmit unknown content error over the wire"""
1312 kwargs = {}
1318 kwargs = {}
1313 parttype = inpart.params.get('parttype')
1319 parttype = inpart.params.get('parttype')
1314 if parttype is not None:
1320 if parttype is not None:
1315 kwargs['parttype'] = parttype
1321 kwargs['parttype'] = parttype
1316 params = inpart.params.get('params')
1322 params = inpart.params.get('params')
1317 if params is not None:
1323 if params is not None:
1318 kwargs['params'] = params.split('\0')
1324 kwargs['params'] = params.split('\0')
1319
1325
1320 raise error.UnsupportedPartError(**kwargs)
1326 raise error.UnsupportedPartError(**kwargs)
1321
1327
1322 @parthandler('error:pushraced', ('message',))
1328 @parthandler('error:pushraced', ('message',))
1323 def handleerrorpushraced(op, inpart):
1329 def handleerrorpushraced(op, inpart):
1324 """Used to transmit push race error over the wire"""
1330 """Used to transmit push race error over the wire"""
1325 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1331 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1326
1332
1327 @parthandler('listkeys', ('namespace',))
1333 @parthandler('listkeys', ('namespace',))
1328 def handlelistkeys(op, inpart):
1334 def handlelistkeys(op, inpart):
1329 """retrieve pushkey namespace content stored in a bundle2"""
1335 """retrieve pushkey namespace content stored in a bundle2"""
1330 namespace = inpart.params['namespace']
1336 namespace = inpart.params['namespace']
1331 r = pushkey.decodekeys(inpart.read())
1337 r = pushkey.decodekeys(inpart.read())
1332 op.records.add('listkeys', (namespace, r))
1338 op.records.add('listkeys', (namespace, r))
1333
1339
1334 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1340 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1335 def handlepushkey(op, inpart):
1341 def handlepushkey(op, inpart):
1336 """process a pushkey request"""
1342 """process a pushkey request"""
1337 dec = pushkey.decode
1343 dec = pushkey.decode
1338 namespace = dec(inpart.params['namespace'])
1344 namespace = dec(inpart.params['namespace'])
1339 key = dec(inpart.params['key'])
1345 key = dec(inpart.params['key'])
1340 old = dec(inpart.params['old'])
1346 old = dec(inpart.params['old'])
1341 new = dec(inpart.params['new'])
1347 new = dec(inpart.params['new'])
1342 ret = op.repo.pushkey(namespace, key, old, new)
1348 ret = op.repo.pushkey(namespace, key, old, new)
1343 record = {'namespace': namespace,
1349 record = {'namespace': namespace,
1344 'key': key,
1350 'key': key,
1345 'old': old,
1351 'old': old,
1346 'new': new}
1352 'new': new}
1347 op.records.add('pushkey', record)
1353 op.records.add('pushkey', record)
1348 if op.reply is not None:
1354 if op.reply is not None:
1349 rpart = op.reply.newpart('reply:pushkey')
1355 rpart = op.reply.newpart('reply:pushkey')
1350 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1356 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1351 rpart.addparam('return', '%i' % ret, mandatory=False)
1357 rpart.addparam('return', '%i' % ret, mandatory=False)
1352 if inpart.mandatory and not ret:
1358 if inpart.mandatory and not ret:
1353 kwargs = {}
1359 kwargs = {}
1354 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1360 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1355 if key in inpart.params:
1361 if key in inpart.params:
1356 kwargs[key] = inpart.params[key]
1362 kwargs[key] = inpart.params[key]
1357 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1363 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1358
1364
1359 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1365 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1360 def handlepushkeyreply(op, inpart):
1366 def handlepushkeyreply(op, inpart):
1361 """retrieve the result of a pushkey request"""
1367 """retrieve the result of a pushkey request"""
1362 ret = int(inpart.params['return'])
1368 ret = int(inpart.params['return'])
1363 partid = int(inpart.params['in-reply-to'])
1369 partid = int(inpart.params['in-reply-to'])
1364 op.records.add('pushkey', {'return': ret}, partid)
1370 op.records.add('pushkey', {'return': ret}, partid)
1365
1371
1366 @parthandler('obsmarkers')
1372 @parthandler('obsmarkers')
1367 def handleobsmarker(op, inpart):
1373 def handleobsmarker(op, inpart):
1368 """add a stream of obsmarkers to the repo"""
1374 """add a stream of obsmarkers to the repo"""
1369 tr = op.gettransaction()
1375 tr = op.gettransaction()
1370 markerdata = inpart.read()
1376 markerdata = inpart.read()
1371 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1377 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1372 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1378 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1373 % len(markerdata))
1379 % len(markerdata))
1374 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1380 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1375 if new:
1381 if new:
1376 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1382 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1377 op.records.add('obsmarkers', {'new': new})
1383 op.records.add('obsmarkers', {'new': new})
1378 if op.reply is not None:
1384 if op.reply is not None:
1379 rpart = op.reply.newpart('reply:obsmarkers')
1385 rpart = op.reply.newpart('reply:obsmarkers')
1380 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1386 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1381 rpart.addparam('new', '%i' % new, mandatory=False)
1387 rpart.addparam('new', '%i' % new, mandatory=False)
1382
1388
1383
1389
1384 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1390 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1385 def handleobsmarkerreply(op, inpart):
1391 def handleobsmarkerreply(op, inpart):
1386 """retrieve the result of a pushkey request"""
1392 """retrieve the result of a pushkey request"""
1387 ret = int(inpart.params['new'])
1393 ret = int(inpart.params['new'])
1388 partid = int(inpart.params['in-reply-to'])
1394 partid = int(inpart.params['in-reply-to'])
1389 op.records.add('obsmarkers', {'new': ret}, partid)
1395 op.records.add('obsmarkers', {'new': ret}, partid)
1390
1396
1391 @parthandler('hgtagsfnodes')
1397 @parthandler('hgtagsfnodes')
1392 def handlehgtagsfnodes(op, inpart):
1398 def handlehgtagsfnodes(op, inpart):
1393 """Applies .hgtags fnodes cache entries to the local repo.
1399 """Applies .hgtags fnodes cache entries to the local repo.
1394
1400
1395 Payload is pairs of 20 byte changeset nodes and filenodes.
1401 Payload is pairs of 20 byte changeset nodes and filenodes.
1396 """
1402 """
1397 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1403 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1398
1404
1399 count = 0
1405 count = 0
1400 while True:
1406 while True:
1401 node = inpart.read(20)
1407 node = inpart.read(20)
1402 fnode = inpart.read(20)
1408 fnode = inpart.read(20)
1403 if len(node) < 20 or len(fnode) < 20:
1409 if len(node) < 20 or len(fnode) < 20:
1404 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1410 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1405 break
1411 break
1406 cache.setfnode(node, fnode)
1412 cache.setfnode(node, fnode)
1407 count += 1
1413 count += 1
1408
1414
1409 cache.write()
1415 cache.write()
1410 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1416 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now