##// END OF EJS Templates
bundle2: add generic debug output regarding processed interruption...
Pierre-Yves David -
r25335:8f7137a8 default
parent child Browse files
Show More
@@ -1,1355 +1,1360 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
175
176 def outdebug(ui, message):
176 def outdebug(ui, message):
177 """debug regarding output stream (bundling)"""
177 """debug regarding output stream (bundling)"""
178 ui.debug('bundle2-output: %s\n' % message)
178 ui.debug('bundle2-output: %s\n' % message)
179
179
180 def indebug(ui, message):
180 def indebug(ui, message):
181 """debug on input stream (unbundling)"""
181 """debug on input stream (unbundling)"""
182 ui.debug('bundle2-input: %s\n' % message)
182 ui.debug('bundle2-input: %s\n' % message)
183
183
184 def validateparttype(parttype):
184 def validateparttype(parttype):
185 """raise ValueError if a parttype contains invalid character"""
185 """raise ValueError if a parttype contains invalid character"""
186 if _parttypeforbidden.search(parttype):
186 if _parttypeforbidden.search(parttype):
187 raise ValueError(parttype)
187 raise ValueError(parttype)
188
188
189 def _makefpartparamsizes(nbparams):
189 def _makefpartparamsizes(nbparams):
190 """return a struct format to read part parameter sizes
190 """return a struct format to read part parameter sizes
191
191
192 The number parameters is variable so we need to build that format
192 The number parameters is variable so we need to build that format
193 dynamically.
193 dynamically.
194 """
194 """
195 return '>'+('BB'*nbparams)
195 return '>'+('BB'*nbparams)
196
196
197 parthandlermapping = {}
197 parthandlermapping = {}
198
198
199 def parthandler(parttype, params=()):
199 def parthandler(parttype, params=()):
200 """decorator that register a function as a bundle2 part handler
200 """decorator that register a function as a bundle2 part handler
201
201
202 eg::
202 eg::
203
203
204 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
204 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
205 def myparttypehandler(...):
205 def myparttypehandler(...):
206 '''process a part of type "my part".'''
206 '''process a part of type "my part".'''
207 ...
207 ...
208 """
208 """
209 validateparttype(parttype)
209 validateparttype(parttype)
210 def _decorator(func):
210 def _decorator(func):
211 lparttype = parttype.lower() # enforce lower case matching.
211 lparttype = parttype.lower() # enforce lower case matching.
212 assert lparttype not in parthandlermapping
212 assert lparttype not in parthandlermapping
213 parthandlermapping[lparttype] = func
213 parthandlermapping[lparttype] = func
214 func.params = frozenset(params)
214 func.params = frozenset(params)
215 return func
215 return func
216 return _decorator
216 return _decorator
217
217
218 class unbundlerecords(object):
218 class unbundlerecords(object):
219 """keep record of what happens during and unbundle
219 """keep record of what happens during and unbundle
220
220
221 New records are added using `records.add('cat', obj)`. Where 'cat' is a
221 New records are added using `records.add('cat', obj)`. Where 'cat' is a
222 category of record and obj is an arbitrary object.
222 category of record and obj is an arbitrary object.
223
223
224 `records['cat']` will return all entries of this category 'cat'.
224 `records['cat']` will return all entries of this category 'cat'.
225
225
226 Iterating on the object itself will yield `('category', obj)` tuples
226 Iterating on the object itself will yield `('category', obj)` tuples
227 for all entries.
227 for all entries.
228
228
229 All iterations happens in chronological order.
229 All iterations happens in chronological order.
230 """
230 """
231
231
232 def __init__(self):
232 def __init__(self):
233 self._categories = {}
233 self._categories = {}
234 self._sequences = []
234 self._sequences = []
235 self._replies = {}
235 self._replies = {}
236
236
237 def add(self, category, entry, inreplyto=None):
237 def add(self, category, entry, inreplyto=None):
238 """add a new record of a given category.
238 """add a new record of a given category.
239
239
240 The entry can then be retrieved in the list returned by
240 The entry can then be retrieved in the list returned by
241 self['category']."""
241 self['category']."""
242 self._categories.setdefault(category, []).append(entry)
242 self._categories.setdefault(category, []).append(entry)
243 self._sequences.append((category, entry))
243 self._sequences.append((category, entry))
244 if inreplyto is not None:
244 if inreplyto is not None:
245 self.getreplies(inreplyto).add(category, entry)
245 self.getreplies(inreplyto).add(category, entry)
246
246
247 def getreplies(self, partid):
247 def getreplies(self, partid):
248 """get the records that are replies to a specific part"""
248 """get the records that are replies to a specific part"""
249 return self._replies.setdefault(partid, unbundlerecords())
249 return self._replies.setdefault(partid, unbundlerecords())
250
250
251 def __getitem__(self, cat):
251 def __getitem__(self, cat):
252 return tuple(self._categories.get(cat, ()))
252 return tuple(self._categories.get(cat, ()))
253
253
254 def __iter__(self):
254 def __iter__(self):
255 return iter(self._sequences)
255 return iter(self._sequences)
256
256
257 def __len__(self):
257 def __len__(self):
258 return len(self._sequences)
258 return len(self._sequences)
259
259
260 def __nonzero__(self):
260 def __nonzero__(self):
261 return bool(self._sequences)
261 return bool(self._sequences)
262
262
263 class bundleoperation(object):
263 class bundleoperation(object):
264 """an object that represents a single bundling process
264 """an object that represents a single bundling process
265
265
266 Its purpose is to carry unbundle-related objects and states.
266 Its purpose is to carry unbundle-related objects and states.
267
267
268 A new object should be created at the beginning of each bundle processing.
268 A new object should be created at the beginning of each bundle processing.
269 The object is to be returned by the processing function.
269 The object is to be returned by the processing function.
270
270
271 The object has very little content now it will ultimately contain:
271 The object has very little content now it will ultimately contain:
272 * an access to the repo the bundle is applied to,
272 * an access to the repo the bundle is applied to,
273 * a ui object,
273 * a ui object,
274 * a way to retrieve a transaction to add changes to the repo,
274 * a way to retrieve a transaction to add changes to the repo,
275 * a way to record the result of processing each part,
275 * a way to record the result of processing each part,
276 * a way to construct a bundle response when applicable.
276 * a way to construct a bundle response when applicable.
277 """
277 """
278
278
279 def __init__(self, repo, transactiongetter, captureoutput=True):
279 def __init__(self, repo, transactiongetter, captureoutput=True):
280 self.repo = repo
280 self.repo = repo
281 self.ui = repo.ui
281 self.ui = repo.ui
282 self.records = unbundlerecords()
282 self.records = unbundlerecords()
283 self.gettransaction = transactiongetter
283 self.gettransaction = transactiongetter
284 self.reply = None
284 self.reply = None
285 self.captureoutput = captureoutput
285 self.captureoutput = captureoutput
286
286
287 class TransactionUnavailable(RuntimeError):
287 class TransactionUnavailable(RuntimeError):
288 pass
288 pass
289
289
290 def _notransaction():
290 def _notransaction():
291 """default method to get a transaction while processing a bundle
291 """default method to get a transaction while processing a bundle
292
292
293 Raise an exception to highlight the fact that no transaction was expected
293 Raise an exception to highlight the fact that no transaction was expected
294 to be created"""
294 to be created"""
295 raise TransactionUnavailable()
295 raise TransactionUnavailable()
296
296
297 def processbundle(repo, unbundler, transactiongetter=None, op=None):
297 def processbundle(repo, unbundler, transactiongetter=None, op=None):
298 """This function process a bundle, apply effect to/from a repo
298 """This function process a bundle, apply effect to/from a repo
299
299
300 It iterates over each part then searches for and uses the proper handling
300 It iterates over each part then searches for and uses the proper handling
301 code to process the part. Parts are processed in order.
301 code to process the part. Parts are processed in order.
302
302
303 This is very early version of this function that will be strongly reworked
303 This is very early version of this function that will be strongly reworked
304 before final usage.
304 before final usage.
305
305
306 Unknown Mandatory part will abort the process.
306 Unknown Mandatory part will abort the process.
307
307
308 It is temporarily possible to provide a prebuilt bundleoperation to the
308 It is temporarily possible to provide a prebuilt bundleoperation to the
309 function. This is used to ensure output is properly propagated in case of
309 function. This is used to ensure output is properly propagated in case of
310 an error during the unbundling. This output capturing part will likely be
310 an error during the unbundling. This output capturing part will likely be
311 reworked and this ability will probably go away in the process.
311 reworked and this ability will probably go away in the process.
312 """
312 """
313 if op is None:
313 if op is None:
314 if transactiongetter is None:
314 if transactiongetter is None:
315 transactiongetter = _notransaction
315 transactiongetter = _notransaction
316 op = bundleoperation(repo, transactiongetter)
316 op = bundleoperation(repo, transactiongetter)
317 # todo:
317 # todo:
318 # - replace this is a init function soon.
318 # - replace this is a init function soon.
319 # - exception catching
319 # - exception catching
320 unbundler.params
320 unbundler.params
321 if repo.ui.debugflag:
321 if repo.ui.debugflag:
322 msg = ['bundle2-input-bundle:']
322 msg = ['bundle2-input-bundle:']
323 if unbundler.params:
323 if unbundler.params:
324 msg.append(' %i params')
324 msg.append(' %i params')
325 if op.gettransaction is None:
325 if op.gettransaction is None:
326 msg.append(' no-transaction')
326 msg.append(' no-transaction')
327 else:
327 else:
328 msg.append(' with-transaction')
328 msg.append(' with-transaction')
329 msg.append('\n')
329 msg.append('\n')
330 repo.ui.debug(''.join(msg))
330 repo.ui.debug(''.join(msg))
331 iterparts = enumerate(unbundler.iterparts())
331 iterparts = enumerate(unbundler.iterparts())
332 part = None
332 part = None
333 nbpart = 0
333 nbpart = 0
334 try:
334 try:
335 for nbpart, part in iterparts:
335 for nbpart, part in iterparts:
336 _processpart(op, part)
336 _processpart(op, part)
337 except BaseException, exc:
337 except BaseException, exc:
338 for nbpart, part in iterparts:
338 for nbpart, part in iterparts:
339 # consume the bundle content
339 # consume the bundle content
340 part.seek(0, 2)
340 part.seek(0, 2)
341 # Small hack to let caller code distinguish exceptions from bundle2
341 # Small hack to let caller code distinguish exceptions from bundle2
342 # processing from processing the old format. This is mostly
342 # processing from processing the old format. This is mostly
343 # needed to handle different return codes to unbundle according to the
343 # needed to handle different return codes to unbundle according to the
344 # type of bundle. We should probably clean up or drop this return code
344 # type of bundle. We should probably clean up or drop this return code
345 # craziness in a future version.
345 # craziness in a future version.
346 exc.duringunbundle2 = True
346 exc.duringunbundle2 = True
347 salvaged = []
347 salvaged = []
348 if op.reply is not None:
348 if op.reply is not None:
349 salvaged = op.reply.salvageoutput()
349 salvaged = op.reply.salvageoutput()
350 exc._bundle2salvagedoutput = salvaged
350 exc._bundle2salvagedoutput = salvaged
351 raise
351 raise
352 finally:
352 finally:
353 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
353 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
354
354
355 return op
355 return op
356
356
357 def _processpart(op, part):
357 def _processpart(op, part):
358 """process a single part from a bundle
358 """process a single part from a bundle
359
359
360 The part is guaranteed to have been fully consumed when the function exits
360 The part is guaranteed to have been fully consumed when the function exits
361 (even if an exception is raised)."""
361 (even if an exception is raised)."""
362 status = 'unknown' # used by debug output
362 status = 'unknown' # used by debug output
363 try:
363 try:
364 try:
364 try:
365 handler = parthandlermapping.get(part.type)
365 handler = parthandlermapping.get(part.type)
366 if handler is None:
366 if handler is None:
367 status = 'unsupported-type'
367 status = 'unsupported-type'
368 raise error.UnsupportedPartError(parttype=part.type)
368 raise error.UnsupportedPartError(parttype=part.type)
369 indebug(op.ui, 'found a handler for part %r' % part.type)
369 indebug(op.ui, 'found a handler for part %r' % part.type)
370 unknownparams = part.mandatorykeys - handler.params
370 unknownparams = part.mandatorykeys - handler.params
371 if unknownparams:
371 if unknownparams:
372 unknownparams = list(unknownparams)
372 unknownparams = list(unknownparams)
373 unknownparams.sort()
373 unknownparams.sort()
374 status = 'unsupported-params (%s)' % unknownparams
374 status = 'unsupported-params (%s)' % unknownparams
375 raise error.UnsupportedPartError(parttype=part.type,
375 raise error.UnsupportedPartError(parttype=part.type,
376 params=unknownparams)
376 params=unknownparams)
377 status = 'supported'
377 status = 'supported'
378 except error.UnsupportedPartError, exc:
378 except error.UnsupportedPartError, exc:
379 if part.mandatory: # mandatory parts
379 if part.mandatory: # mandatory parts
380 raise
380 raise
381 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
381 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
382 return # skip to part processing
382 return # skip to part processing
383 finally:
383 finally:
384 if op.ui.debugflag:
384 if op.ui.debugflag:
385 msg = ['bundle2-input-part: "%s"' % part.type]
385 msg = ['bundle2-input-part: "%s"' % part.type]
386 if not part.mandatory:
386 if not part.mandatory:
387 msg.append(' (advisory)')
387 msg.append(' (advisory)')
388 nbmp = len(part.mandatorykeys)
388 nbmp = len(part.mandatorykeys)
389 nbap = len(part.params) - nbmp
389 nbap = len(part.params) - nbmp
390 if nbmp or nbap:
390 if nbmp or nbap:
391 msg.append(' (params:')
391 msg.append(' (params:')
392 if nbmp:
392 if nbmp:
393 msg.append(' %i mandatory' % nbmp)
393 msg.append(' %i mandatory' % nbmp)
394 if nbap:
394 if nbap:
395 msg.append(' %i advisory' % nbmp)
395 msg.append(' %i advisory' % nbmp)
396 msg.append(')')
396 msg.append(')')
397 msg.append(' %s\n' % status)
397 msg.append(' %s\n' % status)
398 op.ui.debug(''.join(msg))
398 op.ui.debug(''.join(msg))
399
399
400 # handler is called outside the above try block so that we don't
400 # handler is called outside the above try block so that we don't
401 # risk catching KeyErrors from anything other than the
401 # risk catching KeyErrors from anything other than the
402 # parthandlermapping lookup (any KeyError raised by handler()
402 # parthandlermapping lookup (any KeyError raised by handler()
403 # itself represents a defect of a different variety).
403 # itself represents a defect of a different variety).
404 output = None
404 output = None
405 if op.captureoutput and op.reply is not None:
405 if op.captureoutput and op.reply is not None:
406 op.ui.pushbuffer(error=True, subproc=True)
406 op.ui.pushbuffer(error=True, subproc=True)
407 output = ''
407 output = ''
408 try:
408 try:
409 handler(op, part)
409 handler(op, part)
410 finally:
410 finally:
411 if output is not None:
411 if output is not None:
412 output = op.ui.popbuffer()
412 output = op.ui.popbuffer()
413 if output:
413 if output:
414 outpart = op.reply.newpart('output', data=output,
414 outpart = op.reply.newpart('output', data=output,
415 mandatory=False)
415 mandatory=False)
416 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
416 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
417 finally:
417 finally:
418 # consume the part content to not corrupt the stream.
418 # consume the part content to not corrupt the stream.
419 part.seek(0, 2)
419 part.seek(0, 2)
420
420
421
421
422 def decodecaps(blob):
422 def decodecaps(blob):
423 """decode a bundle2 caps bytes blob into a dictionary
423 """decode a bundle2 caps bytes blob into a dictionary
424
424
425 The blob is a list of capabilities (one per line)
425 The blob is a list of capabilities (one per line)
426 Capabilities may have values using a line of the form::
426 Capabilities may have values using a line of the form::
427
427
428 capability=value1,value2,value3
428 capability=value1,value2,value3
429
429
430 The values are always a list."""
430 The values are always a list."""
431 caps = {}
431 caps = {}
432 for line in blob.splitlines():
432 for line in blob.splitlines():
433 if not line:
433 if not line:
434 continue
434 continue
435 if '=' not in line:
435 if '=' not in line:
436 key, vals = line, ()
436 key, vals = line, ()
437 else:
437 else:
438 key, vals = line.split('=', 1)
438 key, vals = line.split('=', 1)
439 vals = vals.split(',')
439 vals = vals.split(',')
440 key = urllib.unquote(key)
440 key = urllib.unquote(key)
441 vals = [urllib.unquote(v) for v in vals]
441 vals = [urllib.unquote(v) for v in vals]
442 caps[key] = vals
442 caps[key] = vals
443 return caps
443 return caps
444
444
445 def encodecaps(caps):
445 def encodecaps(caps):
446 """encode a bundle2 caps dictionary into a bytes blob"""
446 """encode a bundle2 caps dictionary into a bytes blob"""
447 chunks = []
447 chunks = []
448 for ca in sorted(caps):
448 for ca in sorted(caps):
449 vals = caps[ca]
449 vals = caps[ca]
450 ca = urllib.quote(ca)
450 ca = urllib.quote(ca)
451 vals = [urllib.quote(v) for v in vals]
451 vals = [urllib.quote(v) for v in vals]
452 if vals:
452 if vals:
453 ca = "%s=%s" % (ca, ','.join(vals))
453 ca = "%s=%s" % (ca, ','.join(vals))
454 chunks.append(ca)
454 chunks.append(ca)
455 return '\n'.join(chunks)
455 return '\n'.join(chunks)
456
456
457 class bundle20(object):
457 class bundle20(object):
458 """represent an outgoing bundle2 container
458 """represent an outgoing bundle2 container
459
459
460 Use the `addparam` method to add stream level parameter. and `newpart` to
460 Use the `addparam` method to add stream level parameter. and `newpart` to
461 populate it. Then call `getchunks` to retrieve all the binary chunks of
461 populate it. Then call `getchunks` to retrieve all the binary chunks of
462 data that compose the bundle2 container."""
462 data that compose the bundle2 container."""
463
463
464 _magicstring = 'HG20'
464 _magicstring = 'HG20'
465
465
466 def __init__(self, ui, capabilities=()):
466 def __init__(self, ui, capabilities=()):
467 self.ui = ui
467 self.ui = ui
468 self._params = []
468 self._params = []
469 self._parts = []
469 self._parts = []
470 self.capabilities = dict(capabilities)
470 self.capabilities = dict(capabilities)
471
471
472 @property
472 @property
473 def nbparts(self):
473 def nbparts(self):
474 """total number of parts added to the bundler"""
474 """total number of parts added to the bundler"""
475 return len(self._parts)
475 return len(self._parts)
476
476
477 # methods used to defines the bundle2 content
477 # methods used to defines the bundle2 content
478 def addparam(self, name, value=None):
478 def addparam(self, name, value=None):
479 """add a stream level parameter"""
479 """add a stream level parameter"""
480 if not name:
480 if not name:
481 raise ValueError('empty parameter name')
481 raise ValueError('empty parameter name')
482 if name[0] not in string.letters:
482 if name[0] not in string.letters:
483 raise ValueError('non letter first character: %r' % name)
483 raise ValueError('non letter first character: %r' % name)
484 self._params.append((name, value))
484 self._params.append((name, value))
485
485
486 def addpart(self, part):
486 def addpart(self, part):
487 """add a new part to the bundle2 container
487 """add a new part to the bundle2 container
488
488
489 Parts contains the actual applicative payload."""
489 Parts contains the actual applicative payload."""
490 assert part.id is None
490 assert part.id is None
491 part.id = len(self._parts) # very cheap counter
491 part.id = len(self._parts) # very cheap counter
492 self._parts.append(part)
492 self._parts.append(part)
493
493
494 def newpart(self, typeid, *args, **kwargs):
494 def newpart(self, typeid, *args, **kwargs):
495 """create a new part and add it to the containers
495 """create a new part and add it to the containers
496
496
497 As the part is directly added to the containers. For now, this means
497 As the part is directly added to the containers. For now, this means
498 that any failure to properly initialize the part after calling
498 that any failure to properly initialize the part after calling
499 ``newpart`` should result in a failure of the whole bundling process.
499 ``newpart`` should result in a failure of the whole bundling process.
500
500
501 You can still fall back to manually create and add if you need better
501 You can still fall back to manually create and add if you need better
502 control."""
502 control."""
503 part = bundlepart(typeid, *args, **kwargs)
503 part = bundlepart(typeid, *args, **kwargs)
504 self.addpart(part)
504 self.addpart(part)
505 return part
505 return part
506
506
507 # methods used to generate the bundle2 stream
507 # methods used to generate the bundle2 stream
508 def getchunks(self):
508 def getchunks(self):
509 if self.ui.debugflag:
509 if self.ui.debugflag:
510 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
510 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
511 if self._params:
511 if self._params:
512 msg.append(' (%i params)' % len(self._params))
512 msg.append(' (%i params)' % len(self._params))
513 msg.append(' %i parts total\n' % len(self._parts))
513 msg.append(' %i parts total\n' % len(self._parts))
514 self.ui.debug(''.join(msg))
514 self.ui.debug(''.join(msg))
515 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
515 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
516 yield self._magicstring
516 yield self._magicstring
517 param = self._paramchunk()
517 param = self._paramchunk()
518 outdebug(self.ui, 'bundle parameter: %s' % param)
518 outdebug(self.ui, 'bundle parameter: %s' % param)
519 yield _pack(_fstreamparamsize, len(param))
519 yield _pack(_fstreamparamsize, len(param))
520 if param:
520 if param:
521 yield param
521 yield param
522
522
523 outdebug(self.ui, 'start of parts')
523 outdebug(self.ui, 'start of parts')
524 for part in self._parts:
524 for part in self._parts:
525 outdebug(self.ui, 'bundle part: "%s"' % part.type)
525 outdebug(self.ui, 'bundle part: "%s"' % part.type)
526 for chunk in part.getchunks(ui=self.ui):
526 for chunk in part.getchunks(ui=self.ui):
527 yield chunk
527 yield chunk
528 outdebug(self.ui, 'end of bundle')
528 outdebug(self.ui, 'end of bundle')
529 yield _pack(_fpartheadersize, 0)
529 yield _pack(_fpartheadersize, 0)
530
530
531 def _paramchunk(self):
531 def _paramchunk(self):
532 """return a encoded version of all stream parameters"""
532 """return a encoded version of all stream parameters"""
533 blocks = []
533 blocks = []
534 for par, value in self._params:
534 for par, value in self._params:
535 par = urllib.quote(par)
535 par = urllib.quote(par)
536 if value is not None:
536 if value is not None:
537 value = urllib.quote(value)
537 value = urllib.quote(value)
538 par = '%s=%s' % (par, value)
538 par = '%s=%s' % (par, value)
539 blocks.append(par)
539 blocks.append(par)
540 return ' '.join(blocks)
540 return ' '.join(blocks)
541
541
542 def salvageoutput(self):
542 def salvageoutput(self):
543 """return a list with a copy of all output parts in the bundle
543 """return a list with a copy of all output parts in the bundle
544
544
545 This is meant to be used during error handling to make sure we preserve
545 This is meant to be used during error handling to make sure we preserve
546 server output"""
546 server output"""
547 salvaged = []
547 salvaged = []
548 for part in self._parts:
548 for part in self._parts:
549 if part.type.startswith('output'):
549 if part.type.startswith('output'):
550 salvaged.append(part.copy())
550 salvaged.append(part.copy())
551 return salvaged
551 return salvaged
552
552
553
553
554 class unpackermixin(object):
554 class unpackermixin(object):
555 """A mixin to extract bytes and struct data from a stream"""
555 """A mixin to extract bytes and struct data from a stream"""
556
556
557 def __init__(self, fp):
557 def __init__(self, fp):
558 self._fp = fp
558 self._fp = fp
559 self._seekable = (util.safehasattr(fp, 'seek') and
559 self._seekable = (util.safehasattr(fp, 'seek') and
560 util.safehasattr(fp, 'tell'))
560 util.safehasattr(fp, 'tell'))
561
561
562 def _unpack(self, format):
562 def _unpack(self, format):
563 """unpack this struct format from the stream"""
563 """unpack this struct format from the stream"""
564 data = self._readexact(struct.calcsize(format))
564 data = self._readexact(struct.calcsize(format))
565 return _unpack(format, data)
565 return _unpack(format, data)
566
566
567 def _readexact(self, size):
567 def _readexact(self, size):
568 """read exactly <size> bytes from the stream"""
568 """read exactly <size> bytes from the stream"""
569 return changegroup.readexactly(self._fp, size)
569 return changegroup.readexactly(self._fp, size)
570
570
571 def seek(self, offset, whence=0):
571 def seek(self, offset, whence=0):
572 """move the underlying file pointer"""
572 """move the underlying file pointer"""
573 if self._seekable:
573 if self._seekable:
574 return self._fp.seek(offset, whence)
574 return self._fp.seek(offset, whence)
575 else:
575 else:
576 raise NotImplementedError(_('File pointer is not seekable'))
576 raise NotImplementedError(_('File pointer is not seekable'))
577
577
578 def tell(self):
578 def tell(self):
579 """return the file offset, or None if file is not seekable"""
579 """return the file offset, or None if file is not seekable"""
580 if self._seekable:
580 if self._seekable:
581 try:
581 try:
582 return self._fp.tell()
582 return self._fp.tell()
583 except IOError, e:
583 except IOError, e:
584 if e.errno == errno.ESPIPE:
584 if e.errno == errno.ESPIPE:
585 self._seekable = False
585 self._seekable = False
586 else:
586 else:
587 raise
587 raise
588 return None
588 return None
589
589
590 def close(self):
590 def close(self):
591 """close underlying file"""
591 """close underlying file"""
592 if util.safehasattr(self._fp, 'close'):
592 if util.safehasattr(self._fp, 'close'):
593 return self._fp.close()
593 return self._fp.close()
594
594
595 def getunbundler(ui, fp, header=None):
595 def getunbundler(ui, fp, header=None):
596 """return a valid unbundler object for a given header"""
596 """return a valid unbundler object for a given header"""
597 if header is None:
597 if header is None:
598 header = changegroup.readexactly(fp, 4)
598 header = changegroup.readexactly(fp, 4)
599 magic, version = header[0:2], header[2:4]
599 magic, version = header[0:2], header[2:4]
600 if magic != 'HG':
600 if magic != 'HG':
601 raise util.Abort(_('not a Mercurial bundle'))
601 raise util.Abort(_('not a Mercurial bundle'))
602 unbundlerclass = formatmap.get(version)
602 unbundlerclass = formatmap.get(version)
603 if unbundlerclass is None:
603 if unbundlerclass is None:
604 raise util.Abort(_('unknown bundle version %s') % version)
604 raise util.Abort(_('unknown bundle version %s') % version)
605 unbundler = unbundlerclass(ui, fp)
605 unbundler = unbundlerclass(ui, fp)
606 indebug(ui, 'start processing of %s stream' % header)
606 indebug(ui, 'start processing of %s stream' % header)
607 return unbundler
607 return unbundler
608
608
609 class unbundle20(unpackermixin):
609 class unbundle20(unpackermixin):
610 """interpret a bundle2 stream
610 """interpret a bundle2 stream
611
611
612 This class is fed with a binary stream and yields parts through its
612 This class is fed with a binary stream and yields parts through its
613 `iterparts` methods."""
613 `iterparts` methods."""
614
614
615 def __init__(self, ui, fp):
615 def __init__(self, ui, fp):
616 """If header is specified, we do not read it out of the stream."""
616 """If header is specified, we do not read it out of the stream."""
617 self.ui = ui
617 self.ui = ui
618 super(unbundle20, self).__init__(fp)
618 super(unbundle20, self).__init__(fp)
619
619
620 @util.propertycache
620 @util.propertycache
621 def params(self):
621 def params(self):
622 """dictionary of stream level parameters"""
622 """dictionary of stream level parameters"""
623 indebug(self.ui, 'reading bundle2 stream parameters')
623 indebug(self.ui, 'reading bundle2 stream parameters')
624 params = {}
624 params = {}
625 paramssize = self._unpack(_fstreamparamsize)[0]
625 paramssize = self._unpack(_fstreamparamsize)[0]
626 if paramssize < 0:
626 if paramssize < 0:
627 raise error.BundleValueError('negative bundle param size: %i'
627 raise error.BundleValueError('negative bundle param size: %i'
628 % paramssize)
628 % paramssize)
629 if paramssize:
629 if paramssize:
630 for p in self._readexact(paramssize).split(' '):
630 for p in self._readexact(paramssize).split(' '):
631 p = p.split('=', 1)
631 p = p.split('=', 1)
632 p = [urllib.unquote(i) for i in p]
632 p = [urllib.unquote(i) for i in p]
633 if len(p) < 2:
633 if len(p) < 2:
634 p.append(None)
634 p.append(None)
635 self._processparam(*p)
635 self._processparam(*p)
636 params[p[0]] = p[1]
636 params[p[0]] = p[1]
637 return params
637 return params
638
638
639 def _processparam(self, name, value):
639 def _processparam(self, name, value):
640 """process a parameter, applying its effect if needed
640 """process a parameter, applying its effect if needed
641
641
642 Parameter starting with a lower case letter are advisory and will be
642 Parameter starting with a lower case letter are advisory and will be
643 ignored when unknown. Those starting with an upper case letter are
643 ignored when unknown. Those starting with an upper case letter are
644 mandatory and will this function will raise a KeyError when unknown.
644 mandatory and will this function will raise a KeyError when unknown.
645
645
646 Note: no option are currently supported. Any input will be either
646 Note: no option are currently supported. Any input will be either
647 ignored or failing.
647 ignored or failing.
648 """
648 """
649 if not name:
649 if not name:
650 raise ValueError('empty parameter name')
650 raise ValueError('empty parameter name')
651 if name[0] not in string.letters:
651 if name[0] not in string.letters:
652 raise ValueError('non letter first character: %r' % name)
652 raise ValueError('non letter first character: %r' % name)
653 # Some logic will be later added here to try to process the option for
653 # Some logic will be later added here to try to process the option for
654 # a dict of known parameter.
654 # a dict of known parameter.
655 if name[0].islower():
655 if name[0].islower():
656 indebug(self.ui, "ignoring unknown parameter %r" % name)
656 indebug(self.ui, "ignoring unknown parameter %r" % name)
657 else:
657 else:
658 raise error.UnsupportedPartError(params=(name,))
658 raise error.UnsupportedPartError(params=(name,))
659
659
660
660
661 def iterparts(self):
661 def iterparts(self):
662 """yield all parts contained in the stream"""
662 """yield all parts contained in the stream"""
663 # make sure param have been loaded
663 # make sure param have been loaded
664 self.params
664 self.params
665 indebug(self.ui, 'start extraction of bundle2 parts')
665 indebug(self.ui, 'start extraction of bundle2 parts')
666 headerblock = self._readpartheader()
666 headerblock = self._readpartheader()
667 while headerblock is not None:
667 while headerblock is not None:
668 part = unbundlepart(self.ui, headerblock, self._fp)
668 part = unbundlepart(self.ui, headerblock, self._fp)
669 yield part
669 yield part
670 part.seek(0, 2)
670 part.seek(0, 2)
671 headerblock = self._readpartheader()
671 headerblock = self._readpartheader()
672 indebug(self.ui, 'end of bundle2 stream')
672 indebug(self.ui, 'end of bundle2 stream')
673
673
674 def _readpartheader(self):
674 def _readpartheader(self):
675 """reads a part header size and return the bytes blob
675 """reads a part header size and return the bytes blob
676
676
677 returns None if empty"""
677 returns None if empty"""
678 headersize = self._unpack(_fpartheadersize)[0]
678 headersize = self._unpack(_fpartheadersize)[0]
679 if headersize < 0:
679 if headersize < 0:
680 raise error.BundleValueError('negative part header size: %i'
680 raise error.BundleValueError('negative part header size: %i'
681 % headersize)
681 % headersize)
682 indebug(self.ui, 'part header size: %i' % headersize)
682 indebug(self.ui, 'part header size: %i' % headersize)
683 if headersize:
683 if headersize:
684 return self._readexact(headersize)
684 return self._readexact(headersize)
685 return None
685 return None
686
686
687 def compressed(self):
687 def compressed(self):
688 return False
688 return False
689
689
690 formatmap = {'20': unbundle20}
690 formatmap = {'20': unbundle20}
691
691
692 class bundlepart(object):
692 class bundlepart(object):
693 """A bundle2 part contains application level payload
693 """A bundle2 part contains application level payload
694
694
695 The part `type` is used to route the part to the application level
695 The part `type` is used to route the part to the application level
696 handler.
696 handler.
697
697
698 The part payload is contained in ``part.data``. It could be raw bytes or a
698 The part payload is contained in ``part.data``. It could be raw bytes or a
699 generator of byte chunks.
699 generator of byte chunks.
700
700
701 You can add parameters to the part using the ``addparam`` method.
701 You can add parameters to the part using the ``addparam`` method.
702 Parameters can be either mandatory (default) or advisory. Remote side
702 Parameters can be either mandatory (default) or advisory. Remote side
703 should be able to safely ignore the advisory ones.
703 should be able to safely ignore the advisory ones.
704
704
705 Both data and parameters cannot be modified after the generation has begun.
705 Both data and parameters cannot be modified after the generation has begun.
706 """
706 """
707
707
708 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
708 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
709 data='', mandatory=True):
709 data='', mandatory=True):
710 validateparttype(parttype)
710 validateparttype(parttype)
711 self.id = None
711 self.id = None
712 self.type = parttype
712 self.type = parttype
713 self._data = data
713 self._data = data
714 self._mandatoryparams = list(mandatoryparams)
714 self._mandatoryparams = list(mandatoryparams)
715 self._advisoryparams = list(advisoryparams)
715 self._advisoryparams = list(advisoryparams)
716 # checking for duplicated entries
716 # checking for duplicated entries
717 self._seenparams = set()
717 self._seenparams = set()
718 for pname, __ in self._mandatoryparams + self._advisoryparams:
718 for pname, __ in self._mandatoryparams + self._advisoryparams:
719 if pname in self._seenparams:
719 if pname in self._seenparams:
720 raise RuntimeError('duplicated params: %s' % pname)
720 raise RuntimeError('duplicated params: %s' % pname)
721 self._seenparams.add(pname)
721 self._seenparams.add(pname)
722 # status of the part's generation:
722 # status of the part's generation:
723 # - None: not started,
723 # - None: not started,
724 # - False: currently generated,
724 # - False: currently generated,
725 # - True: generation done.
725 # - True: generation done.
726 self._generated = None
726 self._generated = None
727 self.mandatory = mandatory
727 self.mandatory = mandatory
728
728
729 def copy(self):
729 def copy(self):
730 """return a copy of the part
730 """return a copy of the part
731
731
732 The new part have the very same content but no partid assigned yet.
732 The new part have the very same content but no partid assigned yet.
733 Parts with generated data cannot be copied."""
733 Parts with generated data cannot be copied."""
734 assert not util.safehasattr(self.data, 'next')
734 assert not util.safehasattr(self.data, 'next')
735 return self.__class__(self.type, self._mandatoryparams,
735 return self.__class__(self.type, self._mandatoryparams,
736 self._advisoryparams, self._data, self.mandatory)
736 self._advisoryparams, self._data, self.mandatory)
737
737
738 # methods used to defines the part content
738 # methods used to defines the part content
739 def __setdata(self, data):
739 def __setdata(self, data):
740 if self._generated is not None:
740 if self._generated is not None:
741 raise error.ReadOnlyPartError('part is being generated')
741 raise error.ReadOnlyPartError('part is being generated')
742 self._data = data
742 self._data = data
743 def __getdata(self):
743 def __getdata(self):
744 return self._data
744 return self._data
745 data = property(__getdata, __setdata)
745 data = property(__getdata, __setdata)
746
746
747 @property
747 @property
748 def mandatoryparams(self):
748 def mandatoryparams(self):
749 # make it an immutable tuple to force people through ``addparam``
749 # make it an immutable tuple to force people through ``addparam``
750 return tuple(self._mandatoryparams)
750 return tuple(self._mandatoryparams)
751
751
752 @property
752 @property
753 def advisoryparams(self):
753 def advisoryparams(self):
754 # make it an immutable tuple to force people through ``addparam``
754 # make it an immutable tuple to force people through ``addparam``
755 return tuple(self._advisoryparams)
755 return tuple(self._advisoryparams)
756
756
757 def addparam(self, name, value='', mandatory=True):
757 def addparam(self, name, value='', mandatory=True):
758 if self._generated is not None:
758 if self._generated is not None:
759 raise error.ReadOnlyPartError('part is being generated')
759 raise error.ReadOnlyPartError('part is being generated')
760 if name in self._seenparams:
760 if name in self._seenparams:
761 raise ValueError('duplicated params: %s' % name)
761 raise ValueError('duplicated params: %s' % name)
762 self._seenparams.add(name)
762 self._seenparams.add(name)
763 params = self._advisoryparams
763 params = self._advisoryparams
764 if mandatory:
764 if mandatory:
765 params = self._mandatoryparams
765 params = self._mandatoryparams
766 params.append((name, value))
766 params.append((name, value))
767
767
768 # methods used to generates the bundle2 stream
768 # methods used to generates the bundle2 stream
769 def getchunks(self, ui):
769 def getchunks(self, ui):
770 if self._generated is not None:
770 if self._generated is not None:
771 raise RuntimeError('part can only be consumed once')
771 raise RuntimeError('part can only be consumed once')
772 self._generated = False
772 self._generated = False
773
773
774 if ui.debugflag:
774 if ui.debugflag:
775 msg = ['bundle2-output-part: "%s"' % self.type]
775 msg = ['bundle2-output-part: "%s"' % self.type]
776 if not self.mandatory:
776 if not self.mandatory:
777 msg.append(' (advisory)')
777 msg.append(' (advisory)')
778 nbmp = len(self.mandatoryparams)
778 nbmp = len(self.mandatoryparams)
779 nbap = len(self.advisoryparams)
779 nbap = len(self.advisoryparams)
780 if nbmp or nbap:
780 if nbmp or nbap:
781 msg.append(' (params:')
781 msg.append(' (params:')
782 if nbmp:
782 if nbmp:
783 msg.append(' %i mandatory' % nbmp)
783 msg.append(' %i mandatory' % nbmp)
784 if nbap:
784 if nbap:
785 msg.append(' %i advisory' % nbmp)
785 msg.append(' %i advisory' % nbmp)
786 msg.append(')')
786 msg.append(')')
787 if not self.data:
787 if not self.data:
788 msg.append(' empty payload')
788 msg.append(' empty payload')
789 elif util.safehasattr(self.data, 'next'):
789 elif util.safehasattr(self.data, 'next'):
790 msg.append(' streamed payload')
790 msg.append(' streamed payload')
791 else:
791 else:
792 msg.append(' %i bytes payload' % len(self.data))
792 msg.append(' %i bytes payload' % len(self.data))
793 msg.append('\n')
793 msg.append('\n')
794 ui.debug(''.join(msg))
794 ui.debug(''.join(msg))
795
795
796 #### header
796 #### header
797 if self.mandatory:
797 if self.mandatory:
798 parttype = self.type.upper()
798 parttype = self.type.upper()
799 else:
799 else:
800 parttype = self.type.lower()
800 parttype = self.type.lower()
801 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
801 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
802 ## parttype
802 ## parttype
803 header = [_pack(_fparttypesize, len(parttype)),
803 header = [_pack(_fparttypesize, len(parttype)),
804 parttype, _pack(_fpartid, self.id),
804 parttype, _pack(_fpartid, self.id),
805 ]
805 ]
806 ## parameters
806 ## parameters
807 # count
807 # count
808 manpar = self.mandatoryparams
808 manpar = self.mandatoryparams
809 advpar = self.advisoryparams
809 advpar = self.advisoryparams
810 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
810 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
811 # size
811 # size
812 parsizes = []
812 parsizes = []
813 for key, value in manpar:
813 for key, value in manpar:
814 parsizes.append(len(key))
814 parsizes.append(len(key))
815 parsizes.append(len(value))
815 parsizes.append(len(value))
816 for key, value in advpar:
816 for key, value in advpar:
817 parsizes.append(len(key))
817 parsizes.append(len(key))
818 parsizes.append(len(value))
818 parsizes.append(len(value))
819 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
819 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
820 header.append(paramsizes)
820 header.append(paramsizes)
821 # key, value
821 # key, value
822 for key, value in manpar:
822 for key, value in manpar:
823 header.append(key)
823 header.append(key)
824 header.append(value)
824 header.append(value)
825 for key, value in advpar:
825 for key, value in advpar:
826 header.append(key)
826 header.append(key)
827 header.append(value)
827 header.append(value)
828 ## finalize header
828 ## finalize header
829 headerchunk = ''.join(header)
829 headerchunk = ''.join(header)
830 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
830 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
831 yield _pack(_fpartheadersize, len(headerchunk))
831 yield _pack(_fpartheadersize, len(headerchunk))
832 yield headerchunk
832 yield headerchunk
833 ## payload
833 ## payload
834 try:
834 try:
835 for chunk in self._payloadchunks():
835 for chunk in self._payloadchunks():
836 outdebug(ui, 'payload chunk size: %i' % len(chunk))
836 outdebug(ui, 'payload chunk size: %i' % len(chunk))
837 yield _pack(_fpayloadsize, len(chunk))
837 yield _pack(_fpayloadsize, len(chunk))
838 yield chunk
838 yield chunk
839 except BaseException, exc:
839 except BaseException, exc:
840 # backup exception data for later
840 # backup exception data for later
841 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
841 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
842 % exc)
842 % exc)
843 exc_info = sys.exc_info()
843 exc_info = sys.exc_info()
844 msg = 'unexpected error: %s' % exc
844 msg = 'unexpected error: %s' % exc
845 interpart = bundlepart('error:abort', [('message', msg)],
845 interpart = bundlepart('error:abort', [('message', msg)],
846 mandatory=False)
846 mandatory=False)
847 interpart.id = 0
847 interpart.id = 0
848 yield _pack(_fpayloadsize, -1)
848 yield _pack(_fpayloadsize, -1)
849 for chunk in interpart.getchunks(ui=ui):
849 for chunk in interpart.getchunks(ui=ui):
850 yield chunk
850 yield chunk
851 outdebug(ui, 'closing payload chunk')
851 outdebug(ui, 'closing payload chunk')
852 # abort current part payload
852 # abort current part payload
853 yield _pack(_fpayloadsize, 0)
853 yield _pack(_fpayloadsize, 0)
854 raise exc_info[0], exc_info[1], exc_info[2]
854 raise exc_info[0], exc_info[1], exc_info[2]
855 # end of payload
855 # end of payload
856 outdebug(ui, 'closing payload chunk')
856 outdebug(ui, 'closing payload chunk')
857 yield _pack(_fpayloadsize, 0)
857 yield _pack(_fpayloadsize, 0)
858 self._generated = True
858 self._generated = True
859
859
860 def _payloadchunks(self):
860 def _payloadchunks(self):
861 """yield chunks of a the part payload
861 """yield chunks of a the part payload
862
862
863 Exists to handle the different methods to provide data to a part."""
863 Exists to handle the different methods to provide data to a part."""
864 # we only support fixed size data now.
864 # we only support fixed size data now.
865 # This will be improved in the future.
865 # This will be improved in the future.
866 if util.safehasattr(self.data, 'next'):
866 if util.safehasattr(self.data, 'next'):
867 buff = util.chunkbuffer(self.data)
867 buff = util.chunkbuffer(self.data)
868 chunk = buff.read(preferedchunksize)
868 chunk = buff.read(preferedchunksize)
869 while chunk:
869 while chunk:
870 yield chunk
870 yield chunk
871 chunk = buff.read(preferedchunksize)
871 chunk = buff.read(preferedchunksize)
872 elif len(self.data):
872 elif len(self.data):
873 yield self.data
873 yield self.data
874
874
875
875
876 flaginterrupt = -1
876 flaginterrupt = -1
877
877
878 class interrupthandler(unpackermixin):
878 class interrupthandler(unpackermixin):
879 """read one part and process it with restricted capability
879 """read one part and process it with restricted capability
880
880
881 This allows to transmit exception raised on the producer size during part
881 This allows to transmit exception raised on the producer size during part
882 iteration while the consumer is reading a part.
882 iteration while the consumer is reading a part.
883
883
884 Part processed in this manner only have access to a ui object,"""
884 Part processed in this manner only have access to a ui object,"""
885
885
886 def __init__(self, ui, fp):
886 def __init__(self, ui, fp):
887 super(interrupthandler, self).__init__(fp)
887 super(interrupthandler, self).__init__(fp)
888 self.ui = ui
888 self.ui = ui
889
889
890 def _readpartheader(self):
890 def _readpartheader(self):
891 """reads a part header size and return the bytes blob
891 """reads a part header size and return the bytes blob
892
892
893 returns None if empty"""
893 returns None if empty"""
894 headersize = self._unpack(_fpartheadersize)[0]
894 headersize = self._unpack(_fpartheadersize)[0]
895 if headersize < 0:
895 if headersize < 0:
896 raise error.BundleValueError('negative part header size: %i'
896 raise error.BundleValueError('negative part header size: %i'
897 % headersize)
897 % headersize)
898 indebug(self.ui, 'part header size: %i\n' % headersize)
898 indebug(self.ui, 'part header size: %i\n' % headersize)
899 if headersize:
899 if headersize:
900 return self._readexact(headersize)
900 return self._readexact(headersize)
901 return None
901 return None
902
902
903 def __call__(self):
903 def __call__(self):
904
905 self.ui.debug('bundle2-input-stream-interrupt:'
906 ' opening out of band context\n')
904 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
907 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
905 headerblock = self._readpartheader()
908 headerblock = self._readpartheader()
906 if headerblock is None:
909 if headerblock is None:
907 indebug(self.ui, 'no part found during interruption.')
910 indebug(self.ui, 'no part found during interruption.')
908 return
911 return
909 part = unbundlepart(self.ui, headerblock, self._fp)
912 part = unbundlepart(self.ui, headerblock, self._fp)
910 op = interruptoperation(self.ui)
913 op = interruptoperation(self.ui)
911 _processpart(op, part)
914 _processpart(op, part)
915 self.ui.debug('bundle2-input-stream-interrupt:'
916 ' closing out of band context\n')
912
917
913 class interruptoperation(object):
918 class interruptoperation(object):
914 """A limited operation to be use by part handler during interruption
919 """A limited operation to be use by part handler during interruption
915
920
916 It only have access to an ui object.
921 It only have access to an ui object.
917 """
922 """
918
923
919 def __init__(self, ui):
924 def __init__(self, ui):
920 self.ui = ui
925 self.ui = ui
921 self.reply = None
926 self.reply = None
922 self.captureoutput = False
927 self.captureoutput = False
923
928
924 @property
929 @property
925 def repo(self):
930 def repo(self):
926 raise RuntimeError('no repo access from stream interruption')
931 raise RuntimeError('no repo access from stream interruption')
927
932
928 def gettransaction(self):
933 def gettransaction(self):
929 raise TransactionUnavailable('no repo access from stream interruption')
934 raise TransactionUnavailable('no repo access from stream interruption')
930
935
931 class unbundlepart(unpackermixin):
936 class unbundlepart(unpackermixin):
932 """a bundle part read from a bundle"""
937 """a bundle part read from a bundle"""
933
938
934 def __init__(self, ui, header, fp):
939 def __init__(self, ui, header, fp):
935 super(unbundlepart, self).__init__(fp)
940 super(unbundlepart, self).__init__(fp)
936 self.ui = ui
941 self.ui = ui
937 # unbundle state attr
942 # unbundle state attr
938 self._headerdata = header
943 self._headerdata = header
939 self._headeroffset = 0
944 self._headeroffset = 0
940 self._initialized = False
945 self._initialized = False
941 self.consumed = False
946 self.consumed = False
942 # part data
947 # part data
943 self.id = None
948 self.id = None
944 self.type = None
949 self.type = None
945 self.mandatoryparams = None
950 self.mandatoryparams = None
946 self.advisoryparams = None
951 self.advisoryparams = None
947 self.params = None
952 self.params = None
948 self.mandatorykeys = ()
953 self.mandatorykeys = ()
949 self._payloadstream = None
954 self._payloadstream = None
950 self._readheader()
955 self._readheader()
951 self._mandatory = None
956 self._mandatory = None
952 self._chunkindex = [] #(payload, file) position tuples for chunk starts
957 self._chunkindex = [] #(payload, file) position tuples for chunk starts
953 self._pos = 0
958 self._pos = 0
954
959
955 def _fromheader(self, size):
960 def _fromheader(self, size):
956 """return the next <size> byte from the header"""
961 """return the next <size> byte from the header"""
957 offset = self._headeroffset
962 offset = self._headeroffset
958 data = self._headerdata[offset:(offset + size)]
963 data = self._headerdata[offset:(offset + size)]
959 self._headeroffset = offset + size
964 self._headeroffset = offset + size
960 return data
965 return data
961
966
962 def _unpackheader(self, format):
967 def _unpackheader(self, format):
963 """read given format from header
968 """read given format from header
964
969
965 This automatically compute the size of the format to read."""
970 This automatically compute the size of the format to read."""
966 data = self._fromheader(struct.calcsize(format))
971 data = self._fromheader(struct.calcsize(format))
967 return _unpack(format, data)
972 return _unpack(format, data)
968
973
969 def _initparams(self, mandatoryparams, advisoryparams):
974 def _initparams(self, mandatoryparams, advisoryparams):
970 """internal function to setup all logic related parameters"""
975 """internal function to setup all logic related parameters"""
971 # make it read only to prevent people touching it by mistake.
976 # make it read only to prevent people touching it by mistake.
972 self.mandatoryparams = tuple(mandatoryparams)
977 self.mandatoryparams = tuple(mandatoryparams)
973 self.advisoryparams = tuple(advisoryparams)
978 self.advisoryparams = tuple(advisoryparams)
974 # user friendly UI
979 # user friendly UI
975 self.params = dict(self.mandatoryparams)
980 self.params = dict(self.mandatoryparams)
976 self.params.update(dict(self.advisoryparams))
981 self.params.update(dict(self.advisoryparams))
977 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
982 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
978
983
979 def _payloadchunks(self, chunknum=0):
984 def _payloadchunks(self, chunknum=0):
980 '''seek to specified chunk and start yielding data'''
985 '''seek to specified chunk and start yielding data'''
981 if len(self._chunkindex) == 0:
986 if len(self._chunkindex) == 0:
982 assert chunknum == 0, 'Must start with chunk 0'
987 assert chunknum == 0, 'Must start with chunk 0'
983 self._chunkindex.append((0, super(unbundlepart, self).tell()))
988 self._chunkindex.append((0, super(unbundlepart, self).tell()))
984 else:
989 else:
985 assert chunknum < len(self._chunkindex), \
990 assert chunknum < len(self._chunkindex), \
986 'Unknown chunk %d' % chunknum
991 'Unknown chunk %d' % chunknum
987 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
992 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
988
993
989 pos = self._chunkindex[chunknum][0]
994 pos = self._chunkindex[chunknum][0]
990 payloadsize = self._unpack(_fpayloadsize)[0]
995 payloadsize = self._unpack(_fpayloadsize)[0]
991 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
996 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
992 while payloadsize:
997 while payloadsize:
993 if payloadsize == flaginterrupt:
998 if payloadsize == flaginterrupt:
994 # interruption detection, the handler will now read a
999 # interruption detection, the handler will now read a
995 # single part and process it.
1000 # single part and process it.
996 interrupthandler(self.ui, self._fp)()
1001 interrupthandler(self.ui, self._fp)()
997 elif payloadsize < 0:
1002 elif payloadsize < 0:
998 msg = 'negative payload chunk size: %i' % payloadsize
1003 msg = 'negative payload chunk size: %i' % payloadsize
999 raise error.BundleValueError(msg)
1004 raise error.BundleValueError(msg)
1000 else:
1005 else:
1001 result = self._readexact(payloadsize)
1006 result = self._readexact(payloadsize)
1002 chunknum += 1
1007 chunknum += 1
1003 pos += payloadsize
1008 pos += payloadsize
1004 if chunknum == len(self._chunkindex):
1009 if chunknum == len(self._chunkindex):
1005 self._chunkindex.append((pos,
1010 self._chunkindex.append((pos,
1006 super(unbundlepart, self).tell()))
1011 super(unbundlepart, self).tell()))
1007 yield result
1012 yield result
1008 payloadsize = self._unpack(_fpayloadsize)[0]
1013 payloadsize = self._unpack(_fpayloadsize)[0]
1009 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1014 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1010
1015
1011 def _findchunk(self, pos):
1016 def _findchunk(self, pos):
1012 '''for a given payload position, return a chunk number and offset'''
1017 '''for a given payload position, return a chunk number and offset'''
1013 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1018 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1014 if ppos == pos:
1019 if ppos == pos:
1015 return chunk, 0
1020 return chunk, 0
1016 elif ppos > pos:
1021 elif ppos > pos:
1017 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1022 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1018 raise ValueError('Unknown chunk')
1023 raise ValueError('Unknown chunk')
1019
1024
1020 def _readheader(self):
1025 def _readheader(self):
1021 """read the header and setup the object"""
1026 """read the header and setup the object"""
1022 typesize = self._unpackheader(_fparttypesize)[0]
1027 typesize = self._unpackheader(_fparttypesize)[0]
1023 self.type = self._fromheader(typesize)
1028 self.type = self._fromheader(typesize)
1024 indebug(self.ui, 'part type: "%s"' % self.type)
1029 indebug(self.ui, 'part type: "%s"' % self.type)
1025 self.id = self._unpackheader(_fpartid)[0]
1030 self.id = self._unpackheader(_fpartid)[0]
1026 indebug(self.ui, 'part id: "%s"' % self.id)
1031 indebug(self.ui, 'part id: "%s"' % self.id)
1027 # extract mandatory bit from type
1032 # extract mandatory bit from type
1028 self.mandatory = (self.type != self.type.lower())
1033 self.mandatory = (self.type != self.type.lower())
1029 self.type = self.type.lower()
1034 self.type = self.type.lower()
1030 ## reading parameters
1035 ## reading parameters
1031 # param count
1036 # param count
1032 mancount, advcount = self._unpackheader(_fpartparamcount)
1037 mancount, advcount = self._unpackheader(_fpartparamcount)
1033 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1038 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1034 # param size
1039 # param size
1035 fparamsizes = _makefpartparamsizes(mancount + advcount)
1040 fparamsizes = _makefpartparamsizes(mancount + advcount)
1036 paramsizes = self._unpackheader(fparamsizes)
1041 paramsizes = self._unpackheader(fparamsizes)
1037 # make it a list of couple again
1042 # make it a list of couple again
1038 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1043 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1039 # split mandatory from advisory
1044 # split mandatory from advisory
1040 mansizes = paramsizes[:mancount]
1045 mansizes = paramsizes[:mancount]
1041 advsizes = paramsizes[mancount:]
1046 advsizes = paramsizes[mancount:]
1042 # retrieve param value
1047 # retrieve param value
1043 manparams = []
1048 manparams = []
1044 for key, value in mansizes:
1049 for key, value in mansizes:
1045 manparams.append((self._fromheader(key), self._fromheader(value)))
1050 manparams.append((self._fromheader(key), self._fromheader(value)))
1046 advparams = []
1051 advparams = []
1047 for key, value in advsizes:
1052 for key, value in advsizes:
1048 advparams.append((self._fromheader(key), self._fromheader(value)))
1053 advparams.append((self._fromheader(key), self._fromheader(value)))
1049 self._initparams(manparams, advparams)
1054 self._initparams(manparams, advparams)
1050 ## part payload
1055 ## part payload
1051 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1056 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1052 # we read the data, tell it
1057 # we read the data, tell it
1053 self._initialized = True
1058 self._initialized = True
1054
1059
1055 def read(self, size=None):
1060 def read(self, size=None):
1056 """read payload data"""
1061 """read payload data"""
1057 if not self._initialized:
1062 if not self._initialized:
1058 self._readheader()
1063 self._readheader()
1059 if size is None:
1064 if size is None:
1060 data = self._payloadstream.read()
1065 data = self._payloadstream.read()
1061 else:
1066 else:
1062 data = self._payloadstream.read(size)
1067 data = self._payloadstream.read(size)
1063 self._pos += len(data)
1068 self._pos += len(data)
1064 if size is None or len(data) < size:
1069 if size is None or len(data) < size:
1065 if not self.consumed and self._pos:
1070 if not self.consumed and self._pos:
1066 self.ui.debug('bundle2-input-part: total payload size %i\n'
1071 self.ui.debug('bundle2-input-part: total payload size %i\n'
1067 % self._pos)
1072 % self._pos)
1068 self.consumed = True
1073 self.consumed = True
1069 return data
1074 return data
1070
1075
1071 def tell(self):
1076 def tell(self):
1072 return self._pos
1077 return self._pos
1073
1078
1074 def seek(self, offset, whence=0):
1079 def seek(self, offset, whence=0):
1075 if whence == 0:
1080 if whence == 0:
1076 newpos = offset
1081 newpos = offset
1077 elif whence == 1:
1082 elif whence == 1:
1078 newpos = self._pos + offset
1083 newpos = self._pos + offset
1079 elif whence == 2:
1084 elif whence == 2:
1080 if not self.consumed:
1085 if not self.consumed:
1081 self.read()
1086 self.read()
1082 newpos = self._chunkindex[-1][0] - offset
1087 newpos = self._chunkindex[-1][0] - offset
1083 else:
1088 else:
1084 raise ValueError('Unknown whence value: %r' % (whence,))
1089 raise ValueError('Unknown whence value: %r' % (whence,))
1085
1090
1086 if newpos > self._chunkindex[-1][0] and not self.consumed:
1091 if newpos > self._chunkindex[-1][0] and not self.consumed:
1087 self.read()
1092 self.read()
1088 if not 0 <= newpos <= self._chunkindex[-1][0]:
1093 if not 0 <= newpos <= self._chunkindex[-1][0]:
1089 raise ValueError('Offset out of range')
1094 raise ValueError('Offset out of range')
1090
1095
1091 if self._pos != newpos:
1096 if self._pos != newpos:
1092 chunk, internaloffset = self._findchunk(newpos)
1097 chunk, internaloffset = self._findchunk(newpos)
1093 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1098 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1094 adjust = self.read(internaloffset)
1099 adjust = self.read(internaloffset)
1095 if len(adjust) != internaloffset:
1100 if len(adjust) != internaloffset:
1096 raise util.Abort(_('Seek failed\n'))
1101 raise util.Abort(_('Seek failed\n'))
1097 self._pos = newpos
1102 self._pos = newpos
1098
1103
1099 # These are only the static capabilities.
1104 # These are only the static capabilities.
1100 # Check the 'getrepocaps' function for the rest.
1105 # Check the 'getrepocaps' function for the rest.
1101 capabilities = {'HG20': (),
1106 capabilities = {'HG20': (),
1102 'listkeys': (),
1107 'listkeys': (),
1103 'pushkey': (),
1108 'pushkey': (),
1104 'digests': tuple(sorted(util.DIGESTS.keys())),
1109 'digests': tuple(sorted(util.DIGESTS.keys())),
1105 'remote-changegroup': ('http', 'https'),
1110 'remote-changegroup': ('http', 'https'),
1106 }
1111 }
1107
1112
1108 def getrepocaps(repo, allowpushback=False):
1113 def getrepocaps(repo, allowpushback=False):
1109 """return the bundle2 capabilities for a given repo
1114 """return the bundle2 capabilities for a given repo
1110
1115
1111 Exists to allow extensions (like evolution) to mutate the capabilities.
1116 Exists to allow extensions (like evolution) to mutate the capabilities.
1112 """
1117 """
1113 caps = capabilities.copy()
1118 caps = capabilities.copy()
1114 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1119 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1115 if obsolete.isenabled(repo, obsolete.exchangeopt):
1120 if obsolete.isenabled(repo, obsolete.exchangeopt):
1116 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1121 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1117 caps['obsmarkers'] = supportedformat
1122 caps['obsmarkers'] = supportedformat
1118 if allowpushback:
1123 if allowpushback:
1119 caps['pushback'] = ()
1124 caps['pushback'] = ()
1120 return caps
1125 return caps
1121
1126
1122 def bundle2caps(remote):
1127 def bundle2caps(remote):
1123 """return the bundle capabilities of a peer as dict"""
1128 """return the bundle capabilities of a peer as dict"""
1124 raw = remote.capable('bundle2')
1129 raw = remote.capable('bundle2')
1125 if not raw and raw != '':
1130 if not raw and raw != '':
1126 return {}
1131 return {}
1127 capsblob = urllib.unquote(remote.capable('bundle2'))
1132 capsblob = urllib.unquote(remote.capable('bundle2'))
1128 return decodecaps(capsblob)
1133 return decodecaps(capsblob)
1129
1134
1130 def obsmarkersversion(caps):
1135 def obsmarkersversion(caps):
1131 """extract the list of supported obsmarkers versions from a bundle2caps dict
1136 """extract the list of supported obsmarkers versions from a bundle2caps dict
1132 """
1137 """
1133 obscaps = caps.get('obsmarkers', ())
1138 obscaps = caps.get('obsmarkers', ())
1134 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1139 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1135
1140
1136 @parthandler('changegroup', ('version',))
1141 @parthandler('changegroup', ('version',))
1137 def handlechangegroup(op, inpart):
1142 def handlechangegroup(op, inpart):
1138 """apply a changegroup part on the repo
1143 """apply a changegroup part on the repo
1139
1144
1140 This is a very early implementation that will massive rework before being
1145 This is a very early implementation that will massive rework before being
1141 inflicted to any end-user.
1146 inflicted to any end-user.
1142 """
1147 """
1143 # Make sure we trigger a transaction creation
1148 # Make sure we trigger a transaction creation
1144 #
1149 #
1145 # The addchangegroup function will get a transaction object by itself, but
1150 # The addchangegroup function will get a transaction object by itself, but
1146 # we need to make sure we trigger the creation of a transaction object used
1151 # we need to make sure we trigger the creation of a transaction object used
1147 # for the whole processing scope.
1152 # for the whole processing scope.
1148 op.gettransaction()
1153 op.gettransaction()
1149 unpackerversion = inpart.params.get('version', '01')
1154 unpackerversion = inpart.params.get('version', '01')
1150 # We should raise an appropriate exception here
1155 # We should raise an appropriate exception here
1151 unpacker = changegroup.packermap[unpackerversion][1]
1156 unpacker = changegroup.packermap[unpackerversion][1]
1152 cg = unpacker(inpart, 'UN')
1157 cg = unpacker(inpart, 'UN')
1153 # the source and url passed here are overwritten by the one contained in
1158 # the source and url passed here are overwritten by the one contained in
1154 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1159 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1155 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1160 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1156 op.records.add('changegroup', {'return': ret})
1161 op.records.add('changegroup', {'return': ret})
1157 if op.reply is not None:
1162 if op.reply is not None:
1158 # This is definitely not the final form of this
1163 # This is definitely not the final form of this
1159 # return. But one need to start somewhere.
1164 # return. But one need to start somewhere.
1160 part = op.reply.newpart('reply:changegroup', mandatory=False)
1165 part = op.reply.newpart('reply:changegroup', mandatory=False)
1161 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1166 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1162 part.addparam('return', '%i' % ret, mandatory=False)
1167 part.addparam('return', '%i' % ret, mandatory=False)
1163 assert not inpart.read()
1168 assert not inpart.read()
1164
1169
1165 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1170 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1166 ['digest:%s' % k for k in util.DIGESTS.keys()])
1171 ['digest:%s' % k for k in util.DIGESTS.keys()])
1167 @parthandler('remote-changegroup', _remotechangegroupparams)
1172 @parthandler('remote-changegroup', _remotechangegroupparams)
1168 def handleremotechangegroup(op, inpart):
1173 def handleremotechangegroup(op, inpart):
1169 """apply a bundle10 on the repo, given an url and validation information
1174 """apply a bundle10 on the repo, given an url and validation information
1170
1175
1171 All the information about the remote bundle to import are given as
1176 All the information about the remote bundle to import are given as
1172 parameters. The parameters include:
1177 parameters. The parameters include:
1173 - url: the url to the bundle10.
1178 - url: the url to the bundle10.
1174 - size: the bundle10 file size. It is used to validate what was
1179 - size: the bundle10 file size. It is used to validate what was
1175 retrieved by the client matches the server knowledge about the bundle.
1180 retrieved by the client matches the server knowledge about the bundle.
1176 - digests: a space separated list of the digest types provided as
1181 - digests: a space separated list of the digest types provided as
1177 parameters.
1182 parameters.
1178 - digest:<digest-type>: the hexadecimal representation of the digest with
1183 - digest:<digest-type>: the hexadecimal representation of the digest with
1179 that name. Like the size, it is used to validate what was retrieved by
1184 that name. Like the size, it is used to validate what was retrieved by
1180 the client matches what the server knows about the bundle.
1185 the client matches what the server knows about the bundle.
1181
1186
1182 When multiple digest types are given, all of them are checked.
1187 When multiple digest types are given, all of them are checked.
1183 """
1188 """
1184 try:
1189 try:
1185 raw_url = inpart.params['url']
1190 raw_url = inpart.params['url']
1186 except KeyError:
1191 except KeyError:
1187 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1192 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1188 parsed_url = util.url(raw_url)
1193 parsed_url = util.url(raw_url)
1189 if parsed_url.scheme not in capabilities['remote-changegroup']:
1194 if parsed_url.scheme not in capabilities['remote-changegroup']:
1190 raise util.Abort(_('remote-changegroup does not support %s urls') %
1195 raise util.Abort(_('remote-changegroup does not support %s urls') %
1191 parsed_url.scheme)
1196 parsed_url.scheme)
1192
1197
1193 try:
1198 try:
1194 size = int(inpart.params['size'])
1199 size = int(inpart.params['size'])
1195 except ValueError:
1200 except ValueError:
1196 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1201 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1197 % 'size')
1202 % 'size')
1198 except KeyError:
1203 except KeyError:
1199 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1204 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1200
1205
1201 digests = {}
1206 digests = {}
1202 for typ in inpart.params.get('digests', '').split():
1207 for typ in inpart.params.get('digests', '').split():
1203 param = 'digest:%s' % typ
1208 param = 'digest:%s' % typ
1204 try:
1209 try:
1205 value = inpart.params[param]
1210 value = inpart.params[param]
1206 except KeyError:
1211 except KeyError:
1207 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1212 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1208 param)
1213 param)
1209 digests[typ] = value
1214 digests[typ] = value
1210
1215
1211 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1216 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1212
1217
1213 # Make sure we trigger a transaction creation
1218 # Make sure we trigger a transaction creation
1214 #
1219 #
1215 # The addchangegroup function will get a transaction object by itself, but
1220 # The addchangegroup function will get a transaction object by itself, but
1216 # we need to make sure we trigger the creation of a transaction object used
1221 # we need to make sure we trigger the creation of a transaction object used
1217 # for the whole processing scope.
1222 # for the whole processing scope.
1218 op.gettransaction()
1223 op.gettransaction()
1219 import exchange
1224 import exchange
1220 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1225 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1221 if not isinstance(cg, changegroup.cg1unpacker):
1226 if not isinstance(cg, changegroup.cg1unpacker):
1222 raise util.Abort(_('%s: not a bundle version 1.0') %
1227 raise util.Abort(_('%s: not a bundle version 1.0') %
1223 util.hidepassword(raw_url))
1228 util.hidepassword(raw_url))
1224 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1229 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1225 op.records.add('changegroup', {'return': ret})
1230 op.records.add('changegroup', {'return': ret})
1226 if op.reply is not None:
1231 if op.reply is not None:
1227 # This is definitely not the final form of this
1232 # This is definitely not the final form of this
1228 # return. But one need to start somewhere.
1233 # return. But one need to start somewhere.
1229 part = op.reply.newpart('reply:changegroup')
1234 part = op.reply.newpart('reply:changegroup')
1230 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1235 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1231 part.addparam('return', '%i' % ret, mandatory=False)
1236 part.addparam('return', '%i' % ret, mandatory=False)
1232 try:
1237 try:
1233 real_part.validate()
1238 real_part.validate()
1234 except util.Abort, e:
1239 except util.Abort, e:
1235 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1240 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1236 (util.hidepassword(raw_url), str(e)))
1241 (util.hidepassword(raw_url), str(e)))
1237 assert not inpart.read()
1242 assert not inpart.read()
1238
1243
1239 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1244 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1240 def handlereplychangegroup(op, inpart):
1245 def handlereplychangegroup(op, inpart):
1241 ret = int(inpart.params['return'])
1246 ret = int(inpart.params['return'])
1242 replyto = int(inpart.params['in-reply-to'])
1247 replyto = int(inpart.params['in-reply-to'])
1243 op.records.add('changegroup', {'return': ret}, replyto)
1248 op.records.add('changegroup', {'return': ret}, replyto)
1244
1249
1245 @parthandler('check:heads')
1250 @parthandler('check:heads')
1246 def handlecheckheads(op, inpart):
1251 def handlecheckheads(op, inpart):
1247 """check that head of the repo did not change
1252 """check that head of the repo did not change
1248
1253
1249 This is used to detect a push race when using unbundle.
1254 This is used to detect a push race when using unbundle.
1250 This replaces the "heads" argument of unbundle."""
1255 This replaces the "heads" argument of unbundle."""
1251 h = inpart.read(20)
1256 h = inpart.read(20)
1252 heads = []
1257 heads = []
1253 while len(h) == 20:
1258 while len(h) == 20:
1254 heads.append(h)
1259 heads.append(h)
1255 h = inpart.read(20)
1260 h = inpart.read(20)
1256 assert not h
1261 assert not h
1257 if heads != op.repo.heads():
1262 if heads != op.repo.heads():
1258 raise error.PushRaced('repository changed while pushing - '
1263 raise error.PushRaced('repository changed while pushing - '
1259 'please try again')
1264 'please try again')
1260
1265
1261 @parthandler('output')
1266 @parthandler('output')
1262 def handleoutput(op, inpart):
1267 def handleoutput(op, inpart):
1263 """forward output captured on the server to the client"""
1268 """forward output captured on the server to the client"""
1264 for line in inpart.read().splitlines():
1269 for line in inpart.read().splitlines():
1265 op.ui.status(('remote: %s\n' % line))
1270 op.ui.status(('remote: %s\n' % line))
1266
1271
1267 @parthandler('replycaps')
1272 @parthandler('replycaps')
1268 def handlereplycaps(op, inpart):
1273 def handlereplycaps(op, inpart):
1269 """Notify that a reply bundle should be created
1274 """Notify that a reply bundle should be created
1270
1275
1271 The payload contains the capabilities information for the reply"""
1276 The payload contains the capabilities information for the reply"""
1272 caps = decodecaps(inpart.read())
1277 caps = decodecaps(inpart.read())
1273 if op.reply is None:
1278 if op.reply is None:
1274 op.reply = bundle20(op.ui, caps)
1279 op.reply = bundle20(op.ui, caps)
1275
1280
1276 @parthandler('error:abort', ('message', 'hint'))
1281 @parthandler('error:abort', ('message', 'hint'))
1277 def handleerrorabort(op, inpart):
1282 def handleerrorabort(op, inpart):
1278 """Used to transmit abort error over the wire"""
1283 """Used to transmit abort error over the wire"""
1279 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1284 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1280
1285
1281 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1286 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1282 def handleerrorunsupportedcontent(op, inpart):
1287 def handleerrorunsupportedcontent(op, inpart):
1283 """Used to transmit unknown content error over the wire"""
1288 """Used to transmit unknown content error over the wire"""
1284 kwargs = {}
1289 kwargs = {}
1285 parttype = inpart.params.get('parttype')
1290 parttype = inpart.params.get('parttype')
1286 if parttype is not None:
1291 if parttype is not None:
1287 kwargs['parttype'] = parttype
1292 kwargs['parttype'] = parttype
1288 params = inpart.params.get('params')
1293 params = inpart.params.get('params')
1289 if params is not None:
1294 if params is not None:
1290 kwargs['params'] = params.split('\0')
1295 kwargs['params'] = params.split('\0')
1291
1296
1292 raise error.UnsupportedPartError(**kwargs)
1297 raise error.UnsupportedPartError(**kwargs)
1293
1298
1294 @parthandler('error:pushraced', ('message',))
1299 @parthandler('error:pushraced', ('message',))
1295 def handleerrorpushraced(op, inpart):
1300 def handleerrorpushraced(op, inpart):
1296 """Used to transmit push race error over the wire"""
1301 """Used to transmit push race error over the wire"""
1297 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1302 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1298
1303
1299 @parthandler('listkeys', ('namespace',))
1304 @parthandler('listkeys', ('namespace',))
1300 def handlelistkeys(op, inpart):
1305 def handlelistkeys(op, inpart):
1301 """retrieve pushkey namespace content stored in a bundle2"""
1306 """retrieve pushkey namespace content stored in a bundle2"""
1302 namespace = inpart.params['namespace']
1307 namespace = inpart.params['namespace']
1303 r = pushkey.decodekeys(inpart.read())
1308 r = pushkey.decodekeys(inpart.read())
1304 op.records.add('listkeys', (namespace, r))
1309 op.records.add('listkeys', (namespace, r))
1305
1310
1306 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1311 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1307 def handlepushkey(op, inpart):
1312 def handlepushkey(op, inpart):
1308 """process a pushkey request"""
1313 """process a pushkey request"""
1309 dec = pushkey.decode
1314 dec = pushkey.decode
1310 namespace = dec(inpart.params['namespace'])
1315 namespace = dec(inpart.params['namespace'])
1311 key = dec(inpart.params['key'])
1316 key = dec(inpart.params['key'])
1312 old = dec(inpart.params['old'])
1317 old = dec(inpart.params['old'])
1313 new = dec(inpart.params['new'])
1318 new = dec(inpart.params['new'])
1314 ret = op.repo.pushkey(namespace, key, old, new)
1319 ret = op.repo.pushkey(namespace, key, old, new)
1315 record = {'namespace': namespace,
1320 record = {'namespace': namespace,
1316 'key': key,
1321 'key': key,
1317 'old': old,
1322 'old': old,
1318 'new': new}
1323 'new': new}
1319 op.records.add('pushkey', record)
1324 op.records.add('pushkey', record)
1320 if op.reply is not None:
1325 if op.reply is not None:
1321 rpart = op.reply.newpart('reply:pushkey')
1326 rpart = op.reply.newpart('reply:pushkey')
1322 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1327 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1323 rpart.addparam('return', '%i' % ret, mandatory=False)
1328 rpart.addparam('return', '%i' % ret, mandatory=False)
1324
1329
1325 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1330 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1326 def handlepushkeyreply(op, inpart):
1331 def handlepushkeyreply(op, inpart):
1327 """retrieve the result of a pushkey request"""
1332 """retrieve the result of a pushkey request"""
1328 ret = int(inpart.params['return'])
1333 ret = int(inpart.params['return'])
1329 partid = int(inpart.params['in-reply-to'])
1334 partid = int(inpart.params['in-reply-to'])
1330 op.records.add('pushkey', {'return': ret}, partid)
1335 op.records.add('pushkey', {'return': ret}, partid)
1331
1336
1332 @parthandler('obsmarkers')
1337 @parthandler('obsmarkers')
1333 def handleobsmarker(op, inpart):
1338 def handleobsmarker(op, inpart):
1334 """add a stream of obsmarkers to the repo"""
1339 """add a stream of obsmarkers to the repo"""
1335 tr = op.gettransaction()
1340 tr = op.gettransaction()
1336 markerdata = inpart.read()
1341 markerdata = inpart.read()
1337 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1342 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1338 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1343 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1339 % len(markerdata))
1344 % len(markerdata))
1340 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1345 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1341 if new:
1346 if new:
1342 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1347 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1343 op.records.add('obsmarkers', {'new': new})
1348 op.records.add('obsmarkers', {'new': new})
1344 if op.reply is not None:
1349 if op.reply is not None:
1345 rpart = op.reply.newpart('reply:obsmarkers')
1350 rpart = op.reply.newpart('reply:obsmarkers')
1346 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1351 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1347 rpart.addparam('new', '%i' % new, mandatory=False)
1352 rpart.addparam('new', '%i' % new, mandatory=False)
1348
1353
1349
1354
1350 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1355 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1351 def handlepushkeyreply(op, inpart):
1356 def handlepushkeyreply(op, inpart):
1352 """retrieve the result of a pushkey request"""
1357 """retrieve the result of a pushkey request"""
1353 ret = int(inpart.params['new'])
1358 ret = int(inpart.params['new'])
1354 partid = int(inpart.params['in-reply-to'])
1359 partid = int(inpart.params['in-reply-to'])
1355 op.records.add('obsmarkers', {'new': ret}, partid)
1360 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now