bundle2: add a ``newpart`` method to ``bundle20``...
Pierre-Yves David -
r21598:1b0dbb91 default
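As a rough illustration of the method this changeset adds, the sketch below mirrors `newpart` as it appears in the diff: it builds a `bundlepart` and returns it, without registering it in the container. The `bundlepart` class here is a hypothetical stand-in reduced to a few attributes, not Mercurial's real class, and the `ui` argument of `bundle20` is left out; both are assumptions made only to keep the example self-contained.

```python
# Simplified stand-ins for illustration only; not Mercurial's real classes.

class bundlepart(object):
    """hypothetical minimal part: a type, a payload and an unset id"""
    def __init__(self, parttype, data=''):
        self.id = None
        self.type = parttype
        self.data = data

class bundle20(object):
    """reduced container keeping only the part-related methods"""
    def __init__(self):
        self._parts = []

    def addpart(self, part):
        # same logic as in the diff: ids follow insertion order
        assert part.id is None
        part.id = len(self._parts)
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        # as in this changeset: build the part and hand it back;
        # the part is not added to the container here
        part = bundlepart(typeid, *args, **kwargs)
        return part

bundler = bundle20()
part = bundler.newpart('b2x:output', data='hello')
id_before = part.id       # newpart did not register the part
bundler.addpart(part)     # the caller registers it explicitly
```

With the method as shown in this diff, a caller that wants the part emitted by `getchunks` would still need to pass it to `addpart`.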
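The module docstring in the diff describes the stream level parameters as a 16 bit big-endian size followed by a blob of space separated, urlquoted `<name>=<value>` entries. The sketch below is one possible reading of that description, not the module's own code: the helper names `encodeparams` and `decodeparams` are hypothetical, and the import fallback is an assumption added so the example runs on both Python 2 (which the diff targets) and Python 3.

```python
import struct
try:
    from urllib import quote, unquote        # Python 2, as in the diff
except ImportError:
    from urllib.parse import quote, unquote  # Python 3

def encodeparams(params):
    """serialize (name, value) pairs as <16 bit size><blob> (hypothetical helper)"""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    # quoted text is plain ASCII, so the blob can be encoded safely
    blob = ' '.join(blocks).encode('ascii')
    return struct.pack('>H', len(blob)) + blob

def decodeparams(data):
    """inverse of encodeparams (hypothetical helper)"""
    size = struct.unpack('>H', data[:2])[0]
    params = []
    if size:
        blob = data[2:2 + size].decode('ascii')
        for item in blob.split(' '):
            pair = [unquote(i) for i in item.split('=', 1)]
            if len(pair) < 2:
                pair.append(None)  # parameter without a value
            params.append(tuple(pair))
    return params

# 'caps' is advisory (lower case first letter), 'Simple' is mandatory
wire = encodeparams([('caps', 'a b'), ('Simple', None)])
roundtrip = decodeparams(wire)
```

The space inside the value is urlquoted to `%20`, which is what lets a plain space act as the separator between parameters.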
@@ -1,770 +1,775
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is structured as follows:
17 The format is structured as follows:
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 The binary format is as follows:
32 The binary format is as follows:
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are forbidden.
46 Empty names are forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 The binary format is as follows:
65 The binary format is as follows:
66
66
67 :header size: (16 bits integer)
67 :header size: (16 bits integer)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route to an application level handler that can
77 The part type is used to route to an application level handler that can
78 interpret the payload.
78 interpret the payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows:
84 The binary format of the header is as follows:
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part parameters may have arbitrary content; the binary structure is::
95 Part parameters may have arbitrary content; the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couples of bytes, where N is the total number of parameters. Each
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says). The payload part is concluded by a zero size chunk.
121 `chunksize` says). The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handlers are registered
129 Each part is processed in order using a "part handler". Handlers are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the parts read after an abort are processed. In the
138 channel usable. But none of the parts read after an abort are processed. In the
139 future, dropping the stream may become an option for channels we do not care to
139 future, dropping the stream may become an option for channels we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup, error
148 import changegroup, error
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG2X'
154 _magicstring = 'HG2X'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
164
164
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number parameters is variable so we need to build that format
168 The number parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
172
172
173 class UnknownPartError(KeyError):
173 class UnknownPartError(KeyError):
174 """error raised when no handler is found for a Mandatory part"""
174 """error raised when no handler is found for a Mandatory part"""
175 pass
175 pass
176
176
177 parthandlermapping = {}
177 parthandlermapping = {}
178
178
179 def parthandler(parttype):
179 def parthandler(parttype):
180 """decorator that registers a function as a bundle2 part handler
180 """decorator that registers a function as a bundle2 part handler
181
181
182 eg::
182 eg::
183
183
184 @parthandler('myparttype')
184 @parthandler('myparttype')
185 def myparttypehandler(...):
185 def myparttypehandler(...):
186 '''process a part of type "my part".'''
186 '''process a part of type "my part".'''
187 ...
187 ...
188 """
188 """
189 def _decorator(func):
189 def _decorator(func):
190 lparttype = parttype.lower() # enforce lower case matching.
190 lparttype = parttype.lower() # enforce lower case matching.
191 assert lparttype not in parthandlermapping
191 assert lparttype not in parthandlermapping
192 parthandlermapping[lparttype] = func
192 parthandlermapping[lparttype] = func
193 return func
193 return func
194 return _decorator
194 return _decorator
195
195
196 class unbundlerecords(object):
196 class unbundlerecords(object):
197 """keep record of what happens during an unbundle
197 """keep record of what happens during an unbundle
198
198
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 category of record and obj is an arbitrary object.
200 category of record and obj is an arbitrary object.
201
201
202 `records['cat']` will return all entries of this category 'cat'.
202 `records['cat']` will return all entries of this category 'cat'.
203
203
204 Iterating on the object itself will yield `('category', obj)` tuples
204 Iterating on the object itself will yield `('category', obj)` tuples
205 for all entries.
205 for all entries.
206
206
207 All iterations happen in chronological order.
207 All iterations happen in chronological order.
208 """
208 """
209
209
210 def __init__(self):
210 def __init__(self):
211 self._categories = {}
211 self._categories = {}
212 self._sequences = []
212 self._sequences = []
213 self._replies = {}
213 self._replies = {}
214
214
215 def add(self, category, entry, inreplyto=None):
215 def add(self, category, entry, inreplyto=None):
216 """add a new record of a given category.
216 """add a new record of a given category.
217
217
218 The entry can then be retrieved in the list returned by
218 The entry can then be retrieved in the list returned by
219 self['category']."""
219 self['category']."""
220 self._categories.setdefault(category, []).append(entry)
220 self._categories.setdefault(category, []).append(entry)
221 self._sequences.append((category, entry))
221 self._sequences.append((category, entry))
222 if inreplyto is not None:
222 if inreplyto is not None:
223 self.getreplies(inreplyto).add(category, entry)
223 self.getreplies(inreplyto).add(category, entry)
224
224
225 def getreplies(self, partid):
225 def getreplies(self, partid):
226 """get the subrecords that reply to a specific part"""
226 """get the subrecords that reply to a specific part"""
227 return self._replies.setdefault(partid, unbundlerecords())
227 return self._replies.setdefault(partid, unbundlerecords())
228
228
229 def __getitem__(self, cat):
229 def __getitem__(self, cat):
230 return tuple(self._categories.get(cat, ()))
230 return tuple(self._categories.get(cat, ()))
231
231
232 def __iter__(self):
232 def __iter__(self):
233 return iter(self._sequences)
233 return iter(self._sequences)
234
234
235 def __len__(self):
235 def __len__(self):
236 return len(self._sequences)
236 return len(self._sequences)
237
237
238 def __nonzero__(self):
238 def __nonzero__(self):
239 return bool(self._sequences)
239 return bool(self._sequences)
240
240
241 class bundleoperation(object):
241 class bundleoperation(object):
242 """an object that represents a single bundling process
242 """an object that represents a single bundling process
243
243
244 Its purpose is to carry unbundle-related objects and states.
244 Its purpose is to carry unbundle-related objects and states.
245
245
246 A new object should be created at the beginning of each bundle processing.
246 A new object should be created at the beginning of each bundle processing.
247 The object is to be returned by the processing function.
247 The object is to be returned by the processing function.
248
248
249 The object has very little content now it will ultimately contain:
249 The object has very little content now it will ultimately contain:
250 * an access to the repo the bundle is applied to,
250 * an access to the repo the bundle is applied to,
251 * a ui object,
251 * a ui object,
252 * a way to retrieve a transaction to add changes to the repo,
252 * a way to retrieve a transaction to add changes to the repo,
253 * a way to record the result of processing each part,
253 * a way to record the result of processing each part,
254 * a way to construct a bundle response when applicable.
254 * a way to construct a bundle response when applicable.
255 """
255 """
256
256
257 def __init__(self, repo, transactiongetter):
257 def __init__(self, repo, transactiongetter):
258 self.repo = repo
258 self.repo = repo
259 self.ui = repo.ui
259 self.ui = repo.ui
260 self.records = unbundlerecords()
260 self.records = unbundlerecords()
261 self.gettransaction = transactiongetter
261 self.gettransaction = transactiongetter
262 self.reply = None
262 self.reply = None
263
263
264 class TransactionUnavailable(RuntimeError):
264 class TransactionUnavailable(RuntimeError):
265 pass
265 pass
266
266
267 def _notransaction():
267 def _notransaction():
268 """default method to get a transaction while processing a bundle
268 """default method to get a transaction while processing a bundle
269
269
270 Raise an exception to highlight the fact that no transaction was expected
270 Raise an exception to highlight the fact that no transaction was expected
271 to be created"""
271 to be created"""
272 raise TransactionUnavailable()
272 raise TransactionUnavailable()
273
273
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 """This function processes a bundle, applying its effects to/from a repo
275 """This function processes a bundle, applying its effects to/from a repo
276
276
277 It iterates over each part then searches for and uses the proper handling
277 It iterates over each part then searches for and uses the proper handling
278 code to process the part. Parts are processed in order.
278 code to process the part. Parts are processed in order.
279
279
280 This is a very early version of this function that will be strongly reworked
280 This is a very early version of this function that will be strongly reworked
281 before final usage.
281 before final usage.
282
282
283 Unknown Mandatory part will abort the process.
283 Unknown Mandatory part will abort the process.
284 """
284 """
285 op = bundleoperation(repo, transactiongetter)
285 op = bundleoperation(repo, transactiongetter)
286 # todo:
286 # todo:
287 # - replace this with an init function soon.
287 # - replace this with an init function soon.
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = unbundler.iterparts()
290 iterparts = unbundler.iterparts()
291 part = None
291 part = None
292 try:
292 try:
293 for part in iterparts:
293 for part in iterparts:
294 parttype = part.type
294 parttype = part.type
295 # part key are matched lower case
295 # part key are matched lower case
296 key = parttype.lower()
296 key = parttype.lower()
297 try:
297 try:
298 handler = parthandlermapping[key]
298 handler = parthandlermapping[key]
299 op.ui.debug('found a handler for part %r\n' % parttype)
299 op.ui.debug('found a handler for part %r\n' % parttype)
300 except KeyError:
300 except KeyError:
301 if key != parttype: # mandatory parts
301 if key != parttype: # mandatory parts
302 # todo:
302 # todo:
303 # - use a more precise exception
303 # - use a more precise exception
304 raise UnknownPartError(key)
304 raise UnknownPartError(key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 # consuming the part
306 # consuming the part
307 part.read()
307 part.read()
308 continue
308 continue
309
309
310 # handler is called outside the above try block so that we don't
310 # handler is called outside the above try block so that we don't
311 # risk catching KeyErrors from anything other than the
311 # risk catching KeyErrors from anything other than the
312 # parthandlermapping lookup (any KeyError raised by handler()
312 # parthandlermapping lookup (any KeyError raised by handler()
313 # itself represents a defect of a different variety).
313 # itself represents a defect of a different variety).
314 output = None
314 output = None
315 if op.reply is not None:
315 if op.reply is not None:
316 op.ui.pushbuffer(error=True)
316 op.ui.pushbuffer(error=True)
317 output = ''
317 output = ''
318 try:
318 try:
319 handler(op, part)
319 handler(op, part)
320 finally:
320 finally:
321 if output is not None:
321 if output is not None:
322 output = op.ui.popbuffer()
322 output = op.ui.popbuffer()
323 if output:
323 if output:
324 outpart = bundlepart('b2x:output',
324 outpart = bundlepart('b2x:output',
325 advisoryparams=[('in-reply-to',
325 advisoryparams=[('in-reply-to',
326 str(part.id))],
326 str(part.id))],
327 data=output)
327 data=output)
328 op.reply.addpart(outpart)
328 op.reply.addpart(outpart)
329 part.read()
329 part.read()
330 except Exception, exc:
330 except Exception, exc:
331 if part is not None:
331 if part is not None:
332 # consume the bundle content
332 # consume the bundle content
333 part.read()
333 part.read()
334 for part in iterparts:
334 for part in iterparts:
335 # consume the bundle content
335 # consume the bundle content
336 part.read()
336 part.read()
337 # Small hack to let caller code distinguish exceptions from bundle2
337 # Small hack to let caller code distinguish exceptions from bundle2
338 # processing from the ones from bundle1 processing. This is mostly
338 # processing from the ones from bundle1 processing. This is mostly
339 # needed to handle different return codes to unbundle according to the
339 # needed to handle different return codes to unbundle according to the
340 # type of bundle. We should probably clean up or drop this return code
340 # type of bundle. We should probably clean up or drop this return code
341 # craziness in a future version.
341 # craziness in a future version.
342 exc.duringunbundle2 = True
342 exc.duringunbundle2 = True
343 raise
343 raise
344 return op
344 return op
345
345
346 def decodecaps(blob):
346 def decodecaps(blob):
347 """decode a bundle2 caps bytes blob into a dictionary
347 """decode a bundle2 caps bytes blob into a dictionary
348
348
349 The blob is a list of capabilities (one per line)
349 The blob is a list of capabilities (one per line)
350 Capabilities may have values using a line of the form::
350 Capabilities may have values using a line of the form::
351
351
352 capability=value1,value2,value3
352 capability=value1,value2,value3
353
353
354 The values are always a list."""
354 The values are always a list."""
355 caps = {}
355 caps = {}
356 for line in blob.splitlines():
356 for line in blob.splitlines():
357 if not line:
357 if not line:
358 continue
358 continue
359 if '=' not in line:
359 if '=' not in line:
360 key, vals = line, ()
360 key, vals = line, ()
361 else:
361 else:
362 key, vals = line.split('=', 1)
362 key, vals = line.split('=', 1)
363 vals = vals.split(',')
363 vals = vals.split(',')
364 key = urllib.unquote(key)
364 key = urllib.unquote(key)
365 vals = [urllib.unquote(v) for v in vals]
365 vals = [urllib.unquote(v) for v in vals]
366 caps[key] = vals
366 caps[key] = vals
367 return caps
367 return caps
368
368
369 def encodecaps(caps):
369 def encodecaps(caps):
370 """encode a bundle2 caps dictionary into a bytes blob"""
370 """encode a bundle2 caps dictionary into a bytes blob"""
371 chunks = []
371 chunks = []
372 for ca in sorted(caps):
372 for ca in sorted(caps):
373 vals = caps[ca]
373 vals = caps[ca]
374 ca = urllib.quote(ca)
374 ca = urllib.quote(ca)
375 vals = [urllib.quote(v) for v in vals]
375 vals = [urllib.quote(v) for v in vals]
376 if vals:
376 if vals:
377 ca = "%s=%s" % (ca, ','.join(vals))
377 ca = "%s=%s" % (ca, ','.join(vals))
378 chunks.append(ca)
378 chunks.append(ca)
379 return '\n'.join(chunks)
379 return '\n'.join(chunks)
380
380
381 class bundle20(object):
381 class bundle20(object):
382 """represent an outgoing bundle2 container
382 """represent an outgoing bundle2 container
383
383
384 Use the `addparam` method to add stream level parameters, and `addpart` to
384 Use the `addparam` method to add stream level parameters, and `addpart` to
385 populate it. Then call `getchunks` to retrieve all the binary chunks of
385 populate it. Then call `getchunks` to retrieve all the binary chunks of
386 data that compose the bundle2 container."""
386 data that compose the bundle2 container."""
387
387
388 def __init__(self, ui, capabilities=()):
388 def __init__(self, ui, capabilities=()):
389 self.ui = ui
389 self.ui = ui
390 self._params = []
390 self._params = []
391 self._parts = []
391 self._parts = []
392 self.capabilities = dict(capabilities)
392 self.capabilities = dict(capabilities)
393
393
394 # methods used to define the bundle2 content
394 # methods used to define the bundle2 content
395 def addparam(self, name, value=None):
395 def addparam(self, name, value=None):
396 """add a stream level parameter"""
396 """add a stream level parameter"""
397 if not name:
397 if not name:
398 raise ValueError('empty parameter name')
398 raise ValueError('empty parameter name')
399 if name[0] not in string.letters:
399 if name[0] not in string.letters:
400 raise ValueError('non letter first character: %r' % name)
400 raise ValueError('non letter first character: %r' % name)
401 self._params.append((name, value))
401 self._params.append((name, value))
402
402
403 def addpart(self, part):
403 def addpart(self, part):
404 """add a new part to the bundle2 container
404 """add a new part to the bundle2 container
405
405
406 Parts contain the actual applicative payload."""
406 Parts contain the actual applicative payload."""
407 assert part.id is None
407 assert part.id is None
408 part.id = len(self._parts) # very cheap counter
408 part.id = len(self._parts) # very cheap counter
409 self._parts.append(part)
409 self._parts.append(part)
410
410
411 def newpart(self, typeid, *args, **kwargs):
412 """create a new part for the containers"""
413 part = bundlepart(typeid, *args, **kwargs)
414 return part
415
411 # methods used to generate the bundle2 stream
416 # methods used to generate the bundle2 stream
412 def getchunks(self):
417 def getchunks(self):
413 self.ui.debug('start emission of %s stream\n' % _magicstring)
418 self.ui.debug('start emission of %s stream\n' % _magicstring)
414 yield _magicstring
419 yield _magicstring
415 param = self._paramchunk()
420 param = self._paramchunk()
416 self.ui.debug('bundle parameter: %s\n' % param)
421 self.ui.debug('bundle parameter: %s\n' % param)
417 yield _pack(_fstreamparamsize, len(param))
422 yield _pack(_fstreamparamsize, len(param))
418 if param:
423 if param:
419 yield param
424 yield param
420
425
421 self.ui.debug('start of parts\n')
426 self.ui.debug('start of parts\n')
422 for part in self._parts:
427 for part in self._parts:
423 self.ui.debug('bundle part: "%s"\n' % part.type)
428 self.ui.debug('bundle part: "%s"\n' % part.type)
424 for chunk in part.getchunks():
429 for chunk in part.getchunks():
425 yield chunk
430 yield chunk
426 self.ui.debug('end of bundle\n')
431 self.ui.debug('end of bundle\n')
427 yield '\0\0'
432 yield '\0\0'
428
433
429 def _paramchunk(self):
434 def _paramchunk(self):
430 """return an encoded version of all stream parameters"""
435 """return an encoded version of all stream parameters"""
431 blocks = []
436 blocks = []
432 for par, value in self._params:
437 for par, value in self._params:
433 par = urllib.quote(par)
438 par = urllib.quote(par)
434 if value is not None:
439 if value is not None:
435 value = urllib.quote(value)
440 value = urllib.quote(value)
436 par = '%s=%s' % (par, value)
441 par = '%s=%s' % (par, value)
437 blocks.append(par)
442 blocks.append(par)
438 return ' '.join(blocks)
443 return ' '.join(blocks)
439
444
440 class unpackermixin(object):
445 class unpackermixin(object):
441 """A mixin to extract bytes and struct data from a stream"""
446 """A mixin to extract bytes and struct data from a stream"""
442
447
443 def __init__(self, fp):
448 def __init__(self, fp):
444 self._fp = fp
449 self._fp = fp
445
450
446 def _unpack(self, format):
451 def _unpack(self, format):
447 """unpack this struct format from the stream"""
452 """unpack this struct format from the stream"""
448 data = self._readexact(struct.calcsize(format))
453 data = self._readexact(struct.calcsize(format))
449 return _unpack(format, data)
454 return _unpack(format, data)
450
455
451 def _readexact(self, size):
456 def _readexact(self, size):
452 """read exactly <size> bytes from the stream"""
457 """read exactly <size> bytes from the stream"""
453 return changegroup.readexactly(self._fp, size)
458 return changegroup.readexactly(self._fp, size)
454
459
455
460
456 class unbundle20(unpackermixin):
461 class unbundle20(unpackermixin):
457 """interpret a bundle2 stream
462 """interpret a bundle2 stream
458
463
459 This class is fed with a binary stream and yields parts through its
464 This class is fed with a binary stream and yields parts through its
460 `iterparts` method."""
465 `iterparts` method."""
461
466
462 def __init__(self, ui, fp, header=None):
467 def __init__(self, ui, fp, header=None):
463 """If header is specified, we do not read it out of the stream."""
468 """If header is specified, we do not read it out of the stream."""
464 self.ui = ui
469 self.ui = ui
465 super(unbundle20, self).__init__(fp)
470 super(unbundle20, self).__init__(fp)
466 if header is None:
471 if header is None:
467 header = self._readexact(4)
472 header = self._readexact(4)
468 magic, version = header[0:2], header[2:4]
473 magic, version = header[0:2], header[2:4]
469 if magic != 'HG':
474 if magic != 'HG':
470 raise util.Abort(_('not a Mercurial bundle'))
475 raise util.Abort(_('not a Mercurial bundle'))
471 if version != '2X':
476 if version != '2X':
472 raise util.Abort(_('unknown bundle version %s') % version)
477 raise util.Abort(_('unknown bundle version %s') % version)
473 self.ui.debug('start processing of %s stream\n' % header)
478 self.ui.debug('start processing of %s stream\n' % header)
474
479
475 @util.propertycache
480 @util.propertycache
476 def params(self):
481 def params(self):
477 """dictionary of stream level parameters"""
482 """dictionary of stream level parameters"""
478 self.ui.debug('reading bundle2 stream parameters\n')
483 self.ui.debug('reading bundle2 stream parameters\n')
479 params = {}
484 params = {}
480 paramssize = self._unpack(_fstreamparamsize)[0]
485 paramssize = self._unpack(_fstreamparamsize)[0]
481 if paramssize:
486 if paramssize:
482 for p in self._readexact(paramssize).split(' '):
487 for p in self._readexact(paramssize).split(' '):
483 p = p.split('=', 1)
488 p = p.split('=', 1)
484 p = [urllib.unquote(i) for i in p]
489 p = [urllib.unquote(i) for i in p]
485 if len(p) < 2:
490 if len(p) < 2:
486 p.append(None)
491 p.append(None)
487 self._processparam(*p)
492 self._processparam(*p)
488 params[p[0]] = p[1]
493 params[p[0]] = p[1]
489 return params
494 return params
490
495
491 def _processparam(self, name, value):
496 def _processparam(self, name, value):
492 """process a parameter, applying its effect if needed
497 """process a parameter, applying its effect if needed
493
498
494 Parameter starting with a lower case letter are advisory and will be
499 Parameter starting with a lower case letter are advisory and will be
495 ignored when unknown. Those starting with an upper case letter are
500 ignored when unknown. Those starting with an upper case letter are
496 mandatory and this function will raise a KeyError when unknown.
501 mandatory and this function will raise a KeyError when unknown.
497
502
498 Note: no options are currently supported. Any input will either be
503 Note: no options are currently supported. Any input will either be
499 ignored or fail.
504 ignored or fail.
500 """
505 """
501 if not name:
506 if not name:
502 raise ValueError('empty parameter name')
507 raise ValueError('empty parameter name')
503 if name[0] not in string.letters:
508 if name[0] not in string.letters:
504 raise ValueError('non letter first character: %r' % name)
509 raise ValueError('non letter first character: %r' % name)
505 # Some logic will later be added here to try to process the option from
510 # Some logic will later be added here to try to process the option from
506 # a dict of known parameters.
511 # a dict of known parameters.
507 if name[0].islower():
512 if name[0].islower():
508 self.ui.debug("ignoring unknown parameter %r\n" % name)
513 self.ui.debug("ignoring unknown parameter %r\n" % name)
509 else:
514 else:
510 raise KeyError(name)
515 raise KeyError(name)
511
516
512
517
513 def iterparts(self):
518 def iterparts(self):
514 """yield all parts contained in the stream"""
519 """yield all parts contained in the stream"""
515 # make sure params have been loaded
520 # make sure params have been loaded
516 self.params
521 self.params
517 self.ui.debug('start extraction of bundle2 parts\n')
522 self.ui.debug('start extraction of bundle2 parts\n')
518 headerblock = self._readpartheader()
523 headerblock = self._readpartheader()
519 while headerblock is not None:
524 while headerblock is not None:
520 part = unbundlepart(self.ui, headerblock, self._fp)
525 part = unbundlepart(self.ui, headerblock, self._fp)
521 yield part
526 yield part
522 headerblock = self._readpartheader()
527 headerblock = self._readpartheader()
523 self.ui.debug('end of bundle2 stream\n')
528 self.ui.debug('end of bundle2 stream\n')
524
529
525 def _readpartheader(self):
530 def _readpartheader(self):
526 """read a part header size and return the bytes blob
531 """read a part header size and return the bytes blob
527
532
528 returns None if empty"""
533 returns None if empty"""
529 headersize = self._unpack(_fpartheadersize)[0]
534 headersize = self._unpack(_fpartheadersize)[0]
530 self.ui.debug('part header size: %i\n' % headersize)
535 self.ui.debug('part header size: %i\n' % headersize)
531 if headersize:
536 if headersize:
532 return self._readexact(headersize)
537 return self._readexact(headersize)
533 return None
538 return None
534
539
535
540
536 class bundlepart(object):
541 class bundlepart(object):
537 """A bundle2 part contains application level payload
542 """A bundle2 part contains application level payload
538
543
539 The part `type` is used to route the part to the application level
544 The part `type` is used to route the part to the application level
540 handler.
545 handler.
541 """
546 """
542
547
543 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
548 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
544 data=''):
549 data=''):
545 self.id = None
550 self.id = None
546 self.type = parttype
551 self.type = parttype
547 self.data = data
552 self.data = data
548 self.mandatoryparams = mandatoryparams
553 self.mandatoryparams = mandatoryparams
549 self.advisoryparams = advisoryparams
554 self.advisoryparams = advisoryparams
550
555
551 def getchunks(self):
556 def getchunks(self):
552 #### header
557 #### header
553 ## parttype
558 ## parttype
554 header = [_pack(_fparttypesize, len(self.type)),
559 header = [_pack(_fparttypesize, len(self.type)),
555 self.type, _pack(_fpartid, self.id),
560 self.type, _pack(_fpartid, self.id),
556 ]
561 ]
557 ## parameters
562 ## parameters
558 # count
563 # count
559 manpar = self.mandatoryparams
564 manpar = self.mandatoryparams
560 advpar = self.advisoryparams
565 advpar = self.advisoryparams
561 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
566 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
562 # size
567 # size
563 parsizes = []
568 parsizes = []
564 for key, value in manpar:
569 for key, value in manpar:
565 parsizes.append(len(key))
570 parsizes.append(len(key))
566 parsizes.append(len(value))
571 parsizes.append(len(value))
567 for key, value in advpar:
572 for key, value in advpar:
568 parsizes.append(len(key))
573 parsizes.append(len(key))
569 parsizes.append(len(value))
574 parsizes.append(len(value))
570 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
575 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
571 header.append(paramsizes)
576 header.append(paramsizes)
572 # key, value
577 # key, value
573 for key, value in manpar:
578 for key, value in manpar:
574 header.append(key)
579 header.append(key)
575 header.append(value)
580 header.append(value)
576 for key, value in advpar:
581 for key, value in advpar:
577 header.append(key)
582 header.append(key)
578 header.append(value)
583 header.append(value)
579 ## finalize header
584 ## finalize header
580 headerchunk = ''.join(header)
585 headerchunk = ''.join(header)
581 yield _pack(_fpartheadersize, len(headerchunk))
586 yield _pack(_fpartheadersize, len(headerchunk))
582 yield headerchunk
587 yield headerchunk
583 ## payload
588 ## payload
584 for chunk in self._payloadchunks():
589 for chunk in self._payloadchunks():
585 yield _pack(_fpayloadsize, len(chunk))
590 yield _pack(_fpayloadsize, len(chunk))
586 yield chunk
591 yield chunk
587 # end of payload
592 # end of payload
588 yield _pack(_fpayloadsize, 0)
593 yield _pack(_fpayloadsize, 0)
589
594
590 def _payloadchunks(self):
595 def _payloadchunks(self):
591 """yield chunks of the part payload
596 """yield chunks of the part payload
592
597
593 Exists to handle the different methods to provide data to a part."""
598 Exists to handle the different methods to provide data to a part."""
594 # we only support fixed size data now.
599 # we only support fixed size data now.
595 # This will be improved in the future.
600 # This will be improved in the future.
596 if util.safehasattr(self.data, 'next'):
601 if util.safehasattr(self.data, 'next'):
597 buff = util.chunkbuffer(self.data)
602 buff = util.chunkbuffer(self.data)
598 chunk = buff.read(preferedchunksize)
603 chunk = buff.read(preferedchunksize)
599 while chunk:
604 while chunk:
600 yield chunk
605 yield chunk
601 chunk = buff.read(preferedchunksize)
606 chunk = buff.read(preferedchunksize)
602 elif len(self.data):
607 elif len(self.data):
603 yield self.data
608 yield self.data
604
609
605 class unbundlepart(unpackermixin):
610 class unbundlepart(unpackermixin):
606 """a bundle part read from a bundle"""
611 """a bundle part read from a bundle"""
607
612
608 def __init__(self, ui, header, fp):
613 def __init__(self, ui, header, fp):
609 super(unbundlepart, self).__init__(fp)
614 super(unbundlepart, self).__init__(fp)
610 self.ui = ui
615 self.ui = ui
611 # unbundle state attr
616 # unbundle state attr
612 self._headerdata = header
617 self._headerdata = header
613 self._headeroffset = 0
618 self._headeroffset = 0
614 self._initialized = False
619 self._initialized = False
615 self.consumed = False
620 self.consumed = False
616 # part data
621 # part data
617 self.id = None
622 self.id = None
618 self.type = None
623 self.type = None
619 self.mandatoryparams = None
624 self.mandatoryparams = None
620 self.advisoryparams = None
625 self.advisoryparams = None
621 self._payloadstream = None
626 self._payloadstream = None
622 self._readheader()
627 self._readheader()
623
628
624 def _fromheader(self, size):
629 def _fromheader(self, size):
625 """return the next <size> bytes from the header"""
630 """return the next <size> bytes from the header"""
626 offset = self._headeroffset
631 offset = self._headeroffset
627 data = self._headerdata[offset:(offset + size)]
632 data = self._headerdata[offset:(offset + size)]
628 self._headeroffset = offset + size
633 self._headeroffset = offset + size
629 return data
634 return data
630
635
631 def _unpackheader(self, format):
636 def _unpackheader(self, format):
632 """read given format from header
637 """read given format from header
633
638
634 This automatically computes the size of the format to read."""
639 This automatically computes the size of the format to read."""
635 data = self._fromheader(struct.calcsize(format))
640 data = self._fromheader(struct.calcsize(format))
636 return _unpack(format, data)
641 return _unpack(format, data)
637
642
638 def _readheader(self):
643 def _readheader(self):
639 """read the header and setup the object"""
644 """read the header and setup the object"""
640 typesize = self._unpackheader(_fparttypesize)[0]
645 typesize = self._unpackheader(_fparttypesize)[0]
641 self.type = self._fromheader(typesize)
646 self.type = self._fromheader(typesize)
642 self.ui.debug('part type: "%s"\n' % self.type)
647 self.ui.debug('part type: "%s"\n' % self.type)
643 self.id = self._unpackheader(_fpartid)[0]
648 self.id = self._unpackheader(_fpartid)[0]
644 self.ui.debug('part id: "%s"\n' % self.id)
649 self.ui.debug('part id: "%s"\n' % self.id)
645 ## reading parameters
650 ## reading parameters
646 # param count
651 # param count
647 mancount, advcount = self._unpackheader(_fpartparamcount)
652 mancount, advcount = self._unpackheader(_fpartparamcount)
648 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
653 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
649 # param size
654 # param size
650 fparamsizes = _makefpartparamsizes(mancount + advcount)
655 fparamsizes = _makefpartparamsizes(mancount + advcount)
651 paramsizes = self._unpackheader(fparamsizes)
656 paramsizes = self._unpackheader(fparamsizes)
652 # make it a list of pairs again
657 # make it a list of pairs again
653 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
658 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
654 # split mandatory from advisory
659 # split mandatory from advisory
655 mansizes = paramsizes[:mancount]
660 mansizes = paramsizes[:mancount]
656 advsizes = paramsizes[mancount:]
661 advsizes = paramsizes[mancount:]
657 # retrieve param values
662 # retrieve param values
658 manparams = []
663 manparams = []
659 for key, value in mansizes:
664 for key, value in mansizes:
660 manparams.append((self._fromheader(key), self._fromheader(value)))
665 manparams.append((self._fromheader(key), self._fromheader(value)))
661 advparams = []
666 advparams = []
662 for key, value in advsizes:
667 for key, value in advsizes:
663 advparams.append((self._fromheader(key), self._fromheader(value)))
668 advparams.append((self._fromheader(key), self._fromheader(value)))
664 self.mandatoryparams = manparams
669 self.mandatoryparams = manparams
665 self.advisoryparams = advparams
670 self.advisoryparams = advparams
666 ## part payload
671 ## part payload
667 def payloadchunks():
672 def payloadchunks():
668 payloadsize = self._unpack(_fpayloadsize)[0]
673 payloadsize = self._unpack(_fpayloadsize)[0]
669 self.ui.debug('payload chunk size: %i\n' % payloadsize)
674 self.ui.debug('payload chunk size: %i\n' % payloadsize)
670 while payloadsize:
675 while payloadsize:
671 yield self._readexact(payloadsize)
676 yield self._readexact(payloadsize)
672 payloadsize = self._unpack(_fpayloadsize)[0]
677 payloadsize = self._unpack(_fpayloadsize)[0]
673 self.ui.debug('payload chunk size: %i\n' % payloadsize)
678 self.ui.debug('payload chunk size: %i\n' % payloadsize)
674 self._payloadstream = util.chunkbuffer(payloadchunks())
679 self._payloadstream = util.chunkbuffer(payloadchunks())
675 # we read the data, tell it
680 # we read the data, tell it
676 self._initialized = True
681 self._initialized = True
677
682
678 def read(self, size=None):
683 def read(self, size=None):
679 """read payload data"""
684 """read payload data"""
680 if not self._initialized:
685 if not self._initialized:
681 self._readheader()
686 self._readheader()
682 if size is None:
687 if size is None:
683 data = self._payloadstream.read()
688 data = self._payloadstream.read()
684 else:
689 else:
685 data = self._payloadstream.read(size)
690 data = self._payloadstream.read(size)
686 if size is None or len(data) < size:
691 if size is None or len(data) < size:
687 self.consumed = True
692 self.consumed = True
688 return data
693 return data
689
694
690
695
691 @parthandler('b2x:changegroup')
696 @parthandler('b2x:changegroup')
692 def handlechangegroup(op, inpart):
697 def handlechangegroup(op, inpart):
693 """apply a changegroup part on the repo
698 """apply a changegroup part on the repo
694
699
695 This is a very early implementation that will need massive rework before
700 This is a very early implementation that will need massive rework before
696 being inflicted on any end-user.
701 being inflicted on any end-user.
697 """
702 """
698 # Make sure we trigger a transaction creation
703 # Make sure we trigger a transaction creation
699 #
704 #
700 # The addchangegroup function will get a transaction object by itself, but
705 # The addchangegroup function will get a transaction object by itself, but
701 # we need to make sure we trigger the creation of a transaction object used
706 # we need to make sure we trigger the creation of a transaction object used
702 # for the whole processing scope.
707 # for the whole processing scope.
703 op.gettransaction()
708 op.gettransaction()
704 cg = changegroup.unbundle10(inpart, 'UN')
709 cg = changegroup.unbundle10(inpart, 'UN')
705 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
710 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
706 op.records.add('changegroup', {'return': ret})
711 op.records.add('changegroup', {'return': ret})
707 if op.reply is not None:
712 if op.reply is not None:
708 # This is definitely not the final form of this
713 # This is definitely not the final form of this
709 # return. But one needs to start somewhere.
714 # return. But one needs to start somewhere.
710 part = bundlepart('b2x:reply:changegroup', (),
715 part = bundlepart('b2x:reply:changegroup', (),
711 [('in-reply-to', str(inpart.id)),
716 [('in-reply-to', str(inpart.id)),
712 ('return', '%i' % ret)])
717 ('return', '%i' % ret)])
713 op.reply.addpart(part)
718 op.reply.addpart(part)
714 assert not inpart.read()
719 assert not inpart.read()
715
720
716 @parthandler('b2x:reply:changegroup')
721 @parthandler('b2x:reply:changegroup')
717 def handlechangegroup(op, inpart):
722 def handlechangegroup(op, inpart):
718 p = dict(inpart.advisoryparams)
723 p = dict(inpart.advisoryparams)
719 ret = int(p['return'])
724 ret = int(p['return'])
720 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
725 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
721
726
722 @parthandler('b2x:check:heads')
727 @parthandler('b2x:check:heads')
723 def handlechangegroup(op, inpart):
728 def handlechangegroup(op, inpart):
724 """check that heads of the repo did not change
729 """check that heads of the repo did not change
725
730
726 This is used to detect a push race when using unbundle.
731 This is used to detect a push race when using unbundle.
727 This replaces the "heads" argument of unbundle."""
732 This replaces the "heads" argument of unbundle."""
728 h = inpart.read(20)
733 h = inpart.read(20)
729 heads = []
734 heads = []
730 while len(h) == 20:
735 while len(h) == 20:
731 heads.append(h)
736 heads.append(h)
732 h = inpart.read(20)
737 h = inpart.read(20)
733 assert not h
738 assert not h
734 if heads != op.repo.heads():
739 if heads != op.repo.heads():
735 raise error.PushRaced('repository changed while pushing - '
740 raise error.PushRaced('repository changed while pushing - '
736 'please try again')
741 'please try again')
737
742
738 @parthandler('b2x:output')
743 @parthandler('b2x:output')
739 def handleoutput(op, inpart):
744 def handleoutput(op, inpart):
740 """forward output captured on the server to the client"""
745 """forward output captured on the server to the client"""
741 for line in inpart.read().splitlines():
746 for line in inpart.read().splitlines():
742 op.ui.write(('remote: %s\n' % line))
747 op.ui.write(('remote: %s\n' % line))
743
748
744 @parthandler('b2x:replycaps')
749 @parthandler('b2x:replycaps')
745 def handlereplycaps(op, inpart):
750 def handlereplycaps(op, inpart):
746 """Notify that a reply bundle should be created
751 """Notify that a reply bundle should be created
747
752
748 The payload contains the capabilities information for the reply"""
753 The payload contains the capabilities information for the reply"""
749 caps = decodecaps(inpart.read())
754 caps = decodecaps(inpart.read())
750 if op.reply is None:
755 if op.reply is None:
751 op.reply = bundle20(op.ui, caps)
756 op.reply = bundle20(op.ui, caps)
752
757
753 @parthandler('b2x:error:abort')
758 @parthandler('b2x:error:abort')
754 def handlereplycaps(op, inpart):
759 def handlereplycaps(op, inpart):
755 """Used to transmit abort error over the wire"""
760 """Used to transmit abort error over the wire"""
756 manargs = dict(inpart.mandatoryparams)
761 manargs = dict(inpart.mandatoryparams)
757 advargs = dict(inpart.advisoryparams)
762 advargs = dict(inpart.advisoryparams)
758 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
763 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
759
764
760 @parthandler('b2x:error:unknownpart')
765 @parthandler('b2x:error:unknownpart')
761 def handlereplycaps(op, inpart):
766 def handlereplycaps(op, inpart):
762 """Used to transmit unknown part error over the wire"""
767 """Used to transmit unknown part error over the wire"""
763 manargs = dict(inpart.mandatoryparams)
768 manargs = dict(inpart.mandatoryparams)
764 raise UnknownPartError(manargs['parttype'])
769 raise UnknownPartError(manargs['parttype'])
765
770
766 @parthandler('b2x:error:pushraced')
771 @parthandler('b2x:error:pushraced')
767 def handlereplycaps(op, inpart):
772 def handlereplycaps(op, inpart):
768 """Used to transmit push race error over the wire"""
773 """Used to transmit push race error over the wire"""
769 manargs = dict(inpart.mandatoryparams)
774 manargs = dict(inpart.mandatoryparams)
770 raise error.ResponseError(_('push failed:'), manargs['message'])
775 raise error.ResponseError(_('push failed:'), manargs['message'])
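
Reviewer note: the payload framing used by `getchunks` and `payloadchunks` above (each chunk preceded by its size, with a zero size closing the payload) can be illustrated in isolation. The following is only a minimal sketch written for Python 3, not the code from this changeset. `PAYLOAD_SIZE_FORMAT`, `frame_payload` and `read_payload` are hypothetical names, and the 4-byte prefix is an assumption, since the definition of `_fpayloadsize` is not part of the lines shown here.

```python
import io
import struct

# Assumption: a 4-byte unsigned big-endian size prefix. The real value of
# _fpayloadsize is defined outside the excerpt shown in this diff.
PAYLOAD_SIZE_FORMAT = '>I'


def frame_payload(chunks):
    """Yield size-prefixed chunks followed by a zero-size end marker."""
    for chunk in chunks:
        if chunk:  # an empty chunk would be mistaken for the end marker
            yield struct.pack(PAYLOAD_SIZE_FORMAT, len(chunk))
            yield chunk
    yield struct.pack(PAYLOAD_SIZE_FORMAT, 0)


def read_payload(fp):
    """Yield payload chunks from fp until the zero-size marker is read."""
    prefix_len = struct.calcsize(PAYLOAD_SIZE_FORMAT)
    while True:
        prefix = fp.read(prefix_len)
        if len(prefix) != prefix_len:
            raise ValueError('truncated stream')
        size = struct.unpack(PAYLOAD_SIZE_FORMAT, prefix)[0]
        if size == 0:
            return
        data = fp.read(size)
        if len(data) != size:
            raise ValueError('truncated stream')
        yield data


# Round trip: frame two chunks, then read them back from a stream that
# continues with unrelated data after the end marker.
wire = b''.join(frame_payload([b'hello ', b'world']))
stream = io.BytesIO(wire + b'next part')
payload = b''.join(read_payload(stream))
remaining = stream.read()
```

Because the reader stops exactly at the zero-size marker, whatever follows in the stream (here the bytes `next part`) is left untouched for the next consumer, which is what lets `iterparts` move on to the following part header.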