##// END OF EJS Templates
bundle2: extract stream/unpack logic in an unpackermixin...
Pierre-Yves David -
r21013:a813caca default
parent child Browse files
Show More
@@ -1,610 +1,617
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big endian.
27 All numbers are unsigned and big endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. parameter with value
43 The blob contains a space separated list of parameters. parameter with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safefly ignored. However when the first
49 parameter is advisory and can be safefly ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remains simple and we want to discourage any
55 - Stream level parameters should remains simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a the bundle2 header in case of
57 - Textual data allow easy human inspection of a the bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :typename: alphanumerical part name
88 :typename: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitraty content, the binary structure is::
95 Part's parameter may have arbitraty content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimatly be lifted.
124 This is an implementation limitation that will ultimatly be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147 import StringIO
147 import StringIO
148
148
149 import changegroup
149 import changegroup
150 from i18n import _
150 from i18n import _
151
151
152 _pack = struct.pack
152 _pack = struct.pack
153 _unpack = struct.unpack
153 _unpack = struct.unpack
154
154
155 _magicstring = 'HG20'
155 _magicstring = 'HG20'
156
156
157 _fstreamparamsize = '>H'
157 _fstreamparamsize = '>H'
158 _fpartheadersize = '>H'
158 _fpartheadersize = '>H'
159 _fparttypesize = '>B'
159 _fparttypesize = '>B'
160 _fpartid = '>I'
160 _fpartid = '>I'
161 _fpayloadsize = '>I'
161 _fpayloadsize = '>I'
162 _fpartparamcount = '>BB'
162 _fpartparamcount = '>BB'
163
163
164 preferedchunksize = 4096
164 preferedchunksize = 4096
165
165
166 def _makefpartparamsizes(nbparams):
166 def _makefpartparamsizes(nbparams):
167 """return a struct format to read part parameter sizes
167 """return a struct format to read part parameter sizes
168
168
169 The number parameters is variable so we need to build that format
169 The number parameters is variable so we need to build that format
170 dynamically.
170 dynamically.
171 """
171 """
172 return '>'+('BB'*nbparams)
172 return '>'+('BB'*nbparams)
173
173
174 parthandlermapping = {}
174 parthandlermapping = {}
175
175
176 def parthandler(parttype):
176 def parthandler(parttype):
177 """decorator that register a function as a bundle2 part handler
177 """decorator that register a function as a bundle2 part handler
178
178
179 eg::
179 eg::
180
180
181 @parthandler('myparttype')
181 @parthandler('myparttype')
182 def myparttypehandler(...):
182 def myparttypehandler(...):
183 '''process a part of type "my part".'''
183 '''process a part of type "my part".'''
184 ...
184 ...
185 """
185 """
186 def _decorator(func):
186 def _decorator(func):
187 lparttype = parttype.lower() # enforce lower case matching.
187 lparttype = parttype.lower() # enforce lower case matching.
188 assert lparttype not in parthandlermapping
188 assert lparttype not in parthandlermapping
189 parthandlermapping[lparttype] = func
189 parthandlermapping[lparttype] = func
190 return func
190 return func
191 return _decorator
191 return _decorator
192
192
193 class unbundlerecords(object):
193 class unbundlerecords(object):
194 """keep record of what happens during and unbundle
194 """keep record of what happens during and unbundle
195
195
196 New records are added using `records.add('cat', obj)`. Where 'cat' is a
196 New records are added using `records.add('cat', obj)`. Where 'cat' is a
197 category of record and obj is an arbitraty object.
197 category of record and obj is an arbitraty object.
198
198
199 `records['cat']` will return all entries of this category 'cat'.
199 `records['cat']` will return all entries of this category 'cat'.
200
200
201 Iterating on the object itself will yield `('category', obj)` tuples
201 Iterating on the object itself will yield `('category', obj)` tuples
202 for all entries.
202 for all entries.
203
203
204 All iterations happens in chronological order.
204 All iterations happens in chronological order.
205 """
205 """
206
206
207 def __init__(self):
207 def __init__(self):
208 self._categories = {}
208 self._categories = {}
209 self._sequences = []
209 self._sequences = []
210 self._replies = {}
210 self._replies = {}
211
211
212 def add(self, category, entry, inreplyto=None):
212 def add(self, category, entry, inreplyto=None):
213 """add a new record of a given category.
213 """add a new record of a given category.
214
214
215 The entry can then be retrieved in the list returned by
215 The entry can then be retrieved in the list returned by
216 self['category']."""
216 self['category']."""
217 self._categories.setdefault(category, []).append(entry)
217 self._categories.setdefault(category, []).append(entry)
218 self._sequences.append((category, entry))
218 self._sequences.append((category, entry))
219 if inreplyto is not None:
219 if inreplyto is not None:
220 self.getreplies(inreplyto).add(category, entry)
220 self.getreplies(inreplyto).add(category, entry)
221
221
222 def getreplies(self, partid):
222 def getreplies(self, partid):
223 """get the subrecords that replies to a specific part"""
223 """get the subrecords that replies to a specific part"""
224 return self._replies.setdefault(partid, unbundlerecords())
224 return self._replies.setdefault(partid, unbundlerecords())
225
225
226 def __getitem__(self, cat):
226 def __getitem__(self, cat):
227 return tuple(self._categories.get(cat, ()))
227 return tuple(self._categories.get(cat, ()))
228
228
229 def __iter__(self):
229 def __iter__(self):
230 return iter(self._sequences)
230 return iter(self._sequences)
231
231
232 def __len__(self):
232 def __len__(self):
233 return len(self._sequences)
233 return len(self._sequences)
234
234
235 def __nonzero__(self):
235 def __nonzero__(self):
236 return bool(self._sequences)
236 return bool(self._sequences)
237
237
238 class bundleoperation(object):
238 class bundleoperation(object):
239 """an object that represents a single bundling process
239 """an object that represents a single bundling process
240
240
241 Its purpose is to carry unbundle-related objects and states.
241 Its purpose is to carry unbundle-related objects and states.
242
242
243 A new object should be created at the beginning of each bundle processing.
243 A new object should be created at the beginning of each bundle processing.
244 The object is to be returned by the processing function.
244 The object is to be returned by the processing function.
245
245
246 The object has very little content now it will ultimately contain:
246 The object has very little content now it will ultimately contain:
247 * an access to the repo the bundle is applied to,
247 * an access to the repo the bundle is applied to,
248 * a ui object,
248 * a ui object,
249 * a way to retrieve a transaction to add changes to the repo,
249 * a way to retrieve a transaction to add changes to the repo,
250 * a way to record the result of processing each part,
250 * a way to record the result of processing each part,
251 * a way to construct a bundle response when applicable.
251 * a way to construct a bundle response when applicable.
252 """
252 """
253
253
254 def __init__(self, repo, transactiongetter):
254 def __init__(self, repo, transactiongetter):
255 self.repo = repo
255 self.repo = repo
256 self.ui = repo.ui
256 self.ui = repo.ui
257 self.records = unbundlerecords()
257 self.records = unbundlerecords()
258 self.gettransaction = transactiongetter
258 self.gettransaction = transactiongetter
259 self.reply = None
259 self.reply = None
260
260
261 class TransactionUnavailable(RuntimeError):
261 class TransactionUnavailable(RuntimeError):
262 pass
262 pass
263
263
264 def _notransaction():
264 def _notransaction():
265 """default method to get a transaction while processing a bundle
265 """default method to get a transaction while processing a bundle
266
266
267 Raise an exception to highlight the fact that no transaction was expected
267 Raise an exception to highlight the fact that no transaction was expected
268 to be created"""
268 to be created"""
269 raise TransactionUnavailable()
269 raise TransactionUnavailable()
270
270
271 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 def processbundle(repo, unbundler, transactiongetter=_notransaction):
272 """This function process a bundle, apply effect to/from a repo
272 """This function process a bundle, apply effect to/from a repo
273
273
274 It iterates over each part then searches for and uses the proper handling
274 It iterates over each part then searches for and uses the proper handling
275 code to process the part. Parts are processed in order.
275 code to process the part. Parts are processed in order.
276
276
277 This is very early version of this function that will be strongly reworked
277 This is very early version of this function that will be strongly reworked
278 before final usage.
278 before final usage.
279
279
280 Unknown Mandatory part will abort the process.
280 Unknown Mandatory part will abort the process.
281 """
281 """
282 op = bundleoperation(repo, transactiongetter)
282 op = bundleoperation(repo, transactiongetter)
283 # todo:
283 # todo:
284 # - only create reply bundle if requested.
284 # - only create reply bundle if requested.
285 op.reply = bundle20(op.ui)
285 op.reply = bundle20(op.ui)
286 # todo:
286 # todo:
287 # - replace this is a init function soon.
287 # - replace this is a init function soon.
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = iter(unbundler)
290 iterparts = iter(unbundler)
291 try:
291 try:
292 for part in iterparts:
292 for part in iterparts:
293 parttype = part.type
293 parttype = part.type
294 # part key are matched lower case
294 # part key are matched lower case
295 key = parttype.lower()
295 key = parttype.lower()
296 try:
296 try:
297 handler = parthandlermapping[key]
297 handler = parthandlermapping[key]
298 op.ui.debug('found a handler for part %r\n' % parttype)
298 op.ui.debug('found a handler for part %r\n' % parttype)
299 except KeyError:
299 except KeyError:
300 if key != parttype: # mandatory parts
300 if key != parttype: # mandatory parts
301 # todo:
301 # todo:
302 # - use a more precise exception
302 # - use a more precise exception
303 raise
303 raise
304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
304 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 # todo:
305 # todo:
306 # - consume the part once we use streaming
306 # - consume the part once we use streaming
307 continue
307 continue
308
308
309 # handler is called outside the above try block so that we don't
309 # handler is called outside the above try block so that we don't
310 # risk catching KeyErrors from anything other than the
310 # risk catching KeyErrors from anything other than the
311 # parthandlermapping lookup (any KeyError raised by handler()
311 # parthandlermapping lookup (any KeyError raised by handler()
312 # itself represents a defect of a different variety).
312 # itself represents a defect of a different variety).
313 handler(op, part)
313 handler(op, part)
314 except Exception:
314 except Exception:
315 for part in iterparts:
315 for part in iterparts:
316 pass # consume the bundle content
316 pass # consume the bundle content
317 raise
317 raise
318 return op
318 return op
319
319
320 class bundle20(object):
320 class bundle20(object):
321 """represent an outgoing bundle2 container
321 """represent an outgoing bundle2 container
322
322
323 Use the `addparam` method to add stream level parameter. and `addpart` to
323 Use the `addparam` method to add stream level parameter. and `addpart` to
324 populate it. Then call `getchunks` to retrieve all the binary chunks of
324 populate it. Then call `getchunks` to retrieve all the binary chunks of
325 datathat compose the bundle2 container."""
325 datathat compose the bundle2 container."""
326
326
327 def __init__(self, ui):
327 def __init__(self, ui):
328 self.ui = ui
328 self.ui = ui
329 self._params = []
329 self._params = []
330 self._parts = []
330 self._parts = []
331
331
332 def addparam(self, name, value=None):
332 def addparam(self, name, value=None):
333 """add a stream level parameter"""
333 """add a stream level parameter"""
334 if not name:
334 if not name:
335 raise ValueError('empty parameter name')
335 raise ValueError('empty parameter name')
336 if name[0] not in string.letters:
336 if name[0] not in string.letters:
337 raise ValueError('non letter first character: %r' % name)
337 raise ValueError('non letter first character: %r' % name)
338 self._params.append((name, value))
338 self._params.append((name, value))
339
339
340 def addpart(self, part):
340 def addpart(self, part):
341 """add a new part to the bundle2 container
341 """add a new part to the bundle2 container
342
342
343 Parts contains the actuall applicative payload."""
343 Parts contains the actuall applicative payload."""
344 assert part.id is None
344 assert part.id is None
345 part.id = len(self._parts) # very cheap counter
345 part.id = len(self._parts) # very cheap counter
346 self._parts.append(part)
346 self._parts.append(part)
347
347
348 def getchunks(self):
348 def getchunks(self):
349 self.ui.debug('start emission of %s stream\n' % _magicstring)
349 self.ui.debug('start emission of %s stream\n' % _magicstring)
350 yield _magicstring
350 yield _magicstring
351 param = self._paramchunk()
351 param = self._paramchunk()
352 self.ui.debug('bundle parameter: %s\n' % param)
352 self.ui.debug('bundle parameter: %s\n' % param)
353 yield _pack(_fstreamparamsize, len(param))
353 yield _pack(_fstreamparamsize, len(param))
354 if param:
354 if param:
355 yield param
355 yield param
356
356
357 self.ui.debug('start of parts\n')
357 self.ui.debug('start of parts\n')
358 for part in self._parts:
358 for part in self._parts:
359 self.ui.debug('bundle part: "%s"\n' % part.type)
359 self.ui.debug('bundle part: "%s"\n' % part.type)
360 for chunk in part.getchunks():
360 for chunk in part.getchunks():
361 yield chunk
361 yield chunk
362 self.ui.debug('end of bundle\n')
362 self.ui.debug('end of bundle\n')
363 yield '\0\0'
363 yield '\0\0'
364
364
365 def _paramchunk(self):
365 def _paramchunk(self):
366 """return a encoded version of all stream parameters"""
366 """return a encoded version of all stream parameters"""
367 blocks = []
367 blocks = []
368 for par, value in self._params:
368 for par, value in self._params:
369 par = urllib.quote(par)
369 par = urllib.quote(par)
370 if value is not None:
370 if value is not None:
371 value = urllib.quote(value)
371 value = urllib.quote(value)
372 par = '%s=%s' % (par, value)
372 par = '%s=%s' % (par, value)
373 blocks.append(par)
373 blocks.append(par)
374 return ' '.join(blocks)
374 return ' '.join(blocks)
375
375
376 class unbundle20(object):
376 class unpackermixin(object):
377 """interpret a bundle2 stream
377 """A mixin to extract bytes and struct data from a stream"""
378
378
379 (this will eventually yield parts)"""
379 def __init__(self, fp):
380
381 def __init__(self, ui, fp):
382 self.ui = ui
383 self._fp = fp
380 self._fp = fp
384 header = self._readexact(4)
385 magic, version = header[0:2], header[2:4]
386 if magic != 'HG':
387 raise util.Abort(_('not a Mercurial bundle'))
388 if version != '20':
389 raise util.Abort(_('unknown bundle version %s') % version)
390 self.ui.debug('start processing of %s stream\n' % header)
391
381
392 def _unpack(self, format):
382 def _unpack(self, format):
393 """unpack this struct format from the stream"""
383 """unpack this struct format from the stream"""
394 data = self._readexact(struct.calcsize(format))
384 data = self._readexact(struct.calcsize(format))
395 return _unpack(format, data)
385 return _unpack(format, data)
396
386
397 def _readexact(self, size):
387 def _readexact(self, size):
398 """read exactly <size> bytes from the stream"""
388 """read exactly <size> bytes from the stream"""
399 return changegroup.readexactly(self._fp, size)
389 return changegroup.readexactly(self._fp, size)
400
390
391
392 class unbundle20(unpackermixin):
393 """interpret a bundle2 stream
394
395 (this will eventually yield parts)"""
396
397 def __init__(self, ui, fp):
398 self.ui = ui
399 super(unbundle20, self).__init__(fp)
400 header = self._readexact(4)
401 magic, version = header[0:2], header[2:4]
402 if magic != 'HG':
403 raise util.Abort(_('not a Mercurial bundle'))
404 if version != '20':
405 raise util.Abort(_('unknown bundle version %s') % version)
406 self.ui.debug('start processing of %s stream\n' % header)
407
401 @util.propertycache
408 @util.propertycache
402 def params(self):
409 def params(self):
403 """dictionnary of stream level parameters"""
410 """dictionnary of stream level parameters"""
404 self.ui.debug('reading bundle2 stream parameters\n')
411 self.ui.debug('reading bundle2 stream parameters\n')
405 params = {}
412 params = {}
406 paramssize = self._unpack(_fstreamparamsize)[0]
413 paramssize = self._unpack(_fstreamparamsize)[0]
407 if paramssize:
414 if paramssize:
408 for p in self._readexact(paramssize).split(' '):
415 for p in self._readexact(paramssize).split(' '):
409 p = p.split('=', 1)
416 p = p.split('=', 1)
410 p = [urllib.unquote(i) for i in p]
417 p = [urllib.unquote(i) for i in p]
411 if len(p) < 2:
418 if len(p) < 2:
412 p.append(None)
419 p.append(None)
413 self._processparam(*p)
420 self._processparam(*p)
414 params[p[0]] = p[1]
421 params[p[0]] = p[1]
415 return params
422 return params
416
423
417 def _processparam(self, name, value):
424 def _processparam(self, name, value):
418 """process a parameter, applying its effect if needed
425 """process a parameter, applying its effect if needed
419
426
420 Parameter starting with a lower case letter are advisory and will be
427 Parameter starting with a lower case letter are advisory and will be
421 ignored when unknown. Those starting with an upper case letter are
428 ignored when unknown. Those starting with an upper case letter are
422 mandatory and will this function will raise a KeyError when unknown.
429 mandatory and will this function will raise a KeyError when unknown.
423
430
424 Note: no option are currently supported. Any input will be either
431 Note: no option are currently supported. Any input will be either
425 ignored or failing.
432 ignored or failing.
426 """
433 """
427 if not name:
434 if not name:
428 raise ValueError('empty parameter name')
435 raise ValueError('empty parameter name')
429 if name[0] not in string.letters:
436 if name[0] not in string.letters:
430 raise ValueError('non letter first character: %r' % name)
437 raise ValueError('non letter first character: %r' % name)
431 # Some logic will be later added here to try to process the option for
438 # Some logic will be later added here to try to process the option for
432 # a dict of known parameter.
439 # a dict of known parameter.
433 if name[0].islower():
440 if name[0].islower():
434 self.ui.debug("ignoring unknown parameter %r\n" % name)
441 self.ui.debug("ignoring unknown parameter %r\n" % name)
435 else:
442 else:
436 raise KeyError(name)
443 raise KeyError(name)
437
444
438
445
439 def __iter__(self):
446 def __iter__(self):
440 """yield all parts contained in the stream"""
447 """yield all parts contained in the stream"""
441 # make sure param have been loaded
448 # make sure param have been loaded
442 self.params
449 self.params
443 self.ui.debug('start extraction of bundle2 parts\n')
450 self.ui.debug('start extraction of bundle2 parts\n')
444 part = self._readpart()
451 part = self._readpart()
445 while part is not None:
452 while part is not None:
446 yield part
453 yield part
447 part = self._readpart()
454 part = self._readpart()
448 self.ui.debug('end of bundle2 stream\n')
455 self.ui.debug('end of bundle2 stream\n')
449
456
450 def _readpart(self):
457 def _readpart(self):
451 """return None when an end of stream markers is reach"""
458 """return None when an end of stream markers is reach"""
452
459
453 headersize = self._unpack(_fpartheadersize)[0]
460 headersize = self._unpack(_fpartheadersize)[0]
454 self.ui.debug('part header size: %i\n' % headersize)
461 self.ui.debug('part header size: %i\n' % headersize)
455 if not headersize:
462 if not headersize:
456 return None
463 return None
457 headerblock = self._readexact(headersize)
464 headerblock = self._readexact(headersize)
458 # some utility to help reading from the header block
465 # some utility to help reading from the header block
459 self._offset = 0 # layer violation to have something easy to understand
466 self._offset = 0 # layer violation to have something easy to understand
460 def fromheader(size):
467 def fromheader(size):
461 """return the next <size> byte from the header"""
468 """return the next <size> byte from the header"""
462 offset = self._offset
469 offset = self._offset
463 data = headerblock[offset:(offset + size)]
470 data = headerblock[offset:(offset + size)]
464 self._offset = offset + size
471 self._offset = offset + size
465 return data
472 return data
466 def unpackheader(format):
473 def unpackheader(format):
467 """read given format from header
474 """read given format from header
468
475
469 This automatically compute the size of the format to read."""
476 This automatically compute the size of the format to read."""
470 data = fromheader(struct.calcsize(format))
477 data = fromheader(struct.calcsize(format))
471 return _unpack(format, data)
478 return _unpack(format, data)
472
479
473 typesize = unpackheader(_fparttypesize)[0]
480 typesize = unpackheader(_fparttypesize)[0]
474 parttype = fromheader(typesize)
481 parttype = fromheader(typesize)
475 self.ui.debug('part type: "%s"\n' % parttype)
482 self.ui.debug('part type: "%s"\n' % parttype)
476 partid = unpackheader(_fpartid)[0]
483 partid = unpackheader(_fpartid)[0]
477 self.ui.debug('part id: "%s"\n' % partid)
484 self.ui.debug('part id: "%s"\n' % partid)
478 ## reading parameters
485 ## reading parameters
479 # param count
486 # param count
480 mancount, advcount = unpackheader(_fpartparamcount)
487 mancount, advcount = unpackheader(_fpartparamcount)
481 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
488 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
482 # param size
489 # param size
483 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
490 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
484 # make it a list of couple again
491 # make it a list of couple again
485 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
492 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
486 # split mandatory from advisory
493 # split mandatory from advisory
487 mansizes = paramsizes[:mancount]
494 mansizes = paramsizes[:mancount]
488 advsizes = paramsizes[mancount:]
495 advsizes = paramsizes[mancount:]
489 # retrive param value
496 # retrive param value
490 manparams = []
497 manparams = []
491 for key, value in mansizes:
498 for key, value in mansizes:
492 manparams.append((fromheader(key), fromheader(value)))
499 manparams.append((fromheader(key), fromheader(value)))
493 advparams = []
500 advparams = []
494 for key, value in advsizes:
501 for key, value in advsizes:
495 advparams.append((fromheader(key), fromheader(value)))
502 advparams.append((fromheader(key), fromheader(value)))
496 del self._offset # clean up layer, nobody saw anything.
503 del self._offset # clean up layer, nobody saw anything.
497 ## part payload
504 ## part payload
498 payload = []
505 payload = []
499 payloadsize = self._unpack(_fpayloadsize)[0]
506 payloadsize = self._unpack(_fpayloadsize)[0]
500 self.ui.debug('payload chunk size: %i\n' % payloadsize)
507 self.ui.debug('payload chunk size: %i\n' % payloadsize)
501 while payloadsize:
508 while payloadsize:
502 payload.append(self._readexact(payloadsize))
509 payload.append(self._readexact(payloadsize))
503 payloadsize = self._unpack(_fpayloadsize)[0]
510 payloadsize = self._unpack(_fpayloadsize)[0]
504 self.ui.debug('payload chunk size: %i\n' % payloadsize)
511 self.ui.debug('payload chunk size: %i\n' % payloadsize)
505 payload = ''.join(payload)
512 payload = ''.join(payload)
506 current = bundlepart(parttype, manparams, advparams, data=payload)
513 current = bundlepart(parttype, manparams, advparams, data=payload)
507 current.id = partid
514 current.id = partid
508 return current
515 return current
509
516
510
517
511 class bundlepart(object):
518 class bundlepart(object):
512 """A bundle2 part contains application level payload
519 """A bundle2 part contains application level payload
513
520
514 The part `type` is used to route the part to the application level
521 The part `type` is used to route the part to the application level
515 handler.
522 handler.
516 """
523 """
517
524
518 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
525 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
519 data=''):
526 data=''):
520 self.id = None
527 self.id = None
521 self.type = parttype
528 self.type = parttype
522 self.data = data
529 self.data = data
523 self.mandatoryparams = mandatoryparams
530 self.mandatoryparams = mandatoryparams
524 self.advisoryparams = advisoryparams
531 self.advisoryparams = advisoryparams
525
532
526 def getchunks(self):
533 def getchunks(self):
527 #### header
534 #### header
528 ## parttype
535 ## parttype
529 header = [_pack(_fparttypesize, len(self.type)),
536 header = [_pack(_fparttypesize, len(self.type)),
530 self.type, _pack(_fpartid, self.id),
537 self.type, _pack(_fpartid, self.id),
531 ]
538 ]
532 ## parameters
539 ## parameters
533 # count
540 # count
534 manpar = self.mandatoryparams
541 manpar = self.mandatoryparams
535 advpar = self.advisoryparams
542 advpar = self.advisoryparams
536 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
543 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
537 # size
544 # size
538 parsizes = []
545 parsizes = []
539 for key, value in manpar:
546 for key, value in manpar:
540 parsizes.append(len(key))
547 parsizes.append(len(key))
541 parsizes.append(len(value))
548 parsizes.append(len(value))
542 for key, value in advpar:
549 for key, value in advpar:
543 parsizes.append(len(key))
550 parsizes.append(len(key))
544 parsizes.append(len(value))
551 parsizes.append(len(value))
545 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
552 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
546 header.append(paramsizes)
553 header.append(paramsizes)
547 # key, value
554 # key, value
548 for key, value in manpar:
555 for key, value in manpar:
549 header.append(key)
556 header.append(key)
550 header.append(value)
557 header.append(value)
551 for key, value in advpar:
558 for key, value in advpar:
552 header.append(key)
559 header.append(key)
553 header.append(value)
560 header.append(value)
554 ## finalize header
561 ## finalize header
555 headerchunk = ''.join(header)
562 headerchunk = ''.join(header)
556 yield _pack(_fpartheadersize, len(headerchunk))
563 yield _pack(_fpartheadersize, len(headerchunk))
557 yield headerchunk
564 yield headerchunk
558 ## payload
565 ## payload
559 for chunk in self._payloadchunks():
566 for chunk in self._payloadchunks():
560 yield _pack(_fpayloadsize, len(chunk))
567 yield _pack(_fpayloadsize, len(chunk))
561 yield chunk
568 yield chunk
562 # end of payload
569 # end of payload
563 yield _pack(_fpayloadsize, 0)
570 yield _pack(_fpayloadsize, 0)
564
571
565 def _payloadchunks(self):
572 def _payloadchunks(self):
566 """yield chunks of a the part payload
573 """yield chunks of a the part payload
567
574
568 Exists to handle the different methods to provide data to a part."""
575 Exists to handle the different methods to provide data to a part."""
569 # we only support fixed size data now.
576 # we only support fixed size data now.
570 # This will be improved in the future.
577 # This will be improved in the future.
571 if util.safehasattr(self.data, 'next'):
578 if util.safehasattr(self.data, 'next'):
572 buff = util.chunkbuffer(self.data)
579 buff = util.chunkbuffer(self.data)
573 chunk = buff.read(preferedchunksize)
580 chunk = buff.read(preferedchunksize)
574 while chunk:
581 while chunk:
575 yield chunk
582 yield chunk
576 chunk = buff.read(preferedchunksize)
583 chunk = buff.read(preferedchunksize)
577 elif len(self.data):
584 elif len(self.data):
578 yield self.data
585 yield self.data
579
586
580 @parthandler('changegroup')
587 @parthandler('changegroup')
581 def handlechangegroup(op, inpart):
588 def handlechangegroup(op, inpart):
582 """apply a changegroup part on the repo
589 """apply a changegroup part on the repo
583
590
584 This is a very early implementation that will massive rework before being
591 This is a very early implementation that will massive rework before being
585 inflicted to any end-user.
592 inflicted to any end-user.
586 """
593 """
587 # Make sure we trigger a transaction creation
594 # Make sure we trigger a transaction creation
588 #
595 #
589 # The addchangegroup function will get a transaction object by itself, but
596 # The addchangegroup function will get a transaction object by itself, but
590 # we need to make sure we trigger the creation of a transaction object used
597 # we need to make sure we trigger the creation of a transaction object used
591 # for the whole processing scope.
598 # for the whole processing scope.
592 op.gettransaction()
599 op.gettransaction()
593 data = StringIO.StringIO(inpart.data)
600 data = StringIO.StringIO(inpart.data)
594 data.seek(0)
601 data.seek(0)
595 cg = changegroup.readbundle(data, 'bundle2part')
602 cg = changegroup.readbundle(data, 'bundle2part')
596 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
603 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
597 op.records.add('changegroup', {'return': ret})
604 op.records.add('changegroup', {'return': ret})
598 if op.reply is not None:
605 if op.reply is not None:
599 # This is definitly not the final form of this
606 # This is definitly not the final form of this
600 # return. But one need to start somewhere.
607 # return. But one need to start somewhere.
601 op.reply.addpart(bundlepart('reply:changegroup', (),
608 op.reply.addpart(bundlepart('reply:changegroup', (),
602 [('in-reply-to', str(inpart.id)),
609 [('in-reply-to', str(inpart.id)),
603 ('return', '%i' % ret)]))
610 ('return', '%i' % ret)]))
604
611
605 @parthandler('reply:changegroup')
612 @parthandler('reply:changegroup')
606 def handlechangegroup(op, inpart):
613 def handlechangegroup(op, inpart):
607 p = dict(inpart.advisoryparams)
614 p = dict(inpart.advisoryparams)
608 ret = int(p['return'])
615 ret = int(p['return'])
609 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
616 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
610
617
General Comments 0
You need to be logged in to leave comments. Login now