##// END OF EJS Templates
bundle2: safely read unpack data from part header...
Pierre-Yves David -
r20887:662b79be default
parent child Browse files
Show More
@@ -1,385 +1,391
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. parameter with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of the bundle2 header in case of
58 58 trouble.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: (16 bits integer)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :typename: alphanumerical part name
89 89
90 90 :parameters:
91 91
92 92 Part's parameters may have arbitrary content, the binary structure is::
93 93
94 94 <mandatory-count><advisory-count><param-sizes><param-data>
95 95
96 96 :mandatory-count: 1 byte, number of mandatory parameters
97 97
98 98 :advisory-count: 1 byte, number of advisory parameters
99 99
100 100 :param-sizes:
101 101
102 102 N couples of bytes, where N is the total number of parameters. Each
103 103 couple contains (<size-of-key>, <size-of-value>) for one parameter.
104 104
105 105 :param-data:
106 106
107 107 A blob of bytes from which each parameter key and value can be
108 108 retrieved using the list of size couples stored in the previous
109 109 field.
110 110
111 111 Mandatory parameters come first, then the advisory ones.
112 112
113 113 :payload:
114 114
115 115 payload is a series of `<chunksize><chunkdata>`.
116 116
117 117 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
118 118 `chunksize` says). The payload part is concluded by a zero size chunk.
119 119
120 120 The current implementation always produces either zero or one chunk.
121 121 This is an implementation limitation that will ultimately be lifted.
122 122 """
123 123
124 124 import util
125 125 import struct
126 126 import urllib
127 127 import string
128 128
129 129 import changegroup
130 130 from i18n import _
131 131
132 132 _pack = struct.pack
133 133 _unpack = struct.unpack
134 134
135 135 _magicstring = 'HG20'
136 136
137 137 _fstreamparamsize = '>H'
138 138 _fpartheadersize = '>H'
139 139 _fparttypesize = '>B'
140 140 _fpayloadsize = '>I'
141 141 _fpartparamcount = '>BB'
142 142
143 143 def _makefpartparamsizes(nbparams):
144 144 """return a struct format to read part parameter sizes
145 145
146 146 The number parameters is variable so we need to build that format
147 147 dynamically.
148 148 """
149 149 return '>'+('BB'*nbparams)
150 150
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `addpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks
    of data that compose the bundle2 container."""

    def __init__(self, ui):
        self.ui = ui
        # list of (name, value) stream level parameters, in insertion order
        self._params = []
        # parts to emit, in insertion order
        self._parts = []

    def addparam(self, name, value=None):
        """add a stream level parameter

        `value` may be None for a parameter without value.

        Raise ValueError when `name` is empty or when its first character
        is not an ASCII letter."""
        if not name:
            raise ValueError('empty parameter name')
        # `string.letters` depends on the process locale (and does not exist
        # on Python 3). What is accepted must not vary with the locale, so
        # check against the fixed ASCII set instead.
        if name[0] not in string.ascii_letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        self._parts.append(part)

    def getchunks(self):
        """yield the binary chunks composing the bundle2 container

        Emits, in order: the magic string, the size of the stream level
        parameters blob, the blob itself (when not empty), the chunks of
        every part and finally the end of stream marker."""
        self.ui.debug('start emission of %s stream\n' % _magicstring)
        yield _magicstring
        param = self._paramchunk()
        self.ui.debug('bundle parameter: %s\n' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param

        self.ui.debug('start of parts\n')
        for part in self._parts:
            self.ui.debug('bundle part: "%s"\n' % part.type)
            for chunk in part.getchunks():
                yield chunk
        self.ui.debug('end of bundle\n')
        # an empty part header (size = 0) is the end of stream marker
        yield '\0\0'

    def _paramchunk(self):
        """return an encoded version of all stream parameters

        Parameters are urlquoted, rendered as `<name>` or `<name>=<value>`
        and joined with a single space."""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)
204 204
class unbundle20(object):
    """interpret a bundle2 stream

    (this will eventually yield parts)"""

    def __init__(self, ui, fp):
        """read and check the 4 bytes magic string from `fp`

        Raise util.Abort when the stream does not start with 'HG' or when
        the version that follows is not '20'."""
        self.ui = ui
        self._fp = fp
        header = self._readexact(4)
        magic, version = header[0:2], header[2:4]
        if magic != 'HG':
            raise util.Abort(_('not a Mercurial bundle'))
        if version != '20':
            raise util.Abort(_('unknown bundle version %s') % version)
        self.ui.debug('start processing of %s stream\n' % header)

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        self.ui.debug('reading bundle2 stream parameters\n')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize:
            for p in self._readexact(paramssize).split(' '):
                # `<name>` or `<name>=<value>`, both parts urlquoted
                p = p.split('=', 1)
                p = [urllib.unquote(i) for i in p]
                if len(p) < 2:
                    p.append(None)
                self._processparam(*p)
                params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will
        be ignored when unknown. Those starting with an upper case letter
        are mandatory and this function will raise a KeyError when unknown.

        Raise ValueError when `name` is empty or when its first character
        is not an ASCII letter.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        # `string.letters` depends on the process locale (and does not exist
        # on Python 3). What is accepted must not vary with the locale, so
        # check against the fixed ASCII set instead.
        if name[0] not in string.ascii_letters:
            raise ValueError('non letter first character: %r' % name)
        # Some logic will be later added here to try to process the option for
        # a dict of known parameter.
        if name[0].islower():
            self.ui.debug("ignoring unknown parameter %r\n" % name)
        else:
            raise KeyError(name)

    def __iter__(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        self.ui.debug('start extraction of bundle2 parts\n')
        part = self._readpart()
        while part is not None:
            yield part
            part = self._readpart()
        self.ui.debug('end of bundle2 stream\n')

    def _readpart(self):
        """read the next part from the stream

        Return None when an end of stream marker is reached."""

        headersize = self._unpack(_fpartheadersize)[0]
        self.ui.debug('part header size: %i\n' % headersize)
        if not headersize:
            return None
        headerblock = self._readexact(headersize)
        # Some utility to help reading from the header block. The read
        # position is kept in a one element list so the closures can update
        # it (Python 2 has no `nonlocal`) without storing temporary state on
        # the instance, where it would linger if parsing raised.
        offset = [0]
        def fromheader(size):
            """return the next <size> byte from the header"""
            start = offset[0]
            offset[0] = start + size
            return headerblock[start:offset[0]]
        def unpackheader(format):
            """read given format from header

            This automatically compute the size of the format to read."""
            data = fromheader(struct.calcsize(format))
            return _unpack(format, data)

        typesize = unpackheader(_fparttypesize)[0]
        parttype = fromheader(typesize)
        self.ui.debug('part type: "%s"\n' % parttype)
        ## reading parameters
        # param count
        mancount, advcount = unpackheader(_fpartparamcount)
        self.ui.debug('part parameters: %i\n' % (mancount + advcount))
        # param size
        paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
        # make it a list of couple again (explicit list so it can be sliced
        # even where zip returns an iterator)
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value; keys and values are laid out sequentially in
        # the header, mandatory parameters first
        manparams = []
        for key, value in mansizes:
            manparams.append((fromheader(key), fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((fromheader(key), fromheader(value)))
        ## part payload
        # a series of <chunksize><chunkdata> concluded by a zero size chunk
        payload = []
        payloadsize = self._unpack(_fpayloadsize)[0]
        self.ui.debug('payload chunk size: %i\n' % payloadsize)
        while payloadsize:
            payload.append(self._readexact(payloadsize))
            payloadsize = self._unpack(_fpayloadsize)[0]
            self.ui.debug('payload chunk size: %i\n' % payloadsize)
        payload = ''.join(payload)
        current = part(parttype, manparams, advparams, data=payload)
        return current
329 335
330 336
class part(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data=''):
        """`mandatoryparams` and `advisoryparams` are sequences of
        (key, value) couples; `data` is the payload of the part."""
        self.type = parttype
        self.data = data
        self.mandatoryparams = mandatoryparams
        self.advisoryparams = advisoryparams

    def getchunks(self):
        """yield the binary chunks encoding this part

        Emits the header size, the header itself (type, parameter counts,
        parameter sizes, parameter data), then the payload as at most one
        chunk followed by the zero size chunk concluding the payload."""
        #### header
        ## parttype
        header = [_pack(_fparttypesize, len(self.type)),
                  self.type,
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        # two sizes per parameter; floor division keeps the count an integer
        # whatever the division semantics in effect
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value (mandatory parameters first, then the advisory ones)
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        # we only support fixed size data now.
        # This will be improved in the future.
        if self.data:
            yield _pack(_fpayloadsize, len(self.data))
            yield self.data
        # end of payload
        yield _pack(_fpayloadsize, 0)
385 391
General Comments 0
You need to be logged in to leave comments. Login now