bundle20: extract core payload generation in its own function...
Pierre-Yves David
r26396:d90c3080 default
@@ -1,1434 +1,1441 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet used to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to the application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level handler
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

      Part parameters may have arbitrary content; the binary structure is::

          <mandatory-count><advisory-count><param-sizes><param-data>

      :mandatory-count: 1 byte, number of mandatory parameters

      :advisory-count:  1 byte, number of advisory parameters

      :param-sizes:

          N couples of bytes, where N is the total number of parameters. Each
          couple contains (<size-of-key>, <size-of-value>) for one parameter.

      :param-data:

          A blob of bytes from which each parameter key and value can be
          retrieved using the list of size couples stored in the previous
          field.

          Mandatory parameters come first, then the advisory ones.

          Each parameter's key MUST be unique within the part.

  :payload:

      The payload is a series of `<chunksize><chunkdata>`.

      `chunksize` is an int32, `chunkdata` are plain bytes (as many as
      `chunksize` says). The payload part is concluded by a zero size chunk.

      The current implementation always produces either zero or one chunk.
      This is an implementation limitation that will ultimately be lifted.

      `chunksize` can be negative to trigger special case processing. No such
      processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
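# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# Given only the framing described in the docstring above, a minimal bundle2
# stream carrying one advisory stream level parameter and no parts can be
# assembled by hand; the helper name is hypothetical:
#
#     import struct
#
#     def examplestream():
#         params = 'somekey=somevalue'              # urlquoted, space separated
#         stream = 'HG20'                           # magic string
#         stream += struct.pack('>i', len(params))  # stream params size (int32)
#         stream += params                          # stream params blob
#         stream += struct.pack('>i', 0)            # zero part header size is
#                                                   # the end of stream marker
#         return stream
# ------------------------------------------------------------------------------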

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys
import urllib

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

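# ---- editor's note (not part of this changeset) ----
# The two helpers above only emit anything when debug output is active and the
# 'devel.bundle2.debug' knob is set, e.g.:
#
#     hg pull --debug --config devel.bundle2.debug=yes
# -----------------------------------------------------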
def validateparttype(parttype):
    """raise ValueError if a parttype contains invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>'+('BB'*nbparams)

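# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# _makefpartparamsizes repeats the (key size, value size) byte pair, so for two
# parameters the generated format unpacks four unsigned bytes:
#
#     >>> _makefpartparamsizes(2)
#     '>BBBB'
#     >>> _unpack(_makefpartparamsizes(2), '\x03\x05\x02\x04')
#     (3, 5, 2, 4)
# ------------------------------------------------------------------------------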
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iteration happens in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

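# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# How the records container above is meant to be used; category names are
# arbitrary and chosen by the part handlers:
#
#     >>> records = unbundlerecords()
#     >>> records.add('changegroup', {'return': 1})
#     >>> records.add('output', 'some text')
#     >>> records['changegroup']
#     ({'return': 1},)
#     >>> list(records)
#     [('changegroup', {'return': 1}), ('output', 'some text')]
# ------------------------------------------------------------------------------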
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from exceptions raised when processing the old format.
        # This is mostly needed to handle different return codes to unbundle
        # according to the type of bundle. We should probably clean up or drop
        # this return code craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

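# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# Typical call shape for processbundle. With no transaction getter the
# _notransaction default is used and any part that actually needs a
# transaction will raise TransactionUnavailable; real push/pull code supplies
# a getter instead (the lambda below is only an illustration):
#
#     op = processbundle(repo, unbundler)
#     op = processbundle(repo, unbundler,
#                        transactiongetter=lambda: repo.transaction('unbundle'))
#     op.records['changegroup']   # whatever the handlers chose to record
# ------------------------------------------------------------------------------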
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


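# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# A hypothetical handler wired into the dispatch performed by _processpart
# above: the decorator registers it under the lowercased part type, the
# declared names are checked against the part's mandatory keys, and results
# are left on op.records for the caller:
#
#     @parthandler('x-example:echo', ('message',))
#     def handleecho(op, part):
#         '''record the mandatory 'message' parameter of the part'''
#         op.records.add('x-example:echo', part.params['message'])
# ------------------------------------------------------------------------------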
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

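# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# encodecaps/decodecaps round-trip a capabilities dictionary through the
# newline separated blob format documented above:
#
#     >>> blob = encodecaps({'HG20': (), 'changegroup': ('01', '02')})
#     >>> blob
#     'HG20\nchangegroup=01,02'
#     >>> decodecaps(blob) == {'HG20': [], 'changegroup': ['01', '02']}
#     True
# ------------------------------------------------------------------------------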
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
-
-        outdebug(self.ui, 'start of parts')
-        for part in self._parts:
-            outdebug(self.ui, 'bundle part: "%s"' % part.type)
-            for chunk in part.getchunks(ui=self.ui):
-                yield chunk
-        outdebug(self.ui, 'end of bundle')
-        yield _pack(_fpartheadersize, 0)
-
+        for chunk in self._getcorechunk():
+            yield chunk
+
     def _paramchunk(self):
         """return an encoded version of all stream parameters"""
         blocks = []
         for par, value in self._params:
             par = urllib.quote(par)
             if value is not None:
                 value = urllib.quote(value)
                 par = '%s=%s' % (par, value)
             blocks.append(par)
         return ' '.join(blocks)

+    def _getcorechunk(self):
+        """yield chunks for the core part of the bundle
+
+        (all but headers and parameters)"""
+        outdebug(self.ui, 'start of parts')
+        for part in self._parts:
+            outdebug(self.ui, 'bundle part: "%s"' % part.type)
+            for chunk in part.getchunks(ui=self.ui):
+                yield chunk
+        outdebug(self.ui, 'end of bundle')
+        yield _pack(_fpartheadersize, 0)
+
+
    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


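# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# Putting the bundler together: declare stream parameters and parts, then let
# getchunks() (now delegating the body to the _getcorechunk() helper added by
# this changeset) stream the container out. Part type and values below are
# made up for illustration:
#
#     bundler = bundle20(ui)
#     bundler.addparam('comment', 'illustration')   # lower case = advisory
#     part = bundler.newpart('x-example:echo', data='hello')
#     part.addparam('message', 'hello')
#     raw = ''.join(bundler.getchunks())
# ------------------------------------------------------------------------------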
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise util.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise util.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

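# ---- illustrative sketch (editor's addition, not part of this changeset) ----
# The reading side mirrors the bundler: getunbundler() checks the magic string
# and picks the matching unbundler class, after which parts are iterated (see
# unbundle20.iterparts below). The file name is made up:
#
#     fp = open('payload.hg2', 'rb')
#     unbundler = getunbundler(ui, fp)
#     for part in unbundler.iterparts():
#         ui.write('part: %s\n' % part.type)
# ------------------------------------------------------------------------------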
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` methods."""

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            for p in self._readexact(paramssize).split(' '):
                p = p.split('=', 1)
                p = [urllib.unquote(i) for i in p]
                if len(p) < 2:
                    p.append(None)
                self._processparam(*p)
                params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        return False

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in formatmap
        b2streamparamsmap[name] = func
        return func
    return decorator

715 class bundlepart(object):
722 class bundlepart(object):
716 """A bundle2 part contains application level payload
723 """A bundle2 part contains application level payload
717
724
718 The part `type` is used to route the part to the application level
725 The part `type` is used to route the part to the application level
719 handler.
726 handler.
720
727
721 The part payload is contained in ``part.data``. It could be raw bytes or a
728 The part payload is contained in ``part.data``. It could be raw bytes or a
722 generator of byte chunks.
729 generator of byte chunks.
723
730
724 You can add parameters to the part using the ``addparam`` method.
731 You can add parameters to the part using the ``addparam`` method.
725 Parameters can be either mandatory (default) or advisory. Remote side
732 Parameters can be either mandatory (default) or advisory. Remote side
726 should be able to safely ignore the advisory ones.
733 should be able to safely ignore the advisory ones.
727
734
728 Both data and parameters cannot be modified after the generation has begun.
735 Both data and parameters cannot be modified after the generation has begun.
729 """
736 """
730
737
731 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
738 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
732 data='', mandatory=True):
739 data='', mandatory=True):
733 validateparttype(parttype)
740 validateparttype(parttype)
734 self.id = None
741 self.id = None
735 self.type = parttype
742 self.type = parttype
736 self._data = data
743 self._data = data
737 self._mandatoryparams = list(mandatoryparams)
744 self._mandatoryparams = list(mandatoryparams)
738 self._advisoryparams = list(advisoryparams)
745 self._advisoryparams = list(advisoryparams)
739 # checking for duplicated entries
746 # checking for duplicated entries
740 self._seenparams = set()
747 self._seenparams = set()
741 for pname, __ in self._mandatoryparams + self._advisoryparams:
748 for pname, __ in self._mandatoryparams + self._advisoryparams:
742 if pname in self._seenparams:
749 if pname in self._seenparams:
743 raise RuntimeError('duplicated params: %s' % pname)
750 raise RuntimeError('duplicated params: %s' % pname)
744 self._seenparams.add(pname)
751 self._seenparams.add(pname)
745 # status of the part's generation:
752 # status of the part's generation:
746 # - None: not started,
753 # - None: not started,
747 # - False: currently generated,
754 # - False: currently generated,
748 # - True: generation done.
755 # - True: generation done.
749 self._generated = None
756 self._generated = None
750 self.mandatory = mandatory
757 self.mandatory = mandatory
751
758
752 def copy(self):
759 def copy(self):
753 """return a copy of the part
760 """return a copy of the part
754
761
755 The new part have the very same content but no partid assigned yet.
762 The new part have the very same content but no partid assigned yet.
756 Parts with generated data cannot be copied."""
763 Parts with generated data cannot be copied."""
757 assert not util.safehasattr(self.data, 'next')
764 assert not util.safehasattr(self.data, 'next')
758 return self.__class__(self.type, self._mandatoryparams,
765 return self.__class__(self.type, self._mandatoryparams,
759 self._advisoryparams, self._data, self.mandatory)
766 self._advisoryparams, self._data, self.mandatory)
760
767
761 # methods used to defines the part content
768 # methods used to defines the part content
762 def __setdata(self, data):
769 def __setdata(self, data):
763 if self._generated is not None:
770 if self._generated is not None:
764 raise error.ReadOnlyPartError('part is being generated')
771 raise error.ReadOnlyPartError('part is being generated')
765 self._data = data
772 self._data = data
766 def __getdata(self):
773 def __getdata(self):
767 return self._data
774 return self._data
768 data = property(__getdata, __setdata)
775 data = property(__getdata, __setdata)
769
776
770 @property
777 @property
771 def mandatoryparams(self):
778 def mandatoryparams(self):
772 # make it an immutable tuple to force people through ``addparam``
779 # make it an immutable tuple to force people through ``addparam``
773 return tuple(self._mandatoryparams)
780 return tuple(self._mandatoryparams)
774
781
775 @property
782 @property
776 def advisoryparams(self):
783 def advisoryparams(self):
777 # make it an immutable tuple to force people through ``addparam``
784 # make it an immutable tuple to force people through ``addparam``
778 return tuple(self._advisoryparams)
785 return tuple(self._advisoryparams)
779
786
780 def addparam(self, name, value='', mandatory=True):
787 def addparam(self, name, value='', mandatory=True):
781 if self._generated is not None:
788 if self._generated is not None:
782 raise error.ReadOnlyPartError('part is being generated')
789 raise error.ReadOnlyPartError('part is being generated')
783 if name in self._seenparams:
790 if name in self._seenparams:
784 raise ValueError('duplicated params: %s' % name)
791 raise ValueError('duplicated params: %s' % name)
785 self._seenparams.add(name)
792 self._seenparams.add(name)
786 params = self._advisoryparams
793 params = self._advisoryparams
787 if mandatory:
794 if mandatory:
788 params = self._mandatoryparams
795 params = self._mandatoryparams
789 params.append((name, value))
796 params.append((name, value))
790
797
791 # methods used to generates the bundle2 stream
798 # methods used to generates the bundle2 stream
792 def getchunks(self, ui):
799 def getchunks(self, ui):
793 if self._generated is not None:
800 if self._generated is not None:
794 raise RuntimeError('part can only be consumed once')
801 raise RuntimeError('part can only be consumed once')
795 self._generated = False
802 self._generated = False
796
803
797 if ui.debugflag:
804 if ui.debugflag:
798 msg = ['bundle2-output-part: "%s"' % self.type]
805 msg = ['bundle2-output-part: "%s"' % self.type]
799 if not self.mandatory:
806 if not self.mandatory:
800 msg.append(' (advisory)')
807 msg.append(' (advisory)')
801 nbmp = len(self.mandatoryparams)
808 nbmp = len(self.mandatoryparams)
802 nbap = len(self.advisoryparams)
809 nbap = len(self.advisoryparams)
803 if nbmp or nbap:
810 if nbmp or nbap:
804 msg.append(' (params:')
811 msg.append(' (params:')
805 if nbmp:
812 if nbmp:
806 msg.append(' %i mandatory' % nbmp)
813 msg.append(' %i mandatory' % nbmp)
807 if nbap:
814 if nbap:
808 msg.append(' %i advisory' % nbmp)
815 msg.append(' %i advisory' % nbmp)
809 msg.append(')')
816 msg.append(')')
810 if not self.data:
817 if not self.data:
811 msg.append(' empty payload')
818 msg.append(' empty payload')
812 elif util.safehasattr(self.data, 'next'):
819 elif util.safehasattr(self.data, 'next'):
813 msg.append(' streamed payload')
820 msg.append(' streamed payload')
814 else:
821 else:
815 msg.append(' %i bytes payload' % len(self.data))
822 msg.append(' %i bytes payload' % len(self.data))
816 msg.append('\n')
823 msg.append('\n')
817 ui.debug(''.join(msg))
824 ui.debug(''.join(msg))
818
825
819 #### header
826 #### header
820 if self.mandatory:
827 if self.mandatory:
821 parttype = self.type.upper()
828 parttype = self.type.upper()
822 else:
829 else:
823 parttype = self.type.lower()
        parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods used to provide data to a
        part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


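# A rough sketch of the chunk stream that getchunks() above emits for a single
# part (every size below is packed as a big-endian 32-bit integer):
#
#   <header size><header chunk>
#   <payload chunk size><payload chunk>    (repeated for each payload chunk)
#   <0>                                    (a zero size closes the payload)
#
# A size of -1 (flaginterrupt below) is not payload data: it announces an
# out-of-band part that the consumer must process immediately before reading
# any further payload (see interrupthandler).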
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows an exception raised on the producer side during part
    iteration to be transmitted while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of (key size, value size) pairs again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise util.Abort(_('Seek failed\n'))
            self._pos = newpos

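# The part handlers further down consume their part through this file-like
# interface. A minimal sketch (assuming ``part`` is an unbundlepart instance):
#
#   record = part.read(20)   # read a fixed-size record from the payload
#   part.tell()              # current position within the payload
#   part.seek(0)             # rewind inside the already-indexed chunks
#   rest = part.read()       # drain the remainder, marking the part consumed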
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

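# As a rough illustration (exact values depend on the available changegroup
# packers, digest algorithms and obsolescence-marker formats), a repository
# with obsmarker exchange enabled and pushback allowed might report something
# like:
#
#   {'HG20': (), 'changegroup': ('01', '02'), 'digests': ('md5', 'sha1', ...),
#    'error': ('abort', 'unsupportedcontent', 'pushraced', 'pushkey'),
#    'hgtagsfnodes': (), 'listkeys': (), 'obsmarkers': ('V0', 'V1'),
#    'pushback': (), 'pushkey': (), 'remote-changegroup': ('http', 'https')}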
def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

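# For example, obsmarkersversion({'obsmarkers': ('V0', 'V1')}) returns [0, 1],
# while a caps dict without an 'obsmarkers' entry yields an empty list.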
@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
                                     expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise util.Abort(_('remote-changegroup does not support %s urls') %
                         parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
                         % 'size')
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise util.Abort(_('remote-changegroup: missing "%s" param') %
                             param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise util.Abort(_('%s: not a bundle version 1.0') %
                         util.hidepassword(raw_url))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except util.Abort as e:
        raise util.Abort(_('bundle at %s is corrupted:\n%s') %
                         (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

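# The 'check:heads' payload is nothing more than the expected heads,
# concatenated as raw 20-byte binary nodes. A pushing client could build such
# a part along these lines (a sketch, assuming ``bundler`` is a bundle20
# instance and ``remoteheads`` a list of binary nodes):
#
#   bundler.newpart('check:heads', data=iter(remoteheads))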
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit the failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
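# The expected payload is therefore a flat sequence of 40-byte records, each
# one a 20-byte binary changeset node immediately followed by the 20-byte
# filenode of its .hgtags file; any trailing partial record is ignored, as the
# length check above shows.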