##// END OF EJS Templates
bundle2: add an unbundle part responsible from unbundling part...
Pierre-Yves David -
r21014:a6246bba default
parent child Browse files
Show More
@@ -1,617 +1,639 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big endian.
27 All numbers are unsigned and big endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. parameter with value
43 The blob contains a space separated list of parameters. parameter with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safefly ignored. However when the first
49 parameter is advisory and can be safefly ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remains simple and we want to discourage any
55 - Stream level parameters should remains simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a the bundle2 header in case of
57 - Textual data allow easy human inspection of a the bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :typename: alphanumerical part name
88 :typename: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitraty content, the binary structure is::
95 Part's parameter may have arbitraty content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimatly be lifted.
124 This is an implementation limitation that will ultimatly be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147 import StringIO
147 import StringIO
148
148
149 import changegroup
149 import changegroup
150 from i18n import _
150 from i18n import _
151
151
152 _pack = struct.pack
152 _pack = struct.pack
153 _unpack = struct.unpack
153 _unpack = struct.unpack
154
154
155 _magicstring = 'HG20'
155 _magicstring = 'HG20'
156
156
157 _fstreamparamsize = '>H'
157 _fstreamparamsize = '>H'
158 _fpartheadersize = '>H'
158 _fpartheadersize = '>H'
159 _fparttypesize = '>B'
159 _fparttypesize = '>B'
160 _fpartid = '>I'
160 _fpartid = '>I'
161 _fpayloadsize = '>I'
161 _fpayloadsize = '>I'
162 _fpartparamcount = '>BB'
162 _fpartparamcount = '>BB'
163
163
164 preferedchunksize = 4096
164 preferedchunksize = 4096
165
165
166 def _makefpartparamsizes(nbparams):
166 def _makefpartparamsizes(nbparams):
167 """return a struct format to read part parameter sizes
167 """return a struct format to read part parameter sizes
168
168
169 The number parameters is variable so we need to build that format
169 The number parameters is variable so we need to build that format
170 dynamically.
170 dynamically.
171 """
171 """
172 return '>'+('BB'*nbparams)
172 return '>'+('BB'*nbparams)
173
173
174 parthandlermapping = {}
174 parthandlermapping = {}
175
175
176 def parthandler(parttype):
176 def parthandler(parttype):
177 """decorator that register a function as a bundle2 part handler
177 """decorator that register a function as a bundle2 part handler
178
178
179 eg::
179 eg::
180
180
181 @parthandler('myparttype')
181 @parthandler('myparttype')
182 def myparttypehandler(...):
182 def myparttypehandler(...):
183 '''process a part of type "my part".'''
183 '''process a part of type "my part".'''
184 ...
184 ...
185 """
185 """
186 def _decorator(func):
186 def _decorator(func):
187 lparttype = parttype.lower() # enforce lower case matching.
187 lparttype = parttype.lower() # enforce lower case matching.
188 assert lparttype not in parthandlermapping
188 assert lparttype not in parthandlermapping
189 parthandlermapping[lparttype] = func
189 parthandlermapping[lparttype] = func
190 return func
190 return func
191 return _decorator
191 return _decorator
192
192
193 class unbundlerecords(object):
193 class unbundlerecords(object):
194 """keep record of what happens during and unbundle
194 """keep record of what happens during and unbundle
195
195
196 New records are added using `records.add('cat', obj)`. Where 'cat' is a
196 New records are added using `records.add('cat', obj)`. Where 'cat' is a
197 category of record and obj is an arbitraty object.
197 category of record and obj is an arbitraty object.
198
198
199 `records['cat']` will return all entries of this category 'cat'.
199 `records['cat']` will return all entries of this category 'cat'.
200
200
201 Iterating on the object itself will yield `('category', obj)` tuples
201 Iterating on the object itself will yield `('category', obj)` tuples
202 for all entries.
202 for all entries.
203
203
204 All iterations happens in chronological order.
204 All iterations happens in chronological order.
205 """
205 """
206
206
207 def __init__(self):
207 def __init__(self):
208 self._categories = {}
208 self._categories = {}
209 self._sequences = []
209 self._sequences = []
210 self._replies = {}
210 self._replies = {}
211
211
212 def add(self, category, entry, inreplyto=None):
212 def add(self, category, entry, inreplyto=None):
213 """add a new record of a given category.
213 """add a new record of a given category.
214
214
215 The entry can then be retrieved in the list returned by
215 The entry can then be retrieved in the list returned by
216 self['category']."""
216 self['category']."""
217 self._categories.setdefault(category, []).append(entry)
217 self._categories.setdefault(category, []).append(entry)
218 self._sequences.append((category, entry))
218 self._sequences.append((category, entry))
219 if inreplyto is not None:
219 if inreplyto is not None:
220 self.getreplies(inreplyto).add(category, entry)
220 self.getreplies(inreplyto).add(category, entry)
221
221
222 def getreplies(self, partid):
222 def getreplies(self, partid):
223 """get the subrecords that replies to a specific part"""
223 """get the subrecords that replies to a specific part"""
224 return self._replies.setdefault(partid, unbundlerecords())
224 return self._replies.setdefault(partid, unbundlerecords())
225
225
226 def __getitem__(self, cat):
226 def __getitem__(self, cat):
227 return tuple(self._categories.get(cat, ()))
227 return tuple(self._categories.get(cat, ()))
228
228
229 def __iter__(self):
229 def __iter__(self):
230 return iter(self._sequences)
230 return iter(self._sequences)
231
231
232 def __len__(self):
232 def __len__(self):
233 return len(self._sequences)
233 return len(self._sequences)
234
234
235 def __nonzero__(self):
235 def __nonzero__(self):
236 return bool(self._sequences)
236 return bool(self._sequences)
237
237
238 class bundleoperation(object):
238 class bundleoperation(object):
239 """an object that represents a single bundling process
239 """an object that represents a single bundling process
240
240
241 Its purpose is to carry unbundle-related objects and states.
241 Its purpose is to carry unbundle-related objects and states.
242
242
243 A new object should be created at the beginning of each bundle processing.
243 A new object should be created at the beginning of each bundle processing.
244 The object is to be returned by the processing function.
244 The object is to be returned by the processing function.
245
245
246 The object has very little content now it will ultimately contain:
246 The object has very little content now it will ultimately contain:
247 * an access to the repo the bundle is applied to,
247 * an access to the repo the bundle is applied to,
248 * a ui object,
248 * a ui object,
249 * a way to retrieve a transaction to add changes to the repo,
249 * a way to retrieve a transaction to add changes to the repo,
250 * a way to record the result of processing each part,
250 * a way to record the result of processing each part,
251 * a way to construct a bundle response when applicable.
251 * a way to construct a bundle response when applicable.
252 """
252 """
253
253
254 def __init__(self, repo, transactiongetter):
254 def __init__(self, repo, transactiongetter):
255 self.repo = repo
255 self.repo = repo
256 self.ui = repo.ui
256 self.ui = repo.ui
257 self.records = unbundlerecords()
257 self.records = unbundlerecords()
258 self.gettransaction = transactiongetter
258 self.gettransaction = transactiongetter
259 self.reply = None
259 self.reply = None
260
260
261 class TransactionUnavailable(RuntimeError):
261 class TransactionUnavailable(RuntimeError):
262 pass
262 pass
263
263
264 def _notransaction():
264 def _notransaction():
265 """default method to get a transaction while processing a bundle
265 """default method to get a transaction while processing a bundle
266
266
267 Raise an exception to highlight the fact that no transaction was expected
267 Raise an exception to highlight the fact that no transaction was expected
268 to be created"""
268 to be created"""
269 raise TransactionUnavailable()
269 raise TransactionUnavailable()
270
270
271 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 def processbundle(repo, unbundler, transactiongetter=_notransaction):
272 """This function process a bundle, apply effect to/from a repo
272 """This function process a bundle, apply effect to/from a repo
273
273
274 It iterates over each part then searches for and uses the proper handling
274 It iterates over each part then searches for and uses the proper handling
275 code to process the part. Parts are processed in order.
275 code to process the part. Parts are processed in order.
276
276
277 This is very early version of this function that will be strongly reworked
277 This is very early version of this function that will be strongly reworked
278 before final usage.
278 before final usage.
279
279
280 Unknown Mandatory part will abort the process.
280 Unknown Mandatory part will abort the process.
281 """
281 """
282 op = bundleoperation(repo, transactiongetter)
282 op = bundleoperation(repo, transactiongetter)
283 # todo:
283 # todo:
284 # - only create reply bundle if requested.
284 # - only create reply bundle if requested.
285 op.reply = bundle20(op.ui)
285 op.reply = bundle20(op.ui)
286 # todo:
286 # todo:
287 # - replace this is a init function soon.
287 # - replace this is a init function soon.
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = iter(unbundler)
290 iterparts = iter(unbundler)
291 try:
291 try:
292 for part in iterparts:
292 for part in iterparts:
293 parttype = part.type
293 parttype = part.type
294 # part key are matched lower case
294 # part key are matched lower case
295 key = parttype.lower()
295 key = parttype.lower()
296 try:
296 try:
297 handler = parthandlermapping[key]
297 handler = parthandlermapping[key]
298 op.ui.debug('found a handler for part %r\n' % parttype)
298 op.ui.debug('found a handler for part %r\n' % parttype)
299 except KeyError:
299 except KeyError:
300 if key != parttype: # mandatory parts
300 if key != parttype: # mandatory parts
301 # todo:
301 # todo:
302 # - use a more precise exception
302 # - use a more precise exception
303 raise
303 raise
304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 # todo:
305 # todo:
306 # - consume the part once we use streaming
306 # - consume the part once we use streaming
307 continue
307 continue
308
308
309 # handler is called outside the above try block so that we don't
309 # handler is called outside the above try block so that we don't
310 # risk catching KeyErrors from anything other than the
310 # risk catching KeyErrors from anything other than the
311 # parthandlermapping lookup (any KeyError raised by handler()
311 # parthandlermapping lookup (any KeyError raised by handler()
312 # itself represents a defect of a different variety).
312 # itself represents a defect of a different variety).
313 handler(op, part)
313 handler(op, part)
314 except Exception:
314 except Exception:
315 for part in iterparts:
315 for part in iterparts:
316 pass # consume the bundle content
316 pass # consume the bundle content
317 raise
317 raise
318 return op
318 return op
319
319
320 class bundle20(object):
320 class bundle20(object):
321 """represent an outgoing bundle2 container
321 """represent an outgoing bundle2 container
322
322
323 Use the `addparam` method to add stream level parameter. and `addpart` to
323 Use the `addparam` method to add stream level parameter. and `addpart` to
324 populate it. Then call `getchunks` to retrieve all the binary chunks of
324 populate it. Then call `getchunks` to retrieve all the binary chunks of
325 datathat compose the bundle2 container."""
325 datathat compose the bundle2 container."""
326
326
327 def __init__(self, ui):
327 def __init__(self, ui):
328 self.ui = ui
328 self.ui = ui
329 self._params = []
329 self._params = []
330 self._parts = []
330 self._parts = []
331
331
332 def addparam(self, name, value=None):
332 def addparam(self, name, value=None):
333 """add a stream level parameter"""
333 """add a stream level parameter"""
334 if not name:
334 if not name:
335 raise ValueError('empty parameter name')
335 raise ValueError('empty parameter name')
336 if name[0] not in string.letters:
336 if name[0] not in string.letters:
337 raise ValueError('non letter first character: %r' % name)
337 raise ValueError('non letter first character: %r' % name)
338 self._params.append((name, value))
338 self._params.append((name, value))
339
339
340 def addpart(self, part):
340 def addpart(self, part):
341 """add a new part to the bundle2 container
341 """add a new part to the bundle2 container
342
342
343 Parts contains the actuall applicative payload."""
343 Parts contains the actuall applicative payload."""
344 assert part.id is None
344 assert part.id is None
345 part.id = len(self._parts) # very cheap counter
345 part.id = len(self._parts) # very cheap counter
346 self._parts.append(part)
346 self._parts.append(part)
347
347
348 def getchunks(self):
348 def getchunks(self):
349 self.ui.debug('start emission of %s stream\n' % _magicstring)
349 self.ui.debug('start emission of %s stream\n' % _magicstring)
350 yield _magicstring
350 yield _magicstring
351 param = self._paramchunk()
351 param = self._paramchunk()
352 self.ui.debug('bundle parameter: %s\n' % param)
352 self.ui.debug('bundle parameter: %s\n' % param)
353 yield _pack(_fstreamparamsize, len(param))
353 yield _pack(_fstreamparamsize, len(param))
354 if param:
354 if param:
355 yield param
355 yield param
356
356
357 self.ui.debug('start of parts\n')
357 self.ui.debug('start of parts\n')
358 for part in self._parts:
358 for part in self._parts:
359 self.ui.debug('bundle part: "%s"\n' % part.type)
359 self.ui.debug('bundle part: "%s"\n' % part.type)
360 for chunk in part.getchunks():
360 for chunk in part.getchunks():
361 yield chunk
361 yield chunk
362 self.ui.debug('end of bundle\n')
362 self.ui.debug('end of bundle\n')
363 yield '\0\0'
363 yield '\0\0'
364
364
365 def _paramchunk(self):
365 def _paramchunk(self):
366 """return a encoded version of all stream parameters"""
366 """return a encoded version of all stream parameters"""
367 blocks = []
367 blocks = []
368 for par, value in self._params:
368 for par, value in self._params:
369 par = urllib.quote(par)
369 par = urllib.quote(par)
370 if value is not None:
370 if value is not None:
371 value = urllib.quote(value)
371 value = urllib.quote(value)
372 par = '%s=%s' % (par, value)
372 par = '%s=%s' % (par, value)
373 blocks.append(par)
373 blocks.append(par)
374 return ' '.join(blocks)
374 return ' '.join(blocks)
375
375
376 class unpackermixin(object):
376 class unpackermixin(object):
377 """A mixin to extract bytes and struct data from a stream"""
377 """A mixin to extract bytes and struct data from a stream"""
378
378
379 def __init__(self, fp):
379 def __init__(self, fp):
380 self._fp = fp
380 self._fp = fp
381
381
382 def _unpack(self, format):
382 def _unpack(self, format):
383 """unpack this struct format from the stream"""
383 """unpack this struct format from the stream"""
384 data = self._readexact(struct.calcsize(format))
384 data = self._readexact(struct.calcsize(format))
385 return _unpack(format, data)
385 return _unpack(format, data)
386
386
387 def _readexact(self, size):
387 def _readexact(self, size):
388 """read exactly <size> bytes from the stream"""
388 """read exactly <size> bytes from the stream"""
389 return changegroup.readexactly(self._fp, size)
389 return changegroup.readexactly(self._fp, size)
390
390
391
391
392 class unbundle20(unpackermixin):
392 class unbundle20(unpackermixin):
393 """interpret a bundle2 stream
393 """interpret a bundle2 stream
394
394
395 (this will eventually yield parts)"""
395 (this will eventually yield parts)"""
396
396
397 def __init__(self, ui, fp):
397 def __init__(self, ui, fp):
398 self.ui = ui
398 self.ui = ui
399 super(unbundle20, self).__init__(fp)
399 super(unbundle20, self).__init__(fp)
400 header = self._readexact(4)
400 header = self._readexact(4)
401 magic, version = header[0:2], header[2:4]
401 magic, version = header[0:2], header[2:4]
402 if magic != 'HG':
402 if magic != 'HG':
403 raise util.Abort(_('not a Mercurial bundle'))
403 raise util.Abort(_('not a Mercurial bundle'))
404 if version != '20':
404 if version != '20':
405 raise util.Abort(_('unknown bundle version %s') % version)
405 raise util.Abort(_('unknown bundle version %s') % version)
406 self.ui.debug('start processing of %s stream\n' % header)
406 self.ui.debug('start processing of %s stream\n' % header)
407
407
408 @util.propertycache
408 @util.propertycache
409 def params(self):
409 def params(self):
410 """dictionnary of stream level parameters"""
410 """dictionnary of stream level parameters"""
411 self.ui.debug('reading bundle2 stream parameters\n')
411 self.ui.debug('reading bundle2 stream parameters\n')
412 params = {}
412 params = {}
413 paramssize = self._unpack(_fstreamparamsize)[0]
413 paramssize = self._unpack(_fstreamparamsize)[0]
414 if paramssize:
414 if paramssize:
415 for p in self._readexact(paramssize).split(' '):
415 for p in self._readexact(paramssize).split(' '):
416 p = p.split('=', 1)
416 p = p.split('=', 1)
417 p = [urllib.unquote(i) for i in p]
417 p = [urllib.unquote(i) for i in p]
418 if len(p) < 2:
418 if len(p) < 2:
419 p.append(None)
419 p.append(None)
420 self._processparam(*p)
420 self._processparam(*p)
421 params[p[0]] = p[1]
421 params[p[0]] = p[1]
422 return params
422 return params
423
423
424 def _processparam(self, name, value):
424 def _processparam(self, name, value):
425 """process a parameter, applying its effect if needed
425 """process a parameter, applying its effect if needed
426
426
427 Parameter starting with a lower case letter are advisory and will be
427 Parameter starting with a lower case letter are advisory and will be
428 ignored when unknown. Those starting with an upper case letter are
428 ignored when unknown. Those starting with an upper case letter are
429 mandatory and will this function will raise a KeyError when unknown.
429 mandatory and will this function will raise a KeyError when unknown.
430
430
431 Note: no option are currently supported. Any input will be either
431 Note: no option are currently supported. Any input will be either
432 ignored or failing.
432 ignored or failing.
433 """
433 """
434 if not name:
434 if not name:
435 raise ValueError('empty parameter name')
435 raise ValueError('empty parameter name')
436 if name[0] not in string.letters:
436 if name[0] not in string.letters:
437 raise ValueError('non letter first character: %r' % name)
437 raise ValueError('non letter first character: %r' % name)
438 # Some logic will be later added here to try to process the option for
438 # Some logic will be later added here to try to process the option for
439 # a dict of known parameter.
439 # a dict of known parameter.
440 if name[0].islower():
440 if name[0].islower():
441 self.ui.debug("ignoring unknown parameter %r\n" % name)
441 self.ui.debug("ignoring unknown parameter %r\n" % name)
442 else:
442 else:
443 raise KeyError(name)
443 raise KeyError(name)
444
444
445
445
446 def __iter__(self):
446 def __iter__(self):
447 """yield all parts contained in the stream"""
447 """yield all parts contained in the stream"""
448 # make sure param have been loaded
448 # make sure param have been loaded
449 self.params
449 self.params
450 self.ui.debug('start extraction of bundle2 parts\n')
450 self.ui.debug('start extraction of bundle2 parts\n')
451 part = self._readpart()
451 headerblock = self._readpartheader()
452 while part is not None:
452 while headerblock is not None:
453 part = unbundlepart(self.ui, headerblock, self._fp)
453 yield part
454 yield part
454 part = self._readpart()
455 headerblock = self._readpartheader()
455 self.ui.debug('end of bundle2 stream\n')
456 self.ui.debug('end of bundle2 stream\n')
456
457
457 def _readpart(self):
458 def _readpartheader(self):
458 """return None when an end of stream markers is reach"""
459 """reads a part header size and return the bytes blob
459
460
461 returns None if empty"""
460 headersize = self._unpack(_fpartheadersize)[0]
462 headersize = self._unpack(_fpartheadersize)[0]
461 self.ui.debug('part header size: %i\n' % headersize)
463 self.ui.debug('part header size: %i\n' % headersize)
462 if not headersize:
464 if headersize:
463 return None
465 return self._readexact(headersize)
464 headerblock = self._readexact(headersize)
466 return None
465 # some utility to help reading from the header block
466 self._offset = 0 # layer violation to have something easy to understand
467 def fromheader(size):
468 """return the next <size> byte from the header"""
469 offset = self._offset
470 data = headerblock[offset:(offset + size)]
471 self._offset = offset + size
472 return data
473 def unpackheader(format):
474 """read given format from header
475
476 This automatically compute the size of the format to read."""
477 data = fromheader(struct.calcsize(format))
478 return _unpack(format, data)
479
480 typesize = unpackheader(_fparttypesize)[0]
481 parttype = fromheader(typesize)
482 self.ui.debug('part type: "%s"\n' % parttype)
483 partid = unpackheader(_fpartid)[0]
484 self.ui.debug('part id: "%s"\n' % partid)
485 ## reading parameters
486 # param count
487 mancount, advcount = unpackheader(_fpartparamcount)
488 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
489 # param size
490 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
491 # make it a list of couple again
492 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
493 # split mandatory from advisory
494 mansizes = paramsizes[:mancount]
495 advsizes = paramsizes[mancount:]
496 # retrive param value
497 manparams = []
498 for key, value in mansizes:
499 manparams.append((fromheader(key), fromheader(value)))
500 advparams = []
501 for key, value in advsizes:
502 advparams.append((fromheader(key), fromheader(value)))
503 del self._offset # clean up layer, nobody saw anything.
504 ## part payload
505 payload = []
506 payloadsize = self._unpack(_fpayloadsize)[0]
507 self.ui.debug('payload chunk size: %i\n' % payloadsize)
508 while payloadsize:
509 payload.append(self._readexact(payloadsize))
510 payloadsize = self._unpack(_fpayloadsize)[0]
511 self.ui.debug('payload chunk size: %i\n' % payloadsize)
512 payload = ''.join(payload)
513 current = bundlepart(parttype, manparams, advparams, data=payload)
514 current.id = partid
515 return current
516
467
517
468
518 class bundlepart(object):
469 class bundlepart(object):
519 """A bundle2 part contains application level payload
470 """A bundle2 part contains application level payload
520
471
521 The part `type` is used to route the part to the application level
472 The part `type` is used to route the part to the application level
522 handler.
473 handler.
523 """
474 """
524
475
525 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
476 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
526 data=''):
477 data=''):
527 self.id = None
478 self.id = None
528 self.type = parttype
479 self.type = parttype
529 self.data = data
480 self.data = data
530 self.mandatoryparams = mandatoryparams
481 self.mandatoryparams = mandatoryparams
531 self.advisoryparams = advisoryparams
482 self.advisoryparams = advisoryparams
532
483
533 def getchunks(self):
484 def getchunks(self):
534 #### header
485 #### header
535 ## parttype
486 ## parttype
536 header = [_pack(_fparttypesize, len(self.type)),
487 header = [_pack(_fparttypesize, len(self.type)),
537 self.type, _pack(_fpartid, self.id),
488 self.type, _pack(_fpartid, self.id),
538 ]
489 ]
539 ## parameters
490 ## parameters
540 # count
491 # count
541 manpar = self.mandatoryparams
492 manpar = self.mandatoryparams
542 advpar = self.advisoryparams
493 advpar = self.advisoryparams
543 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
494 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
544 # size
495 # size
545 parsizes = []
496 parsizes = []
546 for key, value in manpar:
497 for key, value in manpar:
547 parsizes.append(len(key))
498 parsizes.append(len(key))
548 parsizes.append(len(value))
499 parsizes.append(len(value))
549 for key, value in advpar:
500 for key, value in advpar:
550 parsizes.append(len(key))
501 parsizes.append(len(key))
551 parsizes.append(len(value))
502 parsizes.append(len(value))
552 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
503 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
553 header.append(paramsizes)
504 header.append(paramsizes)
554 # key, value
505 # key, value
555 for key, value in manpar:
506 for key, value in manpar:
556 header.append(key)
507 header.append(key)
557 header.append(value)
508 header.append(value)
558 for key, value in advpar:
509 for key, value in advpar:
559 header.append(key)
510 header.append(key)
560 header.append(value)
511 header.append(value)
561 ## finalize header
512 ## finalize header
562 headerchunk = ''.join(header)
513 headerchunk = ''.join(header)
563 yield _pack(_fpartheadersize, len(headerchunk))
514 yield _pack(_fpartheadersize, len(headerchunk))
564 yield headerchunk
515 yield headerchunk
565 ## payload
516 ## payload
566 for chunk in self._payloadchunks():
517 for chunk in self._payloadchunks():
567 yield _pack(_fpayloadsize, len(chunk))
518 yield _pack(_fpayloadsize, len(chunk))
568 yield chunk
519 yield chunk
569 # end of payload
520 # end of payload
570 yield _pack(_fpayloadsize, 0)
521 yield _pack(_fpayloadsize, 0)
571
522
572 def _payloadchunks(self):
523 def _payloadchunks(self):
573 """yield chunks of a the part payload
524 """yield chunks of a the part payload
574
525
575 Exists to handle the different methods to provide data to a part."""
526 Exists to handle the different methods to provide data to a part."""
576 # we only support fixed size data now.
527 # we only support fixed size data now.
577 # This will be improved in the future.
528 # This will be improved in the future.
578 if util.safehasattr(self.data, 'next'):
529 if util.safehasattr(self.data, 'next'):
579 buff = util.chunkbuffer(self.data)
530 buff = util.chunkbuffer(self.data)
580 chunk = buff.read(preferedchunksize)
531 chunk = buff.read(preferedchunksize)
581 while chunk:
532 while chunk:
582 yield chunk
533 yield chunk
583 chunk = buff.read(preferedchunksize)
534 chunk = buff.read(preferedchunksize)
584 elif len(self.data):
535 elif len(self.data):
585 yield self.data
536 yield self.data
586
537
538 class unbundlepart(unpackermixin):
539 """a bundle part read from a bundle"""
540
541 def __init__(self, ui, header, fp):
542 super(unbundlepart, self).__init__(fp)
543 self.ui = ui
544 # unbundle state attr
545 self._headerdata = header
546 # part data
547 self.id = None
548 self.type = None
549 self.mandatoryparams = None
550 self.advisoryparams = None
551 self.data = None
552 self._readdata()
553
554 def _readdata(self):
555 """read the header and setup the object"""
556 # some utility to help reading from the header block
557 headerblock = self._headerdata
558 self._offset = 0 # layer violation to have something easy to understand
559 def fromheader(size):
560 """return the next <size> byte from the header"""
561 offset = self._offset
562 data = headerblock[offset:(offset + size)]
563 self._offset = offset + size
564 return data
565 def unpackheader(format):
566 """read given format from header
567
568 This automatically compute the size of the format to read."""
569 data = fromheader(struct.calcsize(format))
570 return _unpack(format, data)
571
572 typesize = unpackheader(_fparttypesize)[0]
573 self.type = fromheader(typesize)
574 self.ui.debug('part type: "%s"\n' % self.type)
575 self.id = unpackheader(_fpartid)[0]
576 self.ui.debug('part id: "%s"\n' % self.id)
577 ## reading parameters
578 # param count
579 mancount, advcount = unpackheader(_fpartparamcount)
580 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
581 # param size
582 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
583 # make it a list of couple again
584 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
585 # split mandatory from advisory
586 mansizes = paramsizes[:mancount]
587 advsizes = paramsizes[mancount:]
588 # retrive param value
589 manparams = []
590 for key, value in mansizes:
591 manparams.append((fromheader(key), fromheader(value)))
592 advparams = []
593 for key, value in advsizes:
594 advparams.append((fromheader(key), fromheader(value)))
595 del self._offset # clean up layer, nobody saw anything.
596 self.mandatoryparams = manparams
597 self.advisoryparams = advparams
598 ## part payload
599 payload = []
600 payloadsize = self._unpack(_fpayloadsize)[0]
601 self.ui.debug('payload chunk size: %i\n' % payloadsize)
602 while payloadsize:
603 payload.append(self._readexact(payloadsize))
604 payloadsize = self._unpack(_fpayloadsize)[0]
605 self.ui.debug('payload chunk size: %i\n' % payloadsize)
606 self.data = ''.join(payload)
607
587 @parthandler('changegroup')
608 @parthandler('changegroup')
588 def handlechangegroup(op, inpart):
609 def handlechangegroup(op, inpart):
589 """apply a changegroup part on the repo
610 """apply a changegroup part on the repo
590
611
591 This is a very early implementation that will massive rework before being
612 This is a very early implementation that will massive rework before being
592 inflicted to any end-user.
613 inflicted to any end-user.
593 """
614 """
594 # Make sure we trigger a transaction creation
615 # Make sure we trigger a transaction creation
595 #
616 #
596 # The addchangegroup function will get a transaction object by itself, but
617 # The addchangegroup function will get a transaction object by itself, but
597 # we need to make sure we trigger the creation of a transaction object used
618 # we need to make sure we trigger the creation of a transaction object used
598 # for the whole processing scope.
619 # for the whole processing scope.
599 op.gettransaction()
620 op.gettransaction()
600 data = StringIO.StringIO(inpart.data)
621 data = StringIO.StringIO(inpart.data)
601 data.seek(0)
622 data.seek(0)
602 cg = changegroup.readbundle(data, 'bundle2part')
623 cg = changegroup.readbundle(data, 'bundle2part')
603 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
624 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
604 op.records.add('changegroup', {'return': ret})
625 op.records.add('changegroup', {'return': ret})
605 if op.reply is not None:
626 if op.reply is not None:
606 # This is definitly not the final form of this
627 # This is definitly not the final form of this
607 # return. But one need to start somewhere.
628 # return. But one need to start somewhere.
608 op.reply.addpart(bundlepart('reply:changegroup', (),
629 part = bundlepart('reply:changegroup', (),
609 [('in-reply-to', str(inpart.id)),
630 [('in-reply-to', str(inpart.id)),
610 ('return', '%i' % ret)]))
631 ('return', '%i' % ret)])
632 op.reply.addpart(part)
611
633
612 @parthandler('reply:changegroup')
634 @parthandler('reply:changegroup')
613 def handlechangegroup(op, inpart):
635 def handlechangegroup(op, inpart):
614 p = dict(inpart.advisoryparams)
636 p = dict(inpart.advisoryparams)
615 ret = int(p['return'])
637 ret = int(p['return'])
616 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
638 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
617
639
General Comments 0
You need to be logged in to leave comments. Login now