##// END OF EJS Templates
bundle2: make header reading optional...
Pierre-Yves David -
r21066:5ecfe76d default
parent child Browse files
Show More
@@ -1,673 +1,675
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup
148 import changegroup
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG20'
154 _magicstring = 'HG20'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
164
164
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number parameters is variable so we need to build that format
168 The number parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
172
172
173 parthandlermapping = {}
173 parthandlermapping = {}
174
174
175 def parthandler(parttype):
175 def parthandler(parttype):
176 """decorator that register a function as a bundle2 part handler
176 """decorator that register a function as a bundle2 part handler
177
177
178 eg::
178 eg::
179
179
180 @parthandler('myparttype')
180 @parthandler('myparttype')
181 def myparttypehandler(...):
181 def myparttypehandler(...):
182 '''process a part of type "my part".'''
182 '''process a part of type "my part".'''
183 ...
183 ...
184 """
184 """
185 def _decorator(func):
185 def _decorator(func):
186 lparttype = parttype.lower() # enforce lower case matching.
186 lparttype = parttype.lower() # enforce lower case matching.
187 assert lparttype not in parthandlermapping
187 assert lparttype not in parthandlermapping
188 parthandlermapping[lparttype] = func
188 parthandlermapping[lparttype] = func
189 return func
189 return func
190 return _decorator
190 return _decorator
191
191
192 class unbundlerecords(object):
192 class unbundlerecords(object):
193 """keep record of what happens during and unbundle
193 """keep record of what happens during and unbundle
194
194
195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
196 category of record and obj is an arbitrary object.
196 category of record and obj is an arbitrary object.
197
197
198 `records['cat']` will return all entries of this category 'cat'.
198 `records['cat']` will return all entries of this category 'cat'.
199
199
200 Iterating on the object itself will yield `('category', obj)` tuples
200 Iterating on the object itself will yield `('category', obj)` tuples
201 for all entries.
201 for all entries.
202
202
203 All iterations happens in chronological order.
203 All iterations happens in chronological order.
204 """
204 """
205
205
206 def __init__(self):
206 def __init__(self):
207 self._categories = {}
207 self._categories = {}
208 self._sequences = []
208 self._sequences = []
209 self._replies = {}
209 self._replies = {}
210
210
211 def add(self, category, entry, inreplyto=None):
211 def add(self, category, entry, inreplyto=None):
212 """add a new record of a given category.
212 """add a new record of a given category.
213
213
214 The entry can then be retrieved in the list returned by
214 The entry can then be retrieved in the list returned by
215 self['category']."""
215 self['category']."""
216 self._categories.setdefault(category, []).append(entry)
216 self._categories.setdefault(category, []).append(entry)
217 self._sequences.append((category, entry))
217 self._sequences.append((category, entry))
218 if inreplyto is not None:
218 if inreplyto is not None:
219 self.getreplies(inreplyto).add(category, entry)
219 self.getreplies(inreplyto).add(category, entry)
220
220
221 def getreplies(self, partid):
221 def getreplies(self, partid):
222 """get the subrecords that replies to a specific part"""
222 """get the subrecords that replies to a specific part"""
223 return self._replies.setdefault(partid, unbundlerecords())
223 return self._replies.setdefault(partid, unbundlerecords())
224
224
225 def __getitem__(self, cat):
225 def __getitem__(self, cat):
226 return tuple(self._categories.get(cat, ()))
226 return tuple(self._categories.get(cat, ()))
227
227
228 def __iter__(self):
228 def __iter__(self):
229 return iter(self._sequences)
229 return iter(self._sequences)
230
230
231 def __len__(self):
231 def __len__(self):
232 return len(self._sequences)
232 return len(self._sequences)
233
233
234 def __nonzero__(self):
234 def __nonzero__(self):
235 return bool(self._sequences)
235 return bool(self._sequences)
236
236
237 class bundleoperation(object):
237 class bundleoperation(object):
238 """an object that represents a single bundling process
238 """an object that represents a single bundling process
239
239
240 Its purpose is to carry unbundle-related objects and states.
240 Its purpose is to carry unbundle-related objects and states.
241
241
242 A new object should be created at the beginning of each bundle processing.
242 A new object should be created at the beginning of each bundle processing.
243 The object is to be returned by the processing function.
243 The object is to be returned by the processing function.
244
244
245 The object has very little content now it will ultimately contain:
245 The object has very little content now it will ultimately contain:
246 * an access to the repo the bundle is applied to,
246 * an access to the repo the bundle is applied to,
247 * a ui object,
247 * a ui object,
248 * a way to retrieve a transaction to add changes to the repo,
248 * a way to retrieve a transaction to add changes to the repo,
249 * a way to record the result of processing each part,
249 * a way to record the result of processing each part,
250 * a way to construct a bundle response when applicable.
250 * a way to construct a bundle response when applicable.
251 """
251 """
252
252
253 def __init__(self, repo, transactiongetter):
253 def __init__(self, repo, transactiongetter):
254 self.repo = repo
254 self.repo = repo
255 self.ui = repo.ui
255 self.ui = repo.ui
256 self.records = unbundlerecords()
256 self.records = unbundlerecords()
257 self.gettransaction = transactiongetter
257 self.gettransaction = transactiongetter
258 self.reply = None
258 self.reply = None
259
259
260 class TransactionUnavailable(RuntimeError):
260 class TransactionUnavailable(RuntimeError):
261 pass
261 pass
262
262
263 def _notransaction():
263 def _notransaction():
264 """default method to get a transaction while processing a bundle
264 """default method to get a transaction while processing a bundle
265
265
266 Raise an exception to highlight the fact that no transaction was expected
266 Raise an exception to highlight the fact that no transaction was expected
267 to be created"""
267 to be created"""
268 raise TransactionUnavailable()
268 raise TransactionUnavailable()
269
269
270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 """This function process a bundle, apply effect to/from a repo
271 """This function process a bundle, apply effect to/from a repo
272
272
273 It iterates over each part then searches for and uses the proper handling
273 It iterates over each part then searches for and uses the proper handling
274 code to process the part. Parts are processed in order.
274 code to process the part. Parts are processed in order.
275
275
276 This is very early version of this function that will be strongly reworked
276 This is very early version of this function that will be strongly reworked
277 before final usage.
277 before final usage.
278
278
279 Unknown Mandatory part will abort the process.
279 Unknown Mandatory part will abort the process.
280 """
280 """
281 op = bundleoperation(repo, transactiongetter)
281 op = bundleoperation(repo, transactiongetter)
282 # todo:
282 # todo:
283 # - only create reply bundle if requested.
283 # - only create reply bundle if requested.
284 op.reply = bundle20(op.ui)
284 op.reply = bundle20(op.ui)
285 # todo:
285 # todo:
286 # - replace this is a init function soon.
286 # - replace this is a init function soon.
287 # - exception catching
287 # - exception catching
288 unbundler.params
288 unbundler.params
289 iterparts = iter(unbundler)
289 iterparts = iter(unbundler)
290 part = None
290 part = None
291 try:
291 try:
292 for part in iterparts:
292 for part in iterparts:
293 parttype = part.type
293 parttype = part.type
294 # part key are matched lower case
294 # part key are matched lower case
295 key = parttype.lower()
295 key = parttype.lower()
296 try:
296 try:
297 handler = parthandlermapping[key]
297 handler = parthandlermapping[key]
298 op.ui.debug('found a handler for part %r\n' % parttype)
298 op.ui.debug('found a handler for part %r\n' % parttype)
299 except KeyError:
299 except KeyError:
300 if key != parttype: # mandatory parts
300 if key != parttype: # mandatory parts
301 # todo:
301 # todo:
302 # - use a more precise exception
302 # - use a more precise exception
303 raise
303 raise
304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 # consuming the part
305 # consuming the part
306 part.read()
306 part.read()
307 continue
307 continue
308
308
309 # handler is called outside the above try block so that we don't
309 # handler is called outside the above try block so that we don't
310 # risk catching KeyErrors from anything other than the
310 # risk catching KeyErrors from anything other than the
311 # parthandlermapping lookup (any KeyError raised by handler()
311 # parthandlermapping lookup (any KeyError raised by handler()
312 # itself represents a defect of a different variety).
312 # itself represents a defect of a different variety).
313 handler(op, part)
313 handler(op, part)
314 part.read()
314 part.read()
315 except Exception:
315 except Exception:
316 if part is not None:
316 if part is not None:
317 # consume the bundle content
317 # consume the bundle content
318 part.read()
318 part.read()
319 for part in iterparts:
319 for part in iterparts:
320 # consume the bundle content
320 # consume the bundle content
321 part.read()
321 part.read()
322 raise
322 raise
323 return op
323 return op
324
324
325 class bundle20(object):
325 class bundle20(object):
326 """represent an outgoing bundle2 container
326 """represent an outgoing bundle2 container
327
327
328 Use the `addparam` method to add stream level parameter. and `addpart` to
328 Use the `addparam` method to add stream level parameter. and `addpart` to
329 populate it. Then call `getchunks` to retrieve all the binary chunks of
329 populate it. Then call `getchunks` to retrieve all the binary chunks of
330 data that compose the bundle2 container."""
330 data that compose the bundle2 container."""
331
331
332 def __init__(self, ui):
332 def __init__(self, ui):
333 self.ui = ui
333 self.ui = ui
334 self._params = []
334 self._params = []
335 self._parts = []
335 self._parts = []
336
336
337 def addparam(self, name, value=None):
337 def addparam(self, name, value=None):
338 """add a stream level parameter"""
338 """add a stream level parameter"""
339 if not name:
339 if not name:
340 raise ValueError('empty parameter name')
340 raise ValueError('empty parameter name')
341 if name[0] not in string.letters:
341 if name[0] not in string.letters:
342 raise ValueError('non letter first character: %r' % name)
342 raise ValueError('non letter first character: %r' % name)
343 self._params.append((name, value))
343 self._params.append((name, value))
344
344
345 def addpart(self, part):
345 def addpart(self, part):
346 """add a new part to the bundle2 container
346 """add a new part to the bundle2 container
347
347
348 Parts contains the actual applicative payload."""
348 Parts contains the actual applicative payload."""
349 assert part.id is None
349 assert part.id is None
350 part.id = len(self._parts) # very cheap counter
350 part.id = len(self._parts) # very cheap counter
351 self._parts.append(part)
351 self._parts.append(part)
352
352
353 def getchunks(self):
353 def getchunks(self):
354 self.ui.debug('start emission of %s stream\n' % _magicstring)
354 self.ui.debug('start emission of %s stream\n' % _magicstring)
355 yield _magicstring
355 yield _magicstring
356 param = self._paramchunk()
356 param = self._paramchunk()
357 self.ui.debug('bundle parameter: %s\n' % param)
357 self.ui.debug('bundle parameter: %s\n' % param)
358 yield _pack(_fstreamparamsize, len(param))
358 yield _pack(_fstreamparamsize, len(param))
359 if param:
359 if param:
360 yield param
360 yield param
361
361
362 self.ui.debug('start of parts\n')
362 self.ui.debug('start of parts\n')
363 for part in self._parts:
363 for part in self._parts:
364 self.ui.debug('bundle part: "%s"\n' % part.type)
364 self.ui.debug('bundle part: "%s"\n' % part.type)
365 for chunk in part.getchunks():
365 for chunk in part.getchunks():
366 yield chunk
366 yield chunk
367 self.ui.debug('end of bundle\n')
367 self.ui.debug('end of bundle\n')
368 yield '\0\0'
368 yield '\0\0'
369
369
370 def _paramchunk(self):
370 def _paramchunk(self):
371 """return a encoded version of all stream parameters"""
371 """return a encoded version of all stream parameters"""
372 blocks = []
372 blocks = []
373 for par, value in self._params:
373 for par, value in self._params:
374 par = urllib.quote(par)
374 par = urllib.quote(par)
375 if value is not None:
375 if value is not None:
376 value = urllib.quote(value)
376 value = urllib.quote(value)
377 par = '%s=%s' % (par, value)
377 par = '%s=%s' % (par, value)
378 blocks.append(par)
378 blocks.append(par)
379 return ' '.join(blocks)
379 return ' '.join(blocks)
380
380
381 class unpackermixin(object):
381 class unpackermixin(object):
382 """A mixin to extract bytes and struct data from a stream"""
382 """A mixin to extract bytes and struct data from a stream"""
383
383
384 def __init__(self, fp):
384 def __init__(self, fp):
385 self._fp = fp
385 self._fp = fp
386
386
387 def _unpack(self, format):
387 def _unpack(self, format):
388 """unpack this struct format from the stream"""
388 """unpack this struct format from the stream"""
389 data = self._readexact(struct.calcsize(format))
389 data = self._readexact(struct.calcsize(format))
390 return _unpack(format, data)
390 return _unpack(format, data)
391
391
392 def _readexact(self, size):
392 def _readexact(self, size):
393 """read exactly <size> bytes from the stream"""
393 """read exactly <size> bytes from the stream"""
394 return changegroup.readexactly(self._fp, size)
394 return changegroup.readexactly(self._fp, size)
395
395
396
396
397 class unbundle20(unpackermixin):
397 class unbundle20(unpackermixin):
398 """interpret a bundle2 stream
398 """interpret a bundle2 stream
399
399
400 (this will eventually yield parts)"""
400 (this will eventually yield parts)"""
401
401
402 def __init__(self, ui, fp):
402 def __init__(self, ui, fp, header=None):
403 """If header is specified, we do not read it out of the stream."""
403 self.ui = ui
404 self.ui = ui
404 super(unbundle20, self).__init__(fp)
405 super(unbundle20, self).__init__(fp)
405 header = self._readexact(4)
406 if header is None:
406 magic, version = header[0:2], header[2:4]
407 header = self._readexact(4)
407 if magic != 'HG':
408 magic, version = header[0:2], header[2:4]
408 raise util.Abort(_('not a Mercurial bundle'))
409 if magic != 'HG':
409 if version != '20':
410 raise util.Abort(_('not a Mercurial bundle'))
410 raise util.Abort(_('unknown bundle version %s') % version)
411 if version != '20':
412 raise util.Abort(_('unknown bundle version %s') % version)
411 self.ui.debug('start processing of %s stream\n' % header)
413 self.ui.debug('start processing of %s stream\n' % header)
412
414
413 @util.propertycache
415 @util.propertycache
414 def params(self):
416 def params(self):
415 """dictionary of stream level parameters"""
417 """dictionary of stream level parameters"""
416 self.ui.debug('reading bundle2 stream parameters\n')
418 self.ui.debug('reading bundle2 stream parameters\n')
417 params = {}
419 params = {}
418 paramssize = self._unpack(_fstreamparamsize)[0]
420 paramssize = self._unpack(_fstreamparamsize)[0]
419 if paramssize:
421 if paramssize:
420 for p in self._readexact(paramssize).split(' '):
422 for p in self._readexact(paramssize).split(' '):
421 p = p.split('=', 1)
423 p = p.split('=', 1)
422 p = [urllib.unquote(i) for i in p]
424 p = [urllib.unquote(i) for i in p]
423 if len(p) < 2:
425 if len(p) < 2:
424 p.append(None)
426 p.append(None)
425 self._processparam(*p)
427 self._processparam(*p)
426 params[p[0]] = p[1]
428 params[p[0]] = p[1]
427 return params
429 return params
428
430
429 def _processparam(self, name, value):
431 def _processparam(self, name, value):
430 """process a parameter, applying its effect if needed
432 """process a parameter, applying its effect if needed
431
433
432 Parameter starting with a lower case letter are advisory and will be
434 Parameter starting with a lower case letter are advisory and will be
433 ignored when unknown. Those starting with an upper case letter are
435 ignored when unknown. Those starting with an upper case letter are
434 mandatory and will this function will raise a KeyError when unknown.
436 mandatory and will this function will raise a KeyError when unknown.
435
437
436 Note: no option are currently supported. Any input will be either
438 Note: no option are currently supported. Any input will be either
437 ignored or failing.
439 ignored or failing.
438 """
440 """
439 if not name:
441 if not name:
440 raise ValueError('empty parameter name')
442 raise ValueError('empty parameter name')
441 if name[0] not in string.letters:
443 if name[0] not in string.letters:
442 raise ValueError('non letter first character: %r' % name)
444 raise ValueError('non letter first character: %r' % name)
443 # Some logic will be later added here to try to process the option for
445 # Some logic will be later added here to try to process the option for
444 # a dict of known parameter.
446 # a dict of known parameter.
445 if name[0].islower():
447 if name[0].islower():
446 self.ui.debug("ignoring unknown parameter %r\n" % name)
448 self.ui.debug("ignoring unknown parameter %r\n" % name)
447 else:
449 else:
448 raise KeyError(name)
450 raise KeyError(name)
449
451
450
452
451 def __iter__(self):
453 def __iter__(self):
452 """yield all parts contained in the stream"""
454 """yield all parts contained in the stream"""
453 # make sure param have been loaded
455 # make sure param have been loaded
454 self.params
456 self.params
455 self.ui.debug('start extraction of bundle2 parts\n')
457 self.ui.debug('start extraction of bundle2 parts\n')
456 headerblock = self._readpartheader()
458 headerblock = self._readpartheader()
457 while headerblock is not None:
459 while headerblock is not None:
458 part = unbundlepart(self.ui, headerblock, self._fp)
460 part = unbundlepart(self.ui, headerblock, self._fp)
459 yield part
461 yield part
460 headerblock = self._readpartheader()
462 headerblock = self._readpartheader()
461 self.ui.debug('end of bundle2 stream\n')
463 self.ui.debug('end of bundle2 stream\n')
462
464
463 def _readpartheader(self):
465 def _readpartheader(self):
464 """reads a part header size and return the bytes blob
466 """reads a part header size and return the bytes blob
465
467
466 returns None if empty"""
468 returns None if empty"""
467 headersize = self._unpack(_fpartheadersize)[0]
469 headersize = self._unpack(_fpartheadersize)[0]
468 self.ui.debug('part header size: %i\n' % headersize)
470 self.ui.debug('part header size: %i\n' % headersize)
469 if headersize:
471 if headersize:
470 return self._readexact(headersize)
472 return self._readexact(headersize)
471 return None
473 return None
472
474
473
475
474 class bundlepart(object):
476 class bundlepart(object):
475 """A bundle2 part contains application level payload
477 """A bundle2 part contains application level payload
476
478
477 The part `type` is used to route the part to the application level
479 The part `type` is used to route the part to the application level
478 handler.
480 handler.
479 """
481 """
480
482
481 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
483 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
482 data=''):
484 data=''):
483 self.id = None
485 self.id = None
484 self.type = parttype
486 self.type = parttype
485 self.data = data
487 self.data = data
486 self.mandatoryparams = mandatoryparams
488 self.mandatoryparams = mandatoryparams
487 self.advisoryparams = advisoryparams
489 self.advisoryparams = advisoryparams
488
490
489 def getchunks(self):
491 def getchunks(self):
490 #### header
492 #### header
491 ## parttype
493 ## parttype
492 header = [_pack(_fparttypesize, len(self.type)),
494 header = [_pack(_fparttypesize, len(self.type)),
493 self.type, _pack(_fpartid, self.id),
495 self.type, _pack(_fpartid, self.id),
494 ]
496 ]
495 ## parameters
497 ## parameters
496 # count
498 # count
497 manpar = self.mandatoryparams
499 manpar = self.mandatoryparams
498 advpar = self.advisoryparams
500 advpar = self.advisoryparams
499 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
501 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
500 # size
502 # size
501 parsizes = []
503 parsizes = []
502 for key, value in manpar:
504 for key, value in manpar:
503 parsizes.append(len(key))
505 parsizes.append(len(key))
504 parsizes.append(len(value))
506 parsizes.append(len(value))
505 for key, value in advpar:
507 for key, value in advpar:
506 parsizes.append(len(key))
508 parsizes.append(len(key))
507 parsizes.append(len(value))
509 parsizes.append(len(value))
508 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
510 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
509 header.append(paramsizes)
511 header.append(paramsizes)
510 # key, value
512 # key, value
511 for key, value in manpar:
513 for key, value in manpar:
512 header.append(key)
514 header.append(key)
513 header.append(value)
515 header.append(value)
514 for key, value in advpar:
516 for key, value in advpar:
515 header.append(key)
517 header.append(key)
516 header.append(value)
518 header.append(value)
517 ## finalize header
519 ## finalize header
518 headerchunk = ''.join(header)
520 headerchunk = ''.join(header)
519 yield _pack(_fpartheadersize, len(headerchunk))
521 yield _pack(_fpartheadersize, len(headerchunk))
520 yield headerchunk
522 yield headerchunk
521 ## payload
523 ## payload
522 for chunk in self._payloadchunks():
524 for chunk in self._payloadchunks():
523 yield _pack(_fpayloadsize, len(chunk))
525 yield _pack(_fpayloadsize, len(chunk))
524 yield chunk
526 yield chunk
525 # end of payload
527 # end of payload
526 yield _pack(_fpayloadsize, 0)
528 yield _pack(_fpayloadsize, 0)
527
529
528 def _payloadchunks(self):
530 def _payloadchunks(self):
529 """yield chunks of a the part payload
531 """yield chunks of a the part payload
530
532
531 Exists to handle the different methods to provide data to a part."""
533 Exists to handle the different methods to provide data to a part."""
532 # we only support fixed size data now.
534 # we only support fixed size data now.
533 # This will be improved in the future.
535 # This will be improved in the future.
534 if util.safehasattr(self.data, 'next'):
536 if util.safehasattr(self.data, 'next'):
535 buff = util.chunkbuffer(self.data)
537 buff = util.chunkbuffer(self.data)
536 chunk = buff.read(preferedchunksize)
538 chunk = buff.read(preferedchunksize)
537 while chunk:
539 while chunk:
538 yield chunk
540 yield chunk
539 chunk = buff.read(preferedchunksize)
541 chunk = buff.read(preferedchunksize)
540 elif len(self.data):
542 elif len(self.data):
541 yield self.data
543 yield self.data
542
544
543 class unbundlepart(unpackermixin):
545 class unbundlepart(unpackermixin):
544 """a bundle part read from a bundle"""
546 """a bundle part read from a bundle"""
545
547
546 def __init__(self, ui, header, fp):
548 def __init__(self, ui, header, fp):
547 super(unbundlepart, self).__init__(fp)
549 super(unbundlepart, self).__init__(fp)
548 self.ui = ui
550 self.ui = ui
549 # unbundle state attr
551 # unbundle state attr
550 self._headerdata = header
552 self._headerdata = header
551 self._headeroffset = 0
553 self._headeroffset = 0
552 self._initialized = False
554 self._initialized = False
553 self.consumed = False
555 self.consumed = False
554 # part data
556 # part data
555 self.id = None
557 self.id = None
556 self.type = None
558 self.type = None
557 self.mandatoryparams = None
559 self.mandatoryparams = None
558 self.advisoryparams = None
560 self.advisoryparams = None
559 self._payloadstream = None
561 self._payloadstream = None
560 self._readheader()
562 self._readheader()
561
563
562 def _fromheader(self, size):
564 def _fromheader(self, size):
563 """return the next <size> byte from the header"""
565 """return the next <size> byte from the header"""
564 offset = self._headeroffset
566 offset = self._headeroffset
565 data = self._headerdata[offset:(offset + size)]
567 data = self._headerdata[offset:(offset + size)]
566 self._headeroffset = offset + size
568 self._headeroffset = offset + size
567 return data
569 return data
568
570
569 def _unpackheader(self, format):
571 def _unpackheader(self, format):
570 """read given format from header
572 """read given format from header
571
573
572 This automatically compute the size of the format to read."""
574 This automatically compute the size of the format to read."""
573 data = self._fromheader(struct.calcsize(format))
575 data = self._fromheader(struct.calcsize(format))
574 return _unpack(format, data)
576 return _unpack(format, data)
575
577
576 def _readheader(self):
578 def _readheader(self):
577 """read the header and setup the object"""
579 """read the header and setup the object"""
578 typesize = self._unpackheader(_fparttypesize)[0]
580 typesize = self._unpackheader(_fparttypesize)[0]
579 self.type = self._fromheader(typesize)
581 self.type = self._fromheader(typesize)
580 self.ui.debug('part type: "%s"\n' % self.type)
582 self.ui.debug('part type: "%s"\n' % self.type)
581 self.id = self._unpackheader(_fpartid)[0]
583 self.id = self._unpackheader(_fpartid)[0]
582 self.ui.debug('part id: "%s"\n' % self.id)
584 self.ui.debug('part id: "%s"\n' % self.id)
583 ## reading parameters
585 ## reading parameters
584 # param count
586 # param count
585 mancount, advcount = self._unpackheader(_fpartparamcount)
587 mancount, advcount = self._unpackheader(_fpartparamcount)
586 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
588 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
587 # param size
589 # param size
588 fparamsizes = _makefpartparamsizes(mancount + advcount)
590 fparamsizes = _makefpartparamsizes(mancount + advcount)
589 paramsizes = self._unpackheader(fparamsizes)
591 paramsizes = self._unpackheader(fparamsizes)
590 # make it a list of couple again
592 # make it a list of couple again
591 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
593 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
592 # split mandatory from advisory
594 # split mandatory from advisory
593 mansizes = paramsizes[:mancount]
595 mansizes = paramsizes[:mancount]
594 advsizes = paramsizes[mancount:]
596 advsizes = paramsizes[mancount:]
595 # retrive param value
597 # retrive param value
596 manparams = []
598 manparams = []
597 for key, value in mansizes:
599 for key, value in mansizes:
598 manparams.append((self._fromheader(key), self._fromheader(value)))
600 manparams.append((self._fromheader(key), self._fromheader(value)))
599 advparams = []
601 advparams = []
600 for key, value in advsizes:
602 for key, value in advsizes:
601 advparams.append((self._fromheader(key), self._fromheader(value)))
603 advparams.append((self._fromheader(key), self._fromheader(value)))
602 self.mandatoryparams = manparams
604 self.mandatoryparams = manparams
603 self.advisoryparams = advparams
605 self.advisoryparams = advparams
604 ## part payload
606 ## part payload
605 def payloadchunks():
607 def payloadchunks():
606 payloadsize = self._unpack(_fpayloadsize)[0]
608 payloadsize = self._unpack(_fpayloadsize)[0]
607 self.ui.debug('payload chunk size: %i\n' % payloadsize)
609 self.ui.debug('payload chunk size: %i\n' % payloadsize)
608 while payloadsize:
610 while payloadsize:
609 yield self._readexact(payloadsize)
611 yield self._readexact(payloadsize)
610 payloadsize = self._unpack(_fpayloadsize)[0]
612 payloadsize = self._unpack(_fpayloadsize)[0]
611 self.ui.debug('payload chunk size: %i\n' % payloadsize)
613 self.ui.debug('payload chunk size: %i\n' % payloadsize)
612 self._payloadstream = util.chunkbuffer(payloadchunks())
614 self._payloadstream = util.chunkbuffer(payloadchunks())
613 # we read the data, tell it
615 # we read the data, tell it
614 self._initialized = True
616 self._initialized = True
615
617
616 def read(self, size=None):
618 def read(self, size=None):
617 """read payload data"""
619 """read payload data"""
618 if not self._initialized:
620 if not self._initialized:
619 self._readheader()
621 self._readheader()
620 if size is None:
622 if size is None:
621 data = self._payloadstream.read()
623 data = self._payloadstream.read()
622 else:
624 else:
623 data = self._payloadstream.read(size)
625 data = self._payloadstream.read(size)
624 if size is None or len(data) < size:
626 if size is None or len(data) < size:
625 self.consumed = True
627 self.consumed = True
626 return data
628 return data
627
629
628
630
629 @parthandler('changegroup')
631 @parthandler('changegroup')
630 def handlechangegroup(op, inpart):
632 def handlechangegroup(op, inpart):
631 """apply a changegroup part on the repo
633 """apply a changegroup part on the repo
632
634
633 This is a very early implementation that will massive rework before being
635 This is a very early implementation that will massive rework before being
634 inflicted to any end-user.
636 inflicted to any end-user.
635 """
637 """
636 # Make sure we trigger a transaction creation
638 # Make sure we trigger a transaction creation
637 #
639 #
638 # The addchangegroup function will get a transaction object by itself, but
640 # The addchangegroup function will get a transaction object by itself, but
639 # we need to make sure we trigger the creation of a transaction object used
641 # we need to make sure we trigger the creation of a transaction object used
640 # for the whole processing scope.
642 # for the whole processing scope.
641 op.gettransaction()
643 op.gettransaction()
642 cg = changegroup.unbundle10(inpart, 'UN')
644 cg = changegroup.unbundle10(inpart, 'UN')
643 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
645 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
644 op.records.add('changegroup', {'return': ret})
646 op.records.add('changegroup', {'return': ret})
645 if op.reply is not None:
647 if op.reply is not None:
646 # This is definitly not the final form of this
648 # This is definitly not the final form of this
647 # return. But one need to start somewhere.
649 # return. But one need to start somewhere.
648 part = bundlepart('reply:changegroup', (),
650 part = bundlepart('reply:changegroup', (),
649 [('in-reply-to', str(inpart.id)),
651 [('in-reply-to', str(inpart.id)),
650 ('return', '%i' % ret)])
652 ('return', '%i' % ret)])
651 op.reply.addpart(part)
653 op.reply.addpart(part)
652 assert not inpart.read()
654 assert not inpart.read()
653
655
654 @parthandler('reply:changegroup')
656 @parthandler('reply:changegroup')
655 def handlechangegroup(op, inpart):
657 def handlechangegroup(op, inpart):
656 p = dict(inpart.advisoryparams)
658 p = dict(inpart.advisoryparams)
657 ret = int(p['return'])
659 ret = int(p['return'])
658 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
660 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
659
661
660 @parthandler('check:heads')
662 @parthandler('check:heads')
661 def handlechangegroup(op, inpart):
663 def handlechangegroup(op, inpart):
662 """check that head of the repo did not change
664 """check that head of the repo did not change
663
665
664 This is used to detect a push race when using unbundle.
666 This is used to detect a push race when using unbundle.
665 This replaces the "heads" argument of unbundle."""
667 This replaces the "heads" argument of unbundle."""
666 h = inpart.read(20)
668 h = inpart.read(20)
667 heads = []
669 heads = []
668 while len(h) == 20:
670 while len(h) == 20:
669 heads.append(h)
671 heads.append(h)
670 h = inpart.read(20)
672 h = inpart.read(20)
671 assert not h
673 assert not h
672 if heads != op.repo.heads():
674 if heads != op.repo.heads():
673 raise exchange.PushRaced()
675 raise exchange.PushRaced()
General Comments 0
You need to be logged in to leave comments. Login now