##// END OF EJS Templates
bundle2: add reply awareness to unbundlerecords...
Pierre-Yves David -
r20996:ed3c5e18 default
parent child Browse files
Show More
@@ -1,568 +1,575 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big endian.
27 All numbers are unsigned and big endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. parameter with value
43 The blob contains a space separated list of parameters. parameter with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safefly ignored. However when the first
49 parameter is advisory and can be safefly ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remains simple and we want to discourage any
55 - Stream level parameters should remains simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a the bundle2 header in case of
57 - Textual data allow easy human inspection of a the bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :typename: alphanumerical part name
88 :typename: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitraty content, the binary structure is::
95 Part's parameter may have arbitraty content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimatly be lifted.
124 This is an implementation limitation that will ultimatly be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147 import StringIO
147 import StringIO
148
148
149 import changegroup
149 import changegroup
150 from i18n import _
150 from i18n import _
151
151
152 _pack = struct.pack
152 _pack = struct.pack
153 _unpack = struct.unpack
153 _unpack = struct.unpack
154
154
155 _magicstring = 'HG20'
155 _magicstring = 'HG20'
156
156
157 _fstreamparamsize = '>H'
157 _fstreamparamsize = '>H'
158 _fpartheadersize = '>H'
158 _fpartheadersize = '>H'
159 _fparttypesize = '>B'
159 _fparttypesize = '>B'
160 _fpartid = '>I'
160 _fpartid = '>I'
161 _fpayloadsize = '>I'
161 _fpayloadsize = '>I'
162 _fpartparamcount = '>BB'
162 _fpartparamcount = '>BB'
163
163
164 def _makefpartparamsizes(nbparams):
164 def _makefpartparamsizes(nbparams):
165 """return a struct format to read part parameter sizes
165 """return a struct format to read part parameter sizes
166
166
167 The number parameters is variable so we need to build that format
167 The number parameters is variable so we need to build that format
168 dynamically.
168 dynamically.
169 """
169 """
170 return '>'+('BB'*nbparams)
170 return '>'+('BB'*nbparams)
171
171
172 parthandlermapping = {}
172 parthandlermapping = {}
173
173
174 def parthandler(parttype):
174 def parthandler(parttype):
175 """decorator that register a function as a bundle2 part handler
175 """decorator that register a function as a bundle2 part handler
176
176
177 eg::
177 eg::
178
178
179 @parthandler('myparttype')
179 @parthandler('myparttype')
180 def myparttypehandler(...):
180 def myparttypehandler(...):
181 '''process a part of type "my part".'''
181 '''process a part of type "my part".'''
182 ...
182 ...
183 """
183 """
184 def _decorator(func):
184 def _decorator(func):
185 lparttype = parttype.lower() # enforce lower case matching.
185 lparttype = parttype.lower() # enforce lower case matching.
186 assert lparttype not in parthandlermapping
186 assert lparttype not in parthandlermapping
187 parthandlermapping[lparttype] = func
187 parthandlermapping[lparttype] = func
188 return func
188 return func
189 return _decorator
189 return _decorator
190
190
191 class unbundlerecords(object):
191 class unbundlerecords(object):
192 """keep record of what happens during and unbundle
192 """keep record of what happens during and unbundle
193
193
194 New records are added using `records.add('cat', obj)`. Where 'cat' is a
194 New records are added using `records.add('cat', obj)`. Where 'cat' is a
195 category of record and obj is an arbitraty object.
195 category of record and obj is an arbitraty object.
196
196
197 `records['cat']` will return all entries of this category 'cat'.
197 `records['cat']` will return all entries of this category 'cat'.
198
198
199 Iterating on the object itself will yield `('category', obj)` tuples
199 Iterating on the object itself will yield `('category', obj)` tuples
200 for all entries.
200 for all entries.
201
201
202 All iterations happens in chronological order.
202 All iterations happens in chronological order.
203 """
203 """
204
204
205 def __init__(self):
205 def __init__(self):
206 self._categories = {}
206 self._categories = {}
207 self._sequences = []
207 self._sequences = []
208 self._replies = {}
208
209
209 def add(self, category, entry):
210 def add(self, category, entry, inreplyto=None):
210 """add a new record of a given category.
211 """add a new record of a given category.
211
212
212 The entry can then be retrieved in the list returned by
213 The entry can then be retrieved in the list returned by
213 self['category']."""
214 self['category']."""
214 self._categories.setdefault(category, []).append(entry)
215 self._categories.setdefault(category, []).append(entry)
215 self._sequences.append((category, entry))
216 self._sequences.append((category, entry))
217 if inreplyto is not None:
218 self.getreplies(inreplyto).add(category, entry)
219
220 def getreplies(self, partid):
221 """get the subrecords that replies to a specific part"""
222 return self._replies.setdefault(partid, unbundlerecords())
216
223
217 def __getitem__(self, cat):
224 def __getitem__(self, cat):
218 return tuple(self._categories.get(cat, ()))
225 return tuple(self._categories.get(cat, ()))
219
226
220 def __iter__(self):
227 def __iter__(self):
221 return iter(self._sequences)
228 return iter(self._sequences)
222
229
223 def __len__(self):
230 def __len__(self):
224 return len(self._sequences)
231 return len(self._sequences)
225
232
226 def __nonzero__(self):
233 def __nonzero__(self):
227 return bool(self._sequences)
234 return bool(self._sequences)
228
235
229 class bundleoperation(object):
236 class bundleoperation(object):
230 """an object that represents a single bundling process
237 """an object that represents a single bundling process
231
238
232 Its purpose is to carry unbundle-related objects and states.
239 Its purpose is to carry unbundle-related objects and states.
233
240
234 A new object should be created at the beginning of each bundle processing.
241 A new object should be created at the beginning of each bundle processing.
235 The object is to be returned by the processing function.
242 The object is to be returned by the processing function.
236
243
237 The object has very little content now it will ultimately contain:
244 The object has very little content now it will ultimately contain:
238 * an access to the repo the bundle is applied to,
245 * an access to the repo the bundle is applied to,
239 * a ui object,
246 * a ui object,
240 * a way to retrieve a transaction to add changes to the repo,
247 * a way to retrieve a transaction to add changes to the repo,
241 * a way to record the result of processing each part,
248 * a way to record the result of processing each part,
242 * a way to construct a bundle response when applicable.
249 * a way to construct a bundle response when applicable.
243 """
250 """
244
251
245 def __init__(self, repo, transactiongetter):
252 def __init__(self, repo, transactiongetter):
246 self.repo = repo
253 self.repo = repo
247 self.ui = repo.ui
254 self.ui = repo.ui
248 self.records = unbundlerecords()
255 self.records = unbundlerecords()
249 self.gettransaction = transactiongetter
256 self.gettransaction = transactiongetter
250
257
251 class TransactionUnavailable(RuntimeError):
258 class TransactionUnavailable(RuntimeError):
252 pass
259 pass
253
260
254 def _notransaction():
261 def _notransaction():
255 """default method to get a transaction while processing a bundle
262 """default method to get a transaction while processing a bundle
256
263
257 Raise an exception to highlight the fact that no transaction was expected
264 Raise an exception to highlight the fact that no transaction was expected
258 to be created"""
265 to be created"""
259 raise TransactionUnavailable()
266 raise TransactionUnavailable()
260
267
261 def processbundle(repo, unbundler, transactiongetter=_notransaction):
268 def processbundle(repo, unbundler, transactiongetter=_notransaction):
262 """This function process a bundle, apply effect to/from a repo
269 """This function process a bundle, apply effect to/from a repo
263
270
264 It iterates over each part then searches for and uses the proper handling
271 It iterates over each part then searches for and uses the proper handling
265 code to process the part. Parts are processed in order.
272 code to process the part. Parts are processed in order.
266
273
267 This is very early version of this function that will be strongly reworked
274 This is very early version of this function that will be strongly reworked
268 before final usage.
275 before final usage.
269
276
270 Unknown Mandatory part will abort the process.
277 Unknown Mandatory part will abort the process.
271 """
278 """
272 op = bundleoperation(repo, transactiongetter)
279 op = bundleoperation(repo, transactiongetter)
273 # todo:
280 # todo:
274 # - replace this is a init function soon.
281 # - replace this is a init function soon.
275 # - exception catching
282 # - exception catching
276 unbundler.params
283 unbundler.params
277 iterparts = iter(unbundler)
284 iterparts = iter(unbundler)
278 try:
285 try:
279 for part in iterparts:
286 for part in iterparts:
280 parttype = part.type
287 parttype = part.type
281 # part key are matched lower case
288 # part key are matched lower case
282 key = parttype.lower()
289 key = parttype.lower()
283 try:
290 try:
284 handler = parthandlermapping[key]
291 handler = parthandlermapping[key]
285 op.ui.debug('found a handler for part %r\n' % parttype)
292 op.ui.debug('found a handler for part %r\n' % parttype)
286 except KeyError:
293 except KeyError:
287 if key != parttype: # mandatory parts
294 if key != parttype: # mandatory parts
288 # todo:
295 # todo:
289 # - use a more precise exception
296 # - use a more precise exception
290 raise
297 raise
291 op.ui.debug('ignoring unknown advisory part %r\n' % key)
298 op.ui.debug('ignoring unknown advisory part %r\n' % key)
292 # todo:
299 # todo:
293 # - consume the part once we use streaming
300 # - consume the part once we use streaming
294 continue
301 continue
295 handler(op, part)
302 handler(op, part)
296 except Exception:
303 except Exception:
297 for part in iterparts:
304 for part in iterparts:
298 pass # consume the bundle content
305 pass # consume the bundle content
299 raise
306 raise
300 return op
307 return op
301
308
302 class bundle20(object):
309 class bundle20(object):
303 """represent an outgoing bundle2 container
310 """represent an outgoing bundle2 container
304
311
305 Use the `addparam` method to add stream level parameter. and `addpart` to
312 Use the `addparam` method to add stream level parameter. and `addpart` to
306 populate it. Then call `getchunks` to retrieve all the binary chunks of
313 populate it. Then call `getchunks` to retrieve all the binary chunks of
307 datathat compose the bundle2 container."""
314 datathat compose the bundle2 container."""
308
315
309 def __init__(self, ui):
316 def __init__(self, ui):
310 self.ui = ui
317 self.ui = ui
311 self._params = []
318 self._params = []
312 self._parts = []
319 self._parts = []
313
320
314 def addparam(self, name, value=None):
321 def addparam(self, name, value=None):
315 """add a stream level parameter"""
322 """add a stream level parameter"""
316 if not name:
323 if not name:
317 raise ValueError('empty parameter name')
324 raise ValueError('empty parameter name')
318 if name[0] not in string.letters:
325 if name[0] not in string.letters:
319 raise ValueError('non letter first character: %r' % name)
326 raise ValueError('non letter first character: %r' % name)
320 self._params.append((name, value))
327 self._params.append((name, value))
321
328
322 def addpart(self, part):
329 def addpart(self, part):
323 """add a new part to the bundle2 container
330 """add a new part to the bundle2 container
324
331
325 Parts contains the actuall applicative payload."""
332 Parts contains the actuall applicative payload."""
326 assert part.id is None
333 assert part.id is None
327 part.id = len(self._parts) # very cheap counter
334 part.id = len(self._parts) # very cheap counter
328 self._parts.append(part)
335 self._parts.append(part)
329
336
330 def getchunks(self):
337 def getchunks(self):
331 self.ui.debug('start emission of %s stream\n' % _magicstring)
338 self.ui.debug('start emission of %s stream\n' % _magicstring)
332 yield _magicstring
339 yield _magicstring
333 param = self._paramchunk()
340 param = self._paramchunk()
334 self.ui.debug('bundle parameter: %s\n' % param)
341 self.ui.debug('bundle parameter: %s\n' % param)
335 yield _pack(_fstreamparamsize, len(param))
342 yield _pack(_fstreamparamsize, len(param))
336 if param:
343 if param:
337 yield param
344 yield param
338
345
339 self.ui.debug('start of parts\n')
346 self.ui.debug('start of parts\n')
340 for part in self._parts:
347 for part in self._parts:
341 self.ui.debug('bundle part: "%s"\n' % part.type)
348 self.ui.debug('bundle part: "%s"\n' % part.type)
342 for chunk in part.getchunks():
349 for chunk in part.getchunks():
343 yield chunk
350 yield chunk
344 self.ui.debug('end of bundle\n')
351 self.ui.debug('end of bundle\n')
345 yield '\0\0'
352 yield '\0\0'
346
353
347 def _paramchunk(self):
354 def _paramchunk(self):
348 """return a encoded version of all stream parameters"""
355 """return a encoded version of all stream parameters"""
349 blocks = []
356 blocks = []
350 for par, value in self._params:
357 for par, value in self._params:
351 par = urllib.quote(par)
358 par = urllib.quote(par)
352 if value is not None:
359 if value is not None:
353 value = urllib.quote(value)
360 value = urllib.quote(value)
354 par = '%s=%s' % (par, value)
361 par = '%s=%s' % (par, value)
355 blocks.append(par)
362 blocks.append(par)
356 return ' '.join(blocks)
363 return ' '.join(blocks)
357
364
358 class unbundle20(object):
365 class unbundle20(object):
359 """interpret a bundle2 stream
366 """interpret a bundle2 stream
360
367
361 (this will eventually yield parts)"""
368 (this will eventually yield parts)"""
362
369
363 def __init__(self, ui, fp):
370 def __init__(self, ui, fp):
364 self.ui = ui
371 self.ui = ui
365 self._fp = fp
372 self._fp = fp
366 header = self._readexact(4)
373 header = self._readexact(4)
367 magic, version = header[0:2], header[2:4]
374 magic, version = header[0:2], header[2:4]
368 if magic != 'HG':
375 if magic != 'HG':
369 raise util.Abort(_('not a Mercurial bundle'))
376 raise util.Abort(_('not a Mercurial bundle'))
370 if version != '20':
377 if version != '20':
371 raise util.Abort(_('unknown bundle version %s') % version)
378 raise util.Abort(_('unknown bundle version %s') % version)
372 self.ui.debug('start processing of %s stream\n' % header)
379 self.ui.debug('start processing of %s stream\n' % header)
373
380
374 def _unpack(self, format):
381 def _unpack(self, format):
375 """unpack this struct format from the stream"""
382 """unpack this struct format from the stream"""
376 data = self._readexact(struct.calcsize(format))
383 data = self._readexact(struct.calcsize(format))
377 return _unpack(format, data)
384 return _unpack(format, data)
378
385
379 def _readexact(self, size):
386 def _readexact(self, size):
380 """read exactly <size> bytes from the stream"""
387 """read exactly <size> bytes from the stream"""
381 return changegroup.readexactly(self._fp, size)
388 return changegroup.readexactly(self._fp, size)
382
389
383 @util.propertycache
390 @util.propertycache
384 def params(self):
391 def params(self):
385 """dictionnary of stream level parameters"""
392 """dictionnary of stream level parameters"""
386 self.ui.debug('reading bundle2 stream parameters\n')
393 self.ui.debug('reading bundle2 stream parameters\n')
387 params = {}
394 params = {}
388 paramssize = self._unpack(_fstreamparamsize)[0]
395 paramssize = self._unpack(_fstreamparamsize)[0]
389 if paramssize:
396 if paramssize:
390 for p in self._readexact(paramssize).split(' '):
397 for p in self._readexact(paramssize).split(' '):
391 p = p.split('=', 1)
398 p = p.split('=', 1)
392 p = [urllib.unquote(i) for i in p]
399 p = [urllib.unquote(i) for i in p]
393 if len(p) < 2:
400 if len(p) < 2:
394 p.append(None)
401 p.append(None)
395 self._processparam(*p)
402 self._processparam(*p)
396 params[p[0]] = p[1]
403 params[p[0]] = p[1]
397 return params
404 return params
398
405
399 def _processparam(self, name, value):
406 def _processparam(self, name, value):
400 """process a parameter, applying its effect if needed
407 """process a parameter, applying its effect if needed
401
408
402 Parameter starting with a lower case letter are advisory and will be
409 Parameter starting with a lower case letter are advisory and will be
403 ignored when unknown. Those starting with an upper case letter are
410 ignored when unknown. Those starting with an upper case letter are
404 mandatory and will this function will raise a KeyError when unknown.
411 mandatory and will this function will raise a KeyError when unknown.
405
412
406 Note: no option are currently supported. Any input will be either
413 Note: no option are currently supported. Any input will be either
407 ignored or failing.
414 ignored or failing.
408 """
415 """
409 if not name:
416 if not name:
410 raise ValueError('empty parameter name')
417 raise ValueError('empty parameter name')
411 if name[0] not in string.letters:
418 if name[0] not in string.letters:
412 raise ValueError('non letter first character: %r' % name)
419 raise ValueError('non letter first character: %r' % name)
413 # Some logic will be later added here to try to process the option for
420 # Some logic will be later added here to try to process the option for
414 # a dict of known parameter.
421 # a dict of known parameter.
415 if name[0].islower():
422 if name[0].islower():
416 self.ui.debug("ignoring unknown parameter %r\n" % name)
423 self.ui.debug("ignoring unknown parameter %r\n" % name)
417 else:
424 else:
418 raise KeyError(name)
425 raise KeyError(name)
419
426
420
427
421 def __iter__(self):
428 def __iter__(self):
422 """yield all parts contained in the stream"""
429 """yield all parts contained in the stream"""
423 # make sure param have been loaded
430 # make sure param have been loaded
424 self.params
431 self.params
425 self.ui.debug('start extraction of bundle2 parts\n')
432 self.ui.debug('start extraction of bundle2 parts\n')
426 part = self._readpart()
433 part = self._readpart()
427 while part is not None:
434 while part is not None:
428 yield part
435 yield part
429 part = self._readpart()
436 part = self._readpart()
430 self.ui.debug('end of bundle2 stream\n')
437 self.ui.debug('end of bundle2 stream\n')
431
438
432 def _readpart(self):
439 def _readpart(self):
433 """return None when an end of stream markers is reach"""
440 """return None when an end of stream markers is reach"""
434
441
435 headersize = self._unpack(_fpartheadersize)[0]
442 headersize = self._unpack(_fpartheadersize)[0]
436 self.ui.debug('part header size: %i\n' % headersize)
443 self.ui.debug('part header size: %i\n' % headersize)
437 if not headersize:
444 if not headersize:
438 return None
445 return None
439 headerblock = self._readexact(headersize)
446 headerblock = self._readexact(headersize)
440 # some utility to help reading from the header block
447 # some utility to help reading from the header block
441 self._offset = 0 # layer violation to have something easy to understand
448 self._offset = 0 # layer violation to have something easy to understand
442 def fromheader(size):
449 def fromheader(size):
443 """return the next <size> byte from the header"""
450 """return the next <size> byte from the header"""
444 offset = self._offset
451 offset = self._offset
445 data = headerblock[offset:(offset + size)]
452 data = headerblock[offset:(offset + size)]
446 self._offset = offset + size
453 self._offset = offset + size
447 return data
454 return data
448 def unpackheader(format):
455 def unpackheader(format):
449 """read given format from header
456 """read given format from header
450
457
451 This automatically compute the size of the format to read."""
458 This automatically compute the size of the format to read."""
452 data = fromheader(struct.calcsize(format))
459 data = fromheader(struct.calcsize(format))
453 return _unpack(format, data)
460 return _unpack(format, data)
454
461
455 typesize = unpackheader(_fparttypesize)[0]
462 typesize = unpackheader(_fparttypesize)[0]
456 parttype = fromheader(typesize)
463 parttype = fromheader(typesize)
457 self.ui.debug('part type: "%s"\n' % parttype)
464 self.ui.debug('part type: "%s"\n' % parttype)
458 partid = unpackheader(_fpartid)[0]
465 partid = unpackheader(_fpartid)[0]
459 self.ui.debug('part id: "%s"\n' % partid)
466 self.ui.debug('part id: "%s"\n' % partid)
460 ## reading parameters
467 ## reading parameters
461 # param count
468 # param count
462 mancount, advcount = unpackheader(_fpartparamcount)
469 mancount, advcount = unpackheader(_fpartparamcount)
463 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
470 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
464 # param size
471 # param size
465 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
472 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
466 # make it a list of couple again
473 # make it a list of couple again
467 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
474 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
468 # split mandatory from advisory
475 # split mandatory from advisory
469 mansizes = paramsizes[:mancount]
476 mansizes = paramsizes[:mancount]
470 advsizes = paramsizes[mancount:]
477 advsizes = paramsizes[mancount:]
471 # retrive param value
478 # retrive param value
472 manparams = []
479 manparams = []
473 for key, value in mansizes:
480 for key, value in mansizes:
474 manparams.append((fromheader(key), fromheader(value)))
481 manparams.append((fromheader(key), fromheader(value)))
475 advparams = []
482 advparams = []
476 for key, value in advsizes:
483 for key, value in advsizes:
477 advparams.append((fromheader(key), fromheader(value)))
484 advparams.append((fromheader(key), fromheader(value)))
478 del self._offset # clean up layer, nobody saw anything.
485 del self._offset # clean up layer, nobody saw anything.
479 ## part payload
486 ## part payload
480 payload = []
487 payload = []
481 payloadsize = self._unpack(_fpayloadsize)[0]
488 payloadsize = self._unpack(_fpayloadsize)[0]
482 self.ui.debug('payload chunk size: %i\n' % payloadsize)
489 self.ui.debug('payload chunk size: %i\n' % payloadsize)
483 while payloadsize:
490 while payloadsize:
484 payload.append(self._readexact(payloadsize))
491 payload.append(self._readexact(payloadsize))
485 payloadsize = self._unpack(_fpayloadsize)[0]
492 payloadsize = self._unpack(_fpayloadsize)[0]
486 self.ui.debug('payload chunk size: %i\n' % payloadsize)
493 self.ui.debug('payload chunk size: %i\n' % payloadsize)
487 payload = ''.join(payload)
494 payload = ''.join(payload)
488 current = part(parttype, manparams, advparams, data=payload)
495 current = part(parttype, manparams, advparams, data=payload)
489 current.id = partid
496 current.id = partid
490 return current
497 return current
491
498
492
499
493 class part(object):
500 class part(object):
494 """A bundle2 part contains application level payload
501 """A bundle2 part contains application level payload
495
502
496 The part `type` is used to route the part to the application level
503 The part `type` is used to route the part to the application level
497 handler.
504 handler.
498 """
505 """
499
506
500 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
507 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
501 data=''):
508 data=''):
502 self.id = None
509 self.id = None
503 self.type = parttype
510 self.type = parttype
504 self.data = data
511 self.data = data
505 self.mandatoryparams = mandatoryparams
512 self.mandatoryparams = mandatoryparams
506 self.advisoryparams = advisoryparams
513 self.advisoryparams = advisoryparams
507
514
508 def getchunks(self):
515 def getchunks(self):
509 #### header
516 #### header
510 ## parttype
517 ## parttype
511 header = [_pack(_fparttypesize, len(self.type)),
518 header = [_pack(_fparttypesize, len(self.type)),
512 self.type, _pack(_fpartid, self.id),
519 self.type, _pack(_fpartid, self.id),
513 ]
520 ]
514 ## parameters
521 ## parameters
515 # count
522 # count
516 manpar = self.mandatoryparams
523 manpar = self.mandatoryparams
517 advpar = self.advisoryparams
524 advpar = self.advisoryparams
518 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
525 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
519 # size
526 # size
520 parsizes = []
527 parsizes = []
521 for key, value in manpar:
528 for key, value in manpar:
522 parsizes.append(len(key))
529 parsizes.append(len(key))
523 parsizes.append(len(value))
530 parsizes.append(len(value))
524 for key, value in advpar:
531 for key, value in advpar:
525 parsizes.append(len(key))
532 parsizes.append(len(key))
526 parsizes.append(len(value))
533 parsizes.append(len(value))
527 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
534 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
528 header.append(paramsizes)
535 header.append(paramsizes)
529 # key, value
536 # key, value
530 for key, value in manpar:
537 for key, value in manpar:
531 header.append(key)
538 header.append(key)
532 header.append(value)
539 header.append(value)
533 for key, value in advpar:
540 for key, value in advpar:
534 header.append(key)
541 header.append(key)
535 header.append(value)
542 header.append(value)
536 ## finalize header
543 ## finalize header
537 headerchunk = ''.join(header)
544 headerchunk = ''.join(header)
538 yield _pack(_fpartheadersize, len(headerchunk))
545 yield _pack(_fpartheadersize, len(headerchunk))
539 yield headerchunk
546 yield headerchunk
540 ## payload
547 ## payload
541 # we only support fixed size data now.
548 # we only support fixed size data now.
542 # This will be improved in the future.
549 # This will be improved in the future.
543 if len(self.data):
550 if len(self.data):
544 yield _pack(_fpayloadsize, len(self.data))
551 yield _pack(_fpayloadsize, len(self.data))
545 yield self.data
552 yield self.data
546 # end of payload
553 # end of payload
547 yield _pack(_fpayloadsize, 0)
554 yield _pack(_fpayloadsize, 0)
548
555
549 @parthandler('changegroup')
556 @parthandler('changegroup')
550 def handlechangegroup(op, part):
557 def handlechangegroup(op, part):
551 """apply a changegroup part on the repo
558 """apply a changegroup part on the repo
552
559
553 This is a very early implementation that will massive rework before being
560 This is a very early implementation that will massive rework before being
554 inflicted to any end-user.
561 inflicted to any end-user.
555 """
562 """
556 # Make sure we trigger a transaction creation
563 # Make sure we trigger a transaction creation
557 #
564 #
558 # The addchangegroup function will get a transaction object by itself, but
565 # The addchangegroup function will get a transaction object by itself, but
559 # we need to make sure we trigger the creation of a transaction object used
566 # we need to make sure we trigger the creation of a transaction object used
560 # for the whole processing scope.
567 # for the whole processing scope.
561 op.gettransaction()
568 op.gettransaction()
562 data = StringIO.StringIO(part.data)
569 data = StringIO.StringIO(part.data)
563 data.seek(0)
570 data.seek(0)
564 cg = changegroup.readbundle(data, 'bundle2part')
571 cg = changegroup.readbundle(data, 'bundle2part')
565 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
572 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
566 op.records.add('changegroup', {'return': ret})
573 op.records.add('changegroup', {'return': ret})
567
574
568
575
General Comments 0
You need to be logged in to leave comments. Login now