##// END OF EJS Templates
bundle2: extract a _payloadchunks method for part...
Pierre-Yves David -
r21000:4cae06ae default
parent child Browse files
Show More
@@ -1,590 +1,597 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big endian.
27 All numbers are unsigned and big endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. parameter with value
43 The blob contains a space separated list of parameters. parameter with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safefly ignored. However when the first
49 parameter is advisory and can be safefly ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remains simple and we want to discourage any
55 - Stream level parameters should remains simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a the bundle2 header in case of
57 - Textual data allow easy human inspection of a the bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :typename: alphanumerical part name
88 :typename: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitraty content, the binary structure is::
95 Part's parameter may have arbitraty content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimatly be lifted.
124 This is an implementation limitation that will ultimatly be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147 import StringIO
147 import StringIO
148
148
149 import changegroup
149 import changegroup
150 from i18n import _
150 from i18n import _
151
151
152 _pack = struct.pack
152 _pack = struct.pack
153 _unpack = struct.unpack
153 _unpack = struct.unpack
154
154
155 _magicstring = 'HG20'
155 _magicstring = 'HG20'
156
156
157 _fstreamparamsize = '>H'
157 _fstreamparamsize = '>H'
158 _fpartheadersize = '>H'
158 _fpartheadersize = '>H'
159 _fparttypesize = '>B'
159 _fparttypesize = '>B'
160 _fpartid = '>I'
160 _fpartid = '>I'
161 _fpayloadsize = '>I'
161 _fpayloadsize = '>I'
162 _fpartparamcount = '>BB'
162 _fpartparamcount = '>BB'
163
163
164 def _makefpartparamsizes(nbparams):
164 def _makefpartparamsizes(nbparams):
165 """return a struct format to read part parameter sizes
165 """return a struct format to read part parameter sizes
166
166
167 The number parameters is variable so we need to build that format
167 The number parameters is variable so we need to build that format
168 dynamically.
168 dynamically.
169 """
169 """
170 return '>'+('BB'*nbparams)
170 return '>'+('BB'*nbparams)
171
171
172 parthandlermapping = {}
172 parthandlermapping = {}
173
173
174 def parthandler(parttype):
174 def parthandler(parttype):
175 """decorator that register a function as a bundle2 part handler
175 """decorator that register a function as a bundle2 part handler
176
176
177 eg::
177 eg::
178
178
179 @parthandler('myparttype')
179 @parthandler('myparttype')
180 def myparttypehandler(...):
180 def myparttypehandler(...):
181 '''process a part of type "my part".'''
181 '''process a part of type "my part".'''
182 ...
182 ...
183 """
183 """
184 def _decorator(func):
184 def _decorator(func):
185 lparttype = parttype.lower() # enforce lower case matching.
185 lparttype = parttype.lower() # enforce lower case matching.
186 assert lparttype not in parthandlermapping
186 assert lparttype not in parthandlermapping
187 parthandlermapping[lparttype] = func
187 parthandlermapping[lparttype] = func
188 return func
188 return func
189 return _decorator
189 return _decorator
190
190
191 class unbundlerecords(object):
191 class unbundlerecords(object):
192 """keep record of what happens during and unbundle
192 """keep record of what happens during and unbundle
193
193
194 New records are added using `records.add('cat', obj)`. Where 'cat' is a
194 New records are added using `records.add('cat', obj)`. Where 'cat' is a
195 category of record and obj is an arbitraty object.
195 category of record and obj is an arbitraty object.
196
196
197 `records['cat']` will return all entries of this category 'cat'.
197 `records['cat']` will return all entries of this category 'cat'.
198
198
199 Iterating on the object itself will yield `('category', obj)` tuples
199 Iterating on the object itself will yield `('category', obj)` tuples
200 for all entries.
200 for all entries.
201
201
202 All iterations happens in chronological order.
202 All iterations happens in chronological order.
203 """
203 """
204
204
205 def __init__(self):
205 def __init__(self):
206 self._categories = {}
206 self._categories = {}
207 self._sequences = []
207 self._sequences = []
208 self._replies = {}
208 self._replies = {}
209
209
210 def add(self, category, entry, inreplyto=None):
210 def add(self, category, entry, inreplyto=None):
211 """add a new record of a given category.
211 """add a new record of a given category.
212
212
213 The entry can then be retrieved in the list returned by
213 The entry can then be retrieved in the list returned by
214 self['category']."""
214 self['category']."""
215 self._categories.setdefault(category, []).append(entry)
215 self._categories.setdefault(category, []).append(entry)
216 self._sequences.append((category, entry))
216 self._sequences.append((category, entry))
217 if inreplyto is not None:
217 if inreplyto is not None:
218 self.getreplies(inreplyto).add(category, entry)
218 self.getreplies(inreplyto).add(category, entry)
219
219
220 def getreplies(self, partid):
220 def getreplies(self, partid):
221 """get the subrecords that replies to a specific part"""
221 """get the subrecords that replies to a specific part"""
222 return self._replies.setdefault(partid, unbundlerecords())
222 return self._replies.setdefault(partid, unbundlerecords())
223
223
224 def __getitem__(self, cat):
224 def __getitem__(self, cat):
225 return tuple(self._categories.get(cat, ()))
225 return tuple(self._categories.get(cat, ()))
226
226
227 def __iter__(self):
227 def __iter__(self):
228 return iter(self._sequences)
228 return iter(self._sequences)
229
229
230 def __len__(self):
230 def __len__(self):
231 return len(self._sequences)
231 return len(self._sequences)
232
232
233 def __nonzero__(self):
233 def __nonzero__(self):
234 return bool(self._sequences)
234 return bool(self._sequences)
235
235
236 class bundleoperation(object):
236 class bundleoperation(object):
237 """an object that represents a single bundling process
237 """an object that represents a single bundling process
238
238
239 Its purpose is to carry unbundle-related objects and states.
239 Its purpose is to carry unbundle-related objects and states.
240
240
241 A new object should be created at the beginning of each bundle processing.
241 A new object should be created at the beginning of each bundle processing.
242 The object is to be returned by the processing function.
242 The object is to be returned by the processing function.
243
243
244 The object has very little content now it will ultimately contain:
244 The object has very little content now it will ultimately contain:
245 * an access to the repo the bundle is applied to,
245 * an access to the repo the bundle is applied to,
246 * a ui object,
246 * a ui object,
247 * a way to retrieve a transaction to add changes to the repo,
247 * a way to retrieve a transaction to add changes to the repo,
248 * a way to record the result of processing each part,
248 * a way to record the result of processing each part,
249 * a way to construct a bundle response when applicable.
249 * a way to construct a bundle response when applicable.
250 """
250 """
251
251
252 def __init__(self, repo, transactiongetter):
252 def __init__(self, repo, transactiongetter):
253 self.repo = repo
253 self.repo = repo
254 self.ui = repo.ui
254 self.ui = repo.ui
255 self.records = unbundlerecords()
255 self.records = unbundlerecords()
256 self.gettransaction = transactiongetter
256 self.gettransaction = transactiongetter
257 self.reply = None
257 self.reply = None
258
258
259 class TransactionUnavailable(RuntimeError):
259 class TransactionUnavailable(RuntimeError):
260 pass
260 pass
261
261
262 def _notransaction():
262 def _notransaction():
263 """default method to get a transaction while processing a bundle
263 """default method to get a transaction while processing a bundle
264
264
265 Raise an exception to highlight the fact that no transaction was expected
265 Raise an exception to highlight the fact that no transaction was expected
266 to be created"""
266 to be created"""
267 raise TransactionUnavailable()
267 raise TransactionUnavailable()
268
268
269 def processbundle(repo, unbundler, transactiongetter=_notransaction):
269 def processbundle(repo, unbundler, transactiongetter=_notransaction):
270 """This function process a bundle, apply effect to/from a repo
270 """This function process a bundle, apply effect to/from a repo
271
271
272 It iterates over each part then searches for and uses the proper handling
272 It iterates over each part then searches for and uses the proper handling
273 code to process the part. Parts are processed in order.
273 code to process the part. Parts are processed in order.
274
274
275 This is very early version of this function that will be strongly reworked
275 This is very early version of this function that will be strongly reworked
276 before final usage.
276 before final usage.
277
277
278 Unknown Mandatory part will abort the process.
278 Unknown Mandatory part will abort the process.
279 """
279 """
280 op = bundleoperation(repo, transactiongetter)
280 op = bundleoperation(repo, transactiongetter)
281 # todo:
281 # todo:
282 # - only create reply bundle if requested.
282 # - only create reply bundle if requested.
283 op.reply = bundle20(op.ui)
283 op.reply = bundle20(op.ui)
284 # todo:
284 # todo:
285 # - replace this is a init function soon.
285 # - replace this is a init function soon.
286 # - exception catching
286 # - exception catching
287 unbundler.params
287 unbundler.params
288 iterparts = iter(unbundler)
288 iterparts = iter(unbundler)
289 try:
289 try:
290 for part in iterparts:
290 for part in iterparts:
291 parttype = part.type
291 parttype = part.type
292 # part key are matched lower case
292 # part key are matched lower case
293 key = parttype.lower()
293 key = parttype.lower()
294 try:
294 try:
295 handler = parthandlermapping[key]
295 handler = parthandlermapping[key]
296 op.ui.debug('found a handler for part %r\n' % parttype)
296 op.ui.debug('found a handler for part %r\n' % parttype)
297 except KeyError:
297 except KeyError:
298 if key != parttype: # mandatory parts
298 if key != parttype: # mandatory parts
299 # todo:
299 # todo:
300 # - use a more precise exception
300 # - use a more precise exception
301 raise
301 raise
302 op.ui.debug('ignoring unknown advisory part %r\n' % key)
302 op.ui.debug('ignoring unknown advisory part %r\n' % key)
303 # todo:
303 # todo:
304 # - consume the part once we use streaming
304 # - consume the part once we use streaming
305 continue
305 continue
306 handler(op, part)
306 handler(op, part)
307 except Exception:
307 except Exception:
308 for part in iterparts:
308 for part in iterparts:
309 pass # consume the bundle content
309 pass # consume the bundle content
310 raise
310 raise
311 return op
311 return op
312
312
313 class bundle20(object):
313 class bundle20(object):
314 """represent an outgoing bundle2 container
314 """represent an outgoing bundle2 container
315
315
316 Use the `addparam` method to add stream level parameter. and `addpart` to
316 Use the `addparam` method to add stream level parameter. and `addpart` to
317 populate it. Then call `getchunks` to retrieve all the binary chunks of
317 populate it. Then call `getchunks` to retrieve all the binary chunks of
318 datathat compose the bundle2 container."""
318 datathat compose the bundle2 container."""
319
319
320 def __init__(self, ui):
320 def __init__(self, ui):
321 self.ui = ui
321 self.ui = ui
322 self._params = []
322 self._params = []
323 self._parts = []
323 self._parts = []
324
324
325 def addparam(self, name, value=None):
325 def addparam(self, name, value=None):
326 """add a stream level parameter"""
326 """add a stream level parameter"""
327 if not name:
327 if not name:
328 raise ValueError('empty parameter name')
328 raise ValueError('empty parameter name')
329 if name[0] not in string.letters:
329 if name[0] not in string.letters:
330 raise ValueError('non letter first character: %r' % name)
330 raise ValueError('non letter first character: %r' % name)
331 self._params.append((name, value))
331 self._params.append((name, value))
332
332
333 def addpart(self, part):
333 def addpart(self, part):
334 """add a new part to the bundle2 container
334 """add a new part to the bundle2 container
335
335
336 Parts contains the actuall applicative payload."""
336 Parts contains the actuall applicative payload."""
337 assert part.id is None
337 assert part.id is None
338 part.id = len(self._parts) # very cheap counter
338 part.id = len(self._parts) # very cheap counter
339 self._parts.append(part)
339 self._parts.append(part)
340
340
341 def getchunks(self):
341 def getchunks(self):
342 self.ui.debug('start emission of %s stream\n' % _magicstring)
342 self.ui.debug('start emission of %s stream\n' % _magicstring)
343 yield _magicstring
343 yield _magicstring
344 param = self._paramchunk()
344 param = self._paramchunk()
345 self.ui.debug('bundle parameter: %s\n' % param)
345 self.ui.debug('bundle parameter: %s\n' % param)
346 yield _pack(_fstreamparamsize, len(param))
346 yield _pack(_fstreamparamsize, len(param))
347 if param:
347 if param:
348 yield param
348 yield param
349
349
350 self.ui.debug('start of parts\n')
350 self.ui.debug('start of parts\n')
351 for part in self._parts:
351 for part in self._parts:
352 self.ui.debug('bundle part: "%s"\n' % part.type)
352 self.ui.debug('bundle part: "%s"\n' % part.type)
353 for chunk in part.getchunks():
353 for chunk in part.getchunks():
354 yield chunk
354 yield chunk
355 self.ui.debug('end of bundle\n')
355 self.ui.debug('end of bundle\n')
356 yield '\0\0'
356 yield '\0\0'
357
357
358 def _paramchunk(self):
358 def _paramchunk(self):
359 """return a encoded version of all stream parameters"""
359 """return a encoded version of all stream parameters"""
360 blocks = []
360 blocks = []
361 for par, value in self._params:
361 for par, value in self._params:
362 par = urllib.quote(par)
362 par = urllib.quote(par)
363 if value is not None:
363 if value is not None:
364 value = urllib.quote(value)
364 value = urllib.quote(value)
365 par = '%s=%s' % (par, value)
365 par = '%s=%s' % (par, value)
366 blocks.append(par)
366 blocks.append(par)
367 return ' '.join(blocks)
367 return ' '.join(blocks)
368
368
369 class unbundle20(object):
369 class unbundle20(object):
370 """interpret a bundle2 stream
370 """interpret a bundle2 stream
371
371
372 (this will eventually yield parts)"""
372 (this will eventually yield parts)"""
373
373
374 def __init__(self, ui, fp):
374 def __init__(self, ui, fp):
375 self.ui = ui
375 self.ui = ui
376 self._fp = fp
376 self._fp = fp
377 header = self._readexact(4)
377 header = self._readexact(4)
378 magic, version = header[0:2], header[2:4]
378 magic, version = header[0:2], header[2:4]
379 if magic != 'HG':
379 if magic != 'HG':
380 raise util.Abort(_('not a Mercurial bundle'))
380 raise util.Abort(_('not a Mercurial bundle'))
381 if version != '20':
381 if version != '20':
382 raise util.Abort(_('unknown bundle version %s') % version)
382 raise util.Abort(_('unknown bundle version %s') % version)
383 self.ui.debug('start processing of %s stream\n' % header)
383 self.ui.debug('start processing of %s stream\n' % header)
384
384
385 def _unpack(self, format):
385 def _unpack(self, format):
386 """unpack this struct format from the stream"""
386 """unpack this struct format from the stream"""
387 data = self._readexact(struct.calcsize(format))
387 data = self._readexact(struct.calcsize(format))
388 return _unpack(format, data)
388 return _unpack(format, data)
389
389
390 def _readexact(self, size):
390 def _readexact(self, size):
391 """read exactly <size> bytes from the stream"""
391 """read exactly <size> bytes from the stream"""
392 return changegroup.readexactly(self._fp, size)
392 return changegroup.readexactly(self._fp, size)
393
393
394 @util.propertycache
394 @util.propertycache
395 def params(self):
395 def params(self):
396 """dictionnary of stream level parameters"""
396 """dictionnary of stream level parameters"""
397 self.ui.debug('reading bundle2 stream parameters\n')
397 self.ui.debug('reading bundle2 stream parameters\n')
398 params = {}
398 params = {}
399 paramssize = self._unpack(_fstreamparamsize)[0]
399 paramssize = self._unpack(_fstreamparamsize)[0]
400 if paramssize:
400 if paramssize:
401 for p in self._readexact(paramssize).split(' '):
401 for p in self._readexact(paramssize).split(' '):
402 p = p.split('=', 1)
402 p = p.split('=', 1)
403 p = [urllib.unquote(i) for i in p]
403 p = [urllib.unquote(i) for i in p]
404 if len(p) < 2:
404 if len(p) < 2:
405 p.append(None)
405 p.append(None)
406 self._processparam(*p)
406 self._processparam(*p)
407 params[p[0]] = p[1]
407 params[p[0]] = p[1]
408 return params
408 return params
409
409
410 def _processparam(self, name, value):
410 def _processparam(self, name, value):
411 """process a parameter, applying its effect if needed
411 """process a parameter, applying its effect if needed
412
412
413 Parameter starting with a lower case letter are advisory and will be
413 Parameter starting with a lower case letter are advisory and will be
414 ignored when unknown. Those starting with an upper case letter are
414 ignored when unknown. Those starting with an upper case letter are
415 mandatory and will this function will raise a KeyError when unknown.
415 mandatory and will this function will raise a KeyError when unknown.
416
416
417 Note: no option are currently supported. Any input will be either
417 Note: no option are currently supported. Any input will be either
418 ignored or failing.
418 ignored or failing.
419 """
419 """
420 if not name:
420 if not name:
421 raise ValueError('empty parameter name')
421 raise ValueError('empty parameter name')
422 if name[0] not in string.letters:
422 if name[0] not in string.letters:
423 raise ValueError('non letter first character: %r' % name)
423 raise ValueError('non letter first character: %r' % name)
424 # Some logic will be later added here to try to process the option for
424 # Some logic will be later added here to try to process the option for
425 # a dict of known parameter.
425 # a dict of known parameter.
426 if name[0].islower():
426 if name[0].islower():
427 self.ui.debug("ignoring unknown parameter %r\n" % name)
427 self.ui.debug("ignoring unknown parameter %r\n" % name)
428 else:
428 else:
429 raise KeyError(name)
429 raise KeyError(name)
430
430
431
431
432 def __iter__(self):
432 def __iter__(self):
433 """yield all parts contained in the stream"""
433 """yield all parts contained in the stream"""
434 # make sure param have been loaded
434 # make sure param have been loaded
435 self.params
435 self.params
436 self.ui.debug('start extraction of bundle2 parts\n')
436 self.ui.debug('start extraction of bundle2 parts\n')
437 part = self._readpart()
437 part = self._readpart()
438 while part is not None:
438 while part is not None:
439 yield part
439 yield part
440 part = self._readpart()
440 part = self._readpart()
441 self.ui.debug('end of bundle2 stream\n')
441 self.ui.debug('end of bundle2 stream\n')
442
442
443 def _readpart(self):
443 def _readpart(self):
444 """return None when an end of stream markers is reach"""
444 """return None when an end of stream markers is reach"""
445
445
446 headersize = self._unpack(_fpartheadersize)[0]
446 headersize = self._unpack(_fpartheadersize)[0]
447 self.ui.debug('part header size: %i\n' % headersize)
447 self.ui.debug('part header size: %i\n' % headersize)
448 if not headersize:
448 if not headersize:
449 return None
449 return None
450 headerblock = self._readexact(headersize)
450 headerblock = self._readexact(headersize)
451 # some utility to help reading from the header block
451 # some utility to help reading from the header block
452 self._offset = 0 # layer violation to have something easy to understand
452 self._offset = 0 # layer violation to have something easy to understand
453 def fromheader(size):
453 def fromheader(size):
454 """return the next <size> byte from the header"""
454 """return the next <size> byte from the header"""
455 offset = self._offset
455 offset = self._offset
456 data = headerblock[offset:(offset + size)]
456 data = headerblock[offset:(offset + size)]
457 self._offset = offset + size
457 self._offset = offset + size
458 return data
458 return data
459 def unpackheader(format):
459 def unpackheader(format):
460 """read given format from header
460 """read given format from header
461
461
462 This automatically compute the size of the format to read."""
462 This automatically compute the size of the format to read."""
463 data = fromheader(struct.calcsize(format))
463 data = fromheader(struct.calcsize(format))
464 return _unpack(format, data)
464 return _unpack(format, data)
465
465
466 typesize = unpackheader(_fparttypesize)[0]
466 typesize = unpackheader(_fparttypesize)[0]
467 parttype = fromheader(typesize)
467 parttype = fromheader(typesize)
468 self.ui.debug('part type: "%s"\n' % parttype)
468 self.ui.debug('part type: "%s"\n' % parttype)
469 partid = unpackheader(_fpartid)[0]
469 partid = unpackheader(_fpartid)[0]
470 self.ui.debug('part id: "%s"\n' % partid)
470 self.ui.debug('part id: "%s"\n' % partid)
471 ## reading parameters
471 ## reading parameters
472 # param count
472 # param count
473 mancount, advcount = unpackheader(_fpartparamcount)
473 mancount, advcount = unpackheader(_fpartparamcount)
474 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
474 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
475 # param size
475 # param size
476 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
476 paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
477 # make it a list of couple again
477 # make it a list of couple again
478 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
478 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
479 # split mandatory from advisory
479 # split mandatory from advisory
480 mansizes = paramsizes[:mancount]
480 mansizes = paramsizes[:mancount]
481 advsizes = paramsizes[mancount:]
481 advsizes = paramsizes[mancount:]
482 # retrive param value
482 # retrive param value
483 manparams = []
483 manparams = []
484 for key, value in mansizes:
484 for key, value in mansizes:
485 manparams.append((fromheader(key), fromheader(value)))
485 manparams.append((fromheader(key), fromheader(value)))
486 advparams = []
486 advparams = []
487 for key, value in advsizes:
487 for key, value in advsizes:
488 advparams.append((fromheader(key), fromheader(value)))
488 advparams.append((fromheader(key), fromheader(value)))
489 del self._offset # clean up layer, nobody saw anything.
489 del self._offset # clean up layer, nobody saw anything.
490 ## part payload
490 ## part payload
491 payload = []
491 payload = []
492 payloadsize = self._unpack(_fpayloadsize)[0]
492 payloadsize = self._unpack(_fpayloadsize)[0]
493 self.ui.debug('payload chunk size: %i\n' % payloadsize)
493 self.ui.debug('payload chunk size: %i\n' % payloadsize)
494 while payloadsize:
494 while payloadsize:
495 payload.append(self._readexact(payloadsize))
495 payload.append(self._readexact(payloadsize))
496 payloadsize = self._unpack(_fpayloadsize)[0]
496 payloadsize = self._unpack(_fpayloadsize)[0]
497 self.ui.debug('payload chunk size: %i\n' % payloadsize)
497 self.ui.debug('payload chunk size: %i\n' % payloadsize)
498 payload = ''.join(payload)
498 payload = ''.join(payload)
499 current = part(parttype, manparams, advparams, data=payload)
499 current = part(parttype, manparams, advparams, data=payload)
500 current.id = partid
500 current.id = partid
501 return current
501 return current
502
502
503
503
504 class part(object):
504 class part(object):
505 """A bundle2 part contains application level payload
505 """A bundle2 part contains application level payload
506
506
507 The part `type` is used to route the part to the application level
507 The part `type` is used to route the part to the application level
508 handler.
508 handler.
509 """
509 """
510
510
511 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
511 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
512 data=''):
512 data=''):
513 self.id = None
513 self.id = None
514 self.type = parttype
514 self.type = parttype
515 self.data = data
515 self.data = data
516 self.mandatoryparams = mandatoryparams
516 self.mandatoryparams = mandatoryparams
517 self.advisoryparams = advisoryparams
517 self.advisoryparams = advisoryparams
518
518
519 def getchunks(self):
519 def getchunks(self):
520 #### header
520 #### header
521 ## parttype
521 ## parttype
522 header = [_pack(_fparttypesize, len(self.type)),
522 header = [_pack(_fparttypesize, len(self.type)),
523 self.type, _pack(_fpartid, self.id),
523 self.type, _pack(_fpartid, self.id),
524 ]
524 ]
525 ## parameters
525 ## parameters
526 # count
526 # count
527 manpar = self.mandatoryparams
527 manpar = self.mandatoryparams
528 advpar = self.advisoryparams
528 advpar = self.advisoryparams
529 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
529 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
530 # size
530 # size
531 parsizes = []
531 parsizes = []
532 for key, value in manpar:
532 for key, value in manpar:
533 parsizes.append(len(key))
533 parsizes.append(len(key))
534 parsizes.append(len(value))
534 parsizes.append(len(value))
535 for key, value in advpar:
535 for key, value in advpar:
536 parsizes.append(len(key))
536 parsizes.append(len(key))
537 parsizes.append(len(value))
537 parsizes.append(len(value))
538 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
538 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
539 header.append(paramsizes)
539 header.append(paramsizes)
540 # key, value
540 # key, value
541 for key, value in manpar:
541 for key, value in manpar:
542 header.append(key)
542 header.append(key)
543 header.append(value)
543 header.append(value)
544 for key, value in advpar:
544 for key, value in advpar:
545 header.append(key)
545 header.append(key)
546 header.append(value)
546 header.append(value)
547 ## finalize header
547 ## finalize header
548 headerchunk = ''.join(header)
548 headerchunk = ''.join(header)
549 yield _pack(_fpartheadersize, len(headerchunk))
549 yield _pack(_fpartheadersize, len(headerchunk))
550 yield headerchunk
550 yield headerchunk
551 ## payload
551 ## payload
552 for chunk in self._payloadchunks():
553 yield _pack(_fpayloadsize, len(chunk))
554 yield chunk
555 # end of payload
556 yield _pack(_fpayloadsize, 0)
557
558 def _payloadchunks(self):
559 """yield chunks of a the part payload
560
561 Exists to handle the different methods to provide data to a part."""
552 # we only support fixed size data now.
562 # we only support fixed size data now.
553 # This will be improved in the future.
563 # This will be improved in the future.
554 if len(self.data):
564 if len(self.data):
555 yield _pack(_fpayloadsize, len(self.data))
556 yield self.data
565 yield self.data
557 # end of payload
558 yield _pack(_fpayloadsize, 0)
559
566
560 @parthandler('changegroup')
567 @parthandler('changegroup')
561 def handlechangegroup(op, inpart):
568 def handlechangegroup(op, inpart):
562 """apply a changegroup part on the repo
569 """apply a changegroup part on the repo
563
570
564 This is a very early implementation that will massive rework before being
571 This is a very early implementation that will massive rework before being
565 inflicted to any end-user.
572 inflicted to any end-user.
566 """
573 """
567 # Make sure we trigger a transaction creation
574 # Make sure we trigger a transaction creation
568 #
575 #
569 # The addchangegroup function will get a transaction object by itself, but
576 # The addchangegroup function will get a transaction object by itself, but
570 # we need to make sure we trigger the creation of a transaction object used
577 # we need to make sure we trigger the creation of a transaction object used
571 # for the whole processing scope.
578 # for the whole processing scope.
572 op.gettransaction()
579 op.gettransaction()
573 data = StringIO.StringIO(inpart.data)
580 data = StringIO.StringIO(inpart.data)
574 data.seek(0)
581 data.seek(0)
575 cg = changegroup.readbundle(data, 'bundle2part')
582 cg = changegroup.readbundle(data, 'bundle2part')
576 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
583 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
577 op.records.add('changegroup', {'return': ret})
584 op.records.add('changegroup', {'return': ret})
578 if op.reply is not None:
585 if op.reply is not None:
579 # This is definitly not the final form of this
586 # This is definitly not the final form of this
580 # return. But one need to start somewhere.
587 # return. But one need to start somewhere.
581 op.reply.addpart(part('reply:changegroup', (),
588 op.reply.addpart(part('reply:changegroup', (),
582 [('in-reply-to', str(inpart.id)),
589 [('in-reply-to', str(inpart.id)),
583 ('return', '%i' % ret)]))
590 ('return', '%i' % ret)]))
584
591
585 @parthandler('reply:changegroup')
592 @parthandler('reply:changegroup')
586 def handlechangegroup(op, inpart):
593 def handlechangegroup(op, inpart):
587 p = dict(inpart.advisoryparams)
594 p = dict(inpart.advisoryparams)
588 ret = int(p['return'])
595 ret = int(p['return'])
589 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
596 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
590
597
General Comments 0
You need to be logged in to leave comments. Login now