bundle2: split parameter retrieval and processing...
Pierre-Yves David
r26541:d40029b4 default
@@ -1,1463 +1,1471 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

    The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with a
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    A name MUST start with a letter. If this first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is not able to process it.

    Stream parameters use a simple textual format for two main reasons:

    - Stream level parameters should remain simple and we want to discourage
      any crazy usage.
    - Textual data allow easy human inspection of a bundle2 header in case of
      troubles.

    Any applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

    The total number of Bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    to interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32bits integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        Part parameters may have arbitrary content, the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
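# Illustrative sketch (not part of bundle2.py): the byte layout of a minimal
# bundle2 container following the format described above -- an empty stream
# parameter blob, a single advisory part named 'output' carrying five bytes of
# payload, then the end of stream marker:
#
#   'HG20'              magic string
#   '\x00\x00\x00\x00'  int32 stream params size (0: no parameters)
#   '\x00\x00\x00\x0d'  int32 part header size (13 bytes follow)
#   '\x06output'        typesize (6) + parttype (lower case = advisory)
#   '\x00\x00\x00\x00'  int32 part id (0)
#   '\x00\x00'          mandatory-count (0) + advisory-count (0)
#   '\x00\x00\x00\x05'  int32 payload chunk size
#   'hello'             payload chunk data
#   '\x00\x00\x00\x00'  zero size chunk: end of the part payload
#   '\x00\x00\x00\x00'  empty part header: end of stream marker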

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys
import urllib

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

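# Illustrative sketch (not part of bundle2.py): hand-assembling the minimal
# stream described in the module docstring, using the pack helpers defined
# above. The part type 'output' and the payload are only examples; real code
# goes through the bundle20/bundlepart classes below instead of building the
# bytes manually.
def _examplerawbundle():
    """return a bytes blob laid out like a tiny HG20 stream"""
    parttype = 'output'
    header = ''.join([_pack(_fparttypesize, len(parttype)), parttype,
                      _pack(_fpartid, 0),               # first (and only) part
                      _pack(_fpartparamcount, 0, 0),    # no part parameters
                      _pack(_makefpartparamsizes(0))])  # hence no size couples
    payload = 'hello'
    return ''.join(['HG20',                         # magic string
                    _pack(_fstreamparamsize, 0),    # no stream parameters
                    _pack(_fpartheadersize, len(header)), header,
                    _pack(_fpayloadsize, len(payload)), payload,
                    _pack(_fpayloadsize, 0),        # end of part payload
                    _pack(_fpartheadersize, 0)])    # end of stream marker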
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

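# Illustrative sketch (not part of bundle2.py): how unbundlerecords is meant
# to be used -- per-category access, chronological iteration, and reply
# records attached to a part id. The category names used here are examples.
def _exampleunbundlerecords():
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'some text', inreplyto=0)
    assert records['changegroup'] == ({'return': 1},)
    assert list(records) == [('changegroup', {'return': 1}),
                             ('output', 'some text')]
    # replies to part 0 are collected in their own unbundlerecords instance
    assert records.getreplies(0)['output'] == ('some text',)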
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object currently has very little content; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params')
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

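# Illustrative sketch (not part of bundle2.py): driving processbundle over a
# bundle read from an open file object. The transaction handling shown here is
# an assumption for illustration; real callers (e.g. the exchange module)
# manage the transaction lifetime themselves.
def _exampleprocessbundle(repo, fp):
    unbundler = getunbundler(repo.ui, fp)
    tr = repo.transaction('example-unbundle')
    try:
        op = processbundle(repo, unbundler, transactiongetter=lambda: tr)
        tr.close()
    finally:
        tr.release()
    # records collected by part handlers are available afterwards
    return op.records['changegroup']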
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

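# Illustrative sketch (not part of bundle2.py): encodecaps/decodecaps round
# trip. The capability names and values here are made up for the example.
def _examplecapsroundtrip():
    caps = {'HG20': [], 'listkeys': ['bookmarks', 'phases']}
    blob = encodecaps(caps)
    # one capability per line, values joined with ',' and urlquoted
    assert blob == 'HG20\nlistkeys=bookmarks,phases'
    assert decodecaps(blob) == {'HG20': [],
                                'listkeys': ['bookmarks', 'phases']}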
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


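# Illustrative sketch (not part of bundle2.py): assembling an outgoing bundle
# with bundle20 and writing it to a file object. The stream parameter name,
# the part parameter name and the payload are only examples; real producers
# add changegroup, pushkey, etc. parts.
def _examplewritebundle(ui, fp):
    bundler = bundle20(ui)
    bundler.addparam('examplehint', 'yes')   # lower case: advisory
    part = bundler.newpart('output', data='hello from the server\n',
                           mandatory=False)
    part.addparam('verbosity', 'debug', mandatory=False)
    for chunk in bundler.getchunks():
        fp.write(chunk)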
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise util.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise util.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

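# Illustrative sketch (not part of bundle2.py): reading a bundle2 file and
# listing its parts without applying them. Every part must be consumed, which
# iterparts already ensures by seeking to the end of each part after yielding.
def _examplelistparts(ui, path):
    fp = open(path, 'rb')
    try:
        unbundler = getunbundler(ui, fp)
        for part in unbundler.iterparts():
            ui.write('%s: %s (mandatory=%r)\n'
                     % (part.id, part.type, part.mandatory))
    finally:
        fp.close()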
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
-            for p in self._readexact(paramssize).split(' '):
-                p = p.split('=', 1)
-                p = [urllib.unquote(i) for i in p]
-                if len(p) < 2:
-                    p.append(None)
-                self._processparam(*p)
-                params[p[0]] = p[1]
-        return params
+            params = self._readexact(paramssize)
+            params = self._processallparams(params)
+        return params

+    def _processallparams(self, paramsblock):
+        """process the stream level parameter block and return it as a dict"""
+        params = {}
+        for p in paramsblock.split(' '):
+            p = p.split('=', 1)
+            p = [urllib.unquote(i) for i in p]
+            if len(p) < 2:
+                p.append(None)
+            self._processparam(*p)
+            params[p[0]] = p[1]
+        return params
+
+
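    # Illustrative sketch (not part of bundle2.py): with this change the raw
    # parameter blob is read in one step and parsed in another. A blob like
    #
    #     'Compression=BZ examplehint%3Dv2=1'
    #
    # splits on spaces into ['Compression=BZ', 'examplehint%3Dv2=1'], each
    # entry is split once on '=', both halves are urlunquoted, and the result
    # is {'Compression': 'BZ', 'examplehint=v2': '1'} after _processparam has
    # been applied to each pair (installing the matching decompressor for the
    # mandatory 'Compression' parameter and ignoring the unknown advisory
    # one). The second parameter name is made up for the example.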
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payloads need to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        return False

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.decompressors:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._decompressor = util.decompressors[value]

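# Illustrative sketch (not part of bundle2.py): registering a handler for a
# hypothetical advisory stream parameter named 'examplelimit'. Unknown lower
# case parameters are ignored, so only unbundlers defining this handler would
# act on it.
@b2streamparamhandler('examplelimit')
def _processexamplelimit(unbundler, param, value):
    """record a hypothetical size hint advertised by the sender"""
    unbundler.ui.debug('bundle2 example limit parameter: %s\n' % value)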
744 class bundlepart(object):
752 class bundlepart(object):
745 """A bundle2 part contains application level payload
753 """A bundle2 part contains application level payload
746
754
747 The part `type` is used to route the part to the application level
755 The part `type` is used to route the part to the application level
748 handler.
756 handler.
749
757
750 The part payload is contained in ``part.data``. It could be raw bytes or a
758 The part payload is contained in ``part.data``. It could be raw bytes or a
751 generator of byte chunks.
759 generator of byte chunks.
752
760
753 You can add parameters to the part using the ``addparam`` method.
761 You can add parameters to the part using the ``addparam`` method.
754 Parameters can be either mandatory (default) or advisory. Remote side
762 Parameters can be either mandatory (default) or advisory. Remote side
755 should be able to safely ignore the advisory ones.
763 should be able to safely ignore the advisory ones.
756
764
757 Both data and parameters cannot be modified after the generation has begun.
765 Both data and parameters cannot be modified after the generation has begun.
758 """
766 """
759
767
760 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
768 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
761 data='', mandatory=True):
769 data='', mandatory=True):
762 validateparttype(parttype)
770 validateparttype(parttype)
763 self.id = None
771 self.id = None
764 self.type = parttype
772 self.type = parttype
765 self._data = data
773 self._data = data
766 self._mandatoryparams = list(mandatoryparams)
774 self._mandatoryparams = list(mandatoryparams)
767 self._advisoryparams = list(advisoryparams)
775 self._advisoryparams = list(advisoryparams)
768 # checking for duplicated entries
776 # checking for duplicated entries
769 self._seenparams = set()
777 self._seenparams = set()
770 for pname, __ in self._mandatoryparams + self._advisoryparams:
778 for pname, __ in self._mandatoryparams + self._advisoryparams:
771 if pname in self._seenparams:
779 if pname in self._seenparams:
772 raise RuntimeError('duplicated params: %s' % pname)
780 raise RuntimeError('duplicated params: %s' % pname)
773 self._seenparams.add(pname)
781 self._seenparams.add(pname)
774 # status of the part's generation:
782 # status of the part's generation:
775 # - None: not started,
783 # - None: not started,
776 # - False: currently generated,
784 # - False: currently generated,
777 # - True: generation done.
785 # - True: generation done.
778 self._generated = None
786 self._generated = None
779 self.mandatory = mandatory
787 self.mandatory = mandatory
780
788
781 def copy(self):
789 def copy(self):
782 """return a copy of the part
790 """return a copy of the part
783
791
784 The new part have the very same content but no partid assigned yet.
792 The new part have the very same content but no partid assigned yet.
785 Parts with generated data cannot be copied."""
793 Parts with generated data cannot be copied."""
786 assert not util.safehasattr(self.data, 'next')
794 assert not util.safehasattr(self.data, 'next')
787 return self.__class__(self.type, self._mandatoryparams,
795 return self.__class__(self.type, self._mandatoryparams,
788 self._advisoryparams, self._data, self.mandatory)
796 self._advisoryparams, self._data, self.mandatory)
789
797
790 # methods used to defines the part content
798 # methods used to defines the part content
791 def __setdata(self, data):
799 def __setdata(self, data):
792 if self._generated is not None:
800 if self._generated is not None:
793 raise error.ReadOnlyPartError('part is being generated')
801 raise error.ReadOnlyPartError('part is being generated')
794 self._data = data
802 self._data = data
795 def __getdata(self):
803 def __getdata(self):
796 return self._data
804 return self._data
797 data = property(__getdata, __setdata)
805 data = property(__getdata, __setdata)
798
806
799 @property
807 @property
800 def mandatoryparams(self):
808 def mandatoryparams(self):
801 # make it an immutable tuple to force people through ``addparam``
809 # make it an immutable tuple to force people through ``addparam``
802 return tuple(self._mandatoryparams)
810 return tuple(self._mandatoryparams)
803
811
804 @property
812 @property
805 def advisoryparams(self):
813 def advisoryparams(self):
806 # make it an immutable tuple to force people through ``addparam``
814 # make it an immutable tuple to force people through ``addparam``
807 return tuple(self._advisoryparams)
815 return tuple(self._advisoryparams)
808
816
809 def addparam(self, name, value='', mandatory=True):
817 def addparam(self, name, value='', mandatory=True):
810 if self._generated is not None:
818 if self._generated is not None:
811 raise error.ReadOnlyPartError('part is being generated')
819 raise error.ReadOnlyPartError('part is being generated')
812 if name in self._seenparams:
820 if name in self._seenparams:
813 raise ValueError('duplicated params: %s' % name)
821 raise ValueError('duplicated params: %s' % name)
814 self._seenparams.add(name)
822 self._seenparams.add(name)
815 params = self._advisoryparams
823 params = self._advisoryparams
816 if mandatory:
824 if mandatory:
817 params = self._mandatoryparams
825 params = self._mandatoryparams
818 params.append((name, value))
826 params.append((name, value))
819
827
820 # methods used to generates the bundle2 stream
828 # methods used to generates the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

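    # Rough sketch (illustrative only, not upstream code) of the chunk
    # sequence getchunks() yields for a well-behaved part:
    #
    #     _pack(_fpartheadersize, len(headerchunk))   # header length prefix
    #     headerchunk        # type size, type, part id, param counts,
    #                        # param sizes, then raw param keys and values
    #     _pack(_fpayloadsize, len(chunk)); chunk     # repeated per chunk
    #     _pack(_fpayloadsize, 0)                     # end-of-payload marker
    #
    # On failure a negative size (-1) is emitted instead, followed by an
    # embedded 'error:abort' part and a closing zero-size chunk.
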
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

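# Reading-side flow (a sketch of how the pieces above fit together): while a
# consumer iterates the payload chunks of a part, a chunk size equal to
# flaginterrupt (-1) means "an out of band part follows". The reader then runs
#
#     interrupthandler(ui, fp)()
#
# which reads exactly one embedded part and processes it against a ui-only
# interruptoperation before the outer payload stream resumes.
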
class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise util.Abort(_('Seek failed\n'))
            self._pos = newpos

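# Usage sketch for the payload file-like API above (illustrative only): a part
# handler can rewind data it has already consumed, since read()/seek()/tell()
# operate on payload offsets backed by the chunk index:
#
#     data = part.read(20)    # consume the first 20 payload bytes
#     part.seek(0)            # rebuild the payload stream from chunk 0
#     again = part.read(20)   # same 20 bytes, replayed from the chunk index
#
# seek(offset, 2) needs the total payload length, so it read()s to the end
# first when the part has not been fully consumed yet.
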
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

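# Example (illustrative values): a peer advertising
# caps = {'obsmarkers': ('V0', 'V1')} yields obsmarkersversion(caps) == [0, 1],
# while a peer without the capability yields an empty list.
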
@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
                                     expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

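# Sending-side sketch (hypothetical, not code from this module): a pusher
# would typically attach the parameters the handler above understands when
# building the part, e.g.
#
#     part = bundler.newpart('changegroup', data=cg.getchunks())
#     part.addparam('version', '02')                  # selects the unpacker
#     part.addparam('nbchanges', '%d' % expectedtotal,
#                   mandatory=False)                  # advisory progress hint
#
# 'bundler', 'cg' and 'expectedtotal' are assumed to exist in the caller.
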
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise util.Abort(_('remote-changegroup does not support %s urls') %
                         parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
                         % 'size')
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise util.Abort(_('remote-changegroup: missing "%s" param') %
                             param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise util.Abort(_('%s: not a bundle version 1.0') %
                         util.hidepassword(raw_url))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except util.Abort as e:
        raise util.Abort(_('bundle at %s is corrupted:\n%s') %
                         (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

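# Parameter sketch for a 'remote-changegroup' part (all values made up):
#
#     url         = 'https://hg.example.com/bundles/abc.hg'
#     size        = '123456'
#     digests     = 'sha1 md5'
#     digest:sha1 = '<40 hex characters>'
#     digest:md5  = '<32 hex characters>'
#
# Every type listed in 'digests' must come with its own 'digest:<type>'
# parameter; the downloaded bundle10 is checked against the size and all of
# the digests.
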
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

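# Building-side sketch (hypothetical call, not from this module): the payload
# is simply the concatenation of the 20-byte binary head nodes the client
# based its push on, e.g.
#
#     bundler.newpart('check:heads', data=''.join(remoteheads))
#
# The handler above re-slices the stream into 20-byte nodes and raises
# PushRaced when they no longer match the server's current heads.
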
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

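# Parameter sketch for a 'pushkey' part (illustrative bookmark move):
#
#     namespace = 'bookmarks'
#     key       = 'feature-x'
#     old       = '<old hex node, or empty for a new key>'
#     new       = '<new hex node>'
#
# Values are encoded on the sending side with the counterpart of
# pushkey.decode; a mandatory part whose pushkey call returns a false value
# makes the handler above raise PushkeyFailed.
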
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
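
# Payload sketch for an 'hgtagsfnodes' part (illustrative): a flat sequence of
# fixed-size records, 40 bytes each:
#
#     [20-byte changeset node][20-byte .hgtags filenode] * N
#
# A trailing record shorter than 40 bytes is ignored, matching the short-read
# check in the handler above.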