bundle2: use a more specific UnknownPartError when no handler is found...
Pierre-Yves David -
r21179:372f4772 stable
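Review note: because the new exception subclasses `KeyError`, callers that already catch `KeyError` should keep working, while newer callers can catch the narrower type. The sketch below illustrates this under simplifying assumptions. The `handlers` registry, `lookup` and `describe` are hypothetical stand-ins, not the module's real `processbundle` code path.

```python
class UnknownPartError(KeyError):
    """Raised when no handler is found for a mandatory part."""


# Hypothetical stand-in for the module-level handler registry.
handlers = {'known:part': lambda: 'handled'}


def lookup(parttype):
    """Return the handler for parttype, or None for an unknown advisory part."""
    key = parttype.lower()  # handlers are matched case-insensitively
    try:
        return handlers[key]
    except KeyError:
        if key != parttype:  # any uppercase character marks a mandatory part
            raise UnknownPartError(key)
        return None


def describe(parttype):
    """Summarise what would happen to a part of the given type."""
    try:
        handler = lookup(parttype)
    except UnknownPartError as exc:
        return 'unknown mandatory part: %s' % exc.args[0]
    return 'ignored' if handler is None else handler()
```

An `except KeyError` clause in `describe` would catch the same error, which is what keeps the change backward compatible.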
@@ -1,751 +1,755 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is structured as follows
17 The format is structured as follows
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follows
32 Binary format is as follows
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are forbidden.
46 Empty names are forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follows
65 Binary format is as follows
66
66
67 :header size: (16 bits integer)
67 :header size: (16 bits integer)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows
84 The binary format of the header is as follows
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couples of bytes, where N is the total number of parameters. Each
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as many as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as many as
121 `chunksize` says). The payload part is concluded by a zero size chunk.
121 `chunksize` says). The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handlers are registered
129 Each part is processed in order using a "part handler". Handlers are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the parts read after an abort are processed. In the
138 channel usable. But none of the parts read after an abort are processed. In the
139 future, dropping the stream may become an option for channels we do not care to
139 future, dropping the stream may become an option for channels we do not care to
140 preserve.
140 preserve.
141 """
141 """
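Review note: the overall layout described in the docstring (magic string, stream level parameters, parts, end of stream marker) can be illustrated with the smallest possible stream, one that carries no parts. This is a minimal sketch written for Python 3 bytes. `emptybundle` is a hypothetical helper, not part of the module.

```python
import struct

MAGIC = b'HG2X'  # magic string used by this version of the module


def emptybundle(params=b''):
    """Return the bytes of a bundle2 stream that carries no parts."""
    # 16-bit big-endian size of the stream level parameter blob
    size = struct.pack('>H', len(params))
    # a part header size of zero is the end of stream marker
    endmarker = struct.pack('>H', 0)
    return MAGIC + size + params + endmarker
```

With no parameters the whole stream is eight bytes: the magic string followed by two zero-valued 16-bit fields.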
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup
148 import changegroup
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG2X'
154 _magicstring = 'HG2X'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
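Review note: the payload framing from the docstring (`<chunksize><chunkdata>` repeated, closed by a zero size chunk) might be sketched as follows. The helper names are hypothetical. The docstring states that the current implementation emits at most one chunk, so splitting the data into several chunks here is only an illustration of what the format allows.

```python
import struct


def framepayload(data, chunksize=4096):
    """Frame data as <chunksize><chunkdata> pairs plus a zero size chunk."""
    out = []
    for start in range(0, len(data), chunksize):
        chunk = data[start:start + chunksize]
        out.append(struct.pack('>I', len(chunk)))  # 32-bit big-endian size
        out.append(chunk)
    out.append(struct.pack('>I', 0))  # zero size chunk concludes the payload
    return b''.join(out)


def unframepayload(blob):
    """Reassemble the payload bytes from a framed blob."""
    chunks = []
    offset = 0
    while True:
        (size,) = struct.unpack_from('>I', blob, offset)
        offset += 4
        if size == 0:
            return b''.join(chunks)
        chunks.append(blob[offset:offset + size])
        offset += size
```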
164
164
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number of parameters is variable so we need to build that format
168 The number of parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
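Review note: the dynamically built size format fits into the part parameter block `<mandatory-count><advisory-count><param-sizes><param-data>` from the docstring. The sketch below shows one way that block could be encoded and decoded. `encodepartparams` and `decodepartparams` are hypothetical names, and the real part classes are not shown in this section.

```python
import struct


def encodepartparams(mandatory, advisory):
    """Serialize (key, value) byte pairs, mandatory parameters first."""
    allparams = list(mandatory) + list(advisory)
    header = struct.pack('>BB', len(mandatory), len(advisory))
    sizes = []
    for key, value in allparams:
        sizes.extend((len(key), len(value)))
    sizefmt = '>' + 'BB' * len(allparams)  # same shape as _makefpartparamsizes
    data = b''.join(key + value for key, value in allparams)
    return header + struct.pack(sizefmt, *sizes) + data


def decodepartparams(blob):
    """Return (mandatory, advisory) lists of (key, value) byte pairs."""
    mancount, advcount = struct.unpack_from('>BB', blob, 0)
    total = mancount + advcount
    sizefmt = '>' + 'BB' * total
    sizes = struct.unpack_from(sizefmt, blob, 2)
    offset = 2 + struct.calcsize(sizefmt)
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    return params[:mancount], params[mancount:]
```

Each size is a single byte, so keys and values are limited to 255 bytes in this layout.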
172
172
173 class UnknownPartError(KeyError):
174 """error raised when no handler is found for a Mandatory part"""
175 pass
176
173 parthandlermapping = {}
177 parthandlermapping = {}
174
178
175 def parthandler(parttype):
179 def parthandler(parttype):
176 """decorator that registers a function as a bundle2 part handler
180 """decorator that registers a function as a bundle2 part handler
177
181
178 eg::
182 eg::
179
183
180 @parthandler('myparttype')
184 @parthandler('myparttype')
181 def myparttypehandler(...):
185 def myparttypehandler(...):
182 '''process a part of type "my part".'''
186 '''process a part of type "my part".'''
183 ...
187 ...
184 """
188 """
185 def _decorator(func):
189 def _decorator(func):
186 lparttype = parttype.lower() # enforce lower case matching.
190 lparttype = parttype.lower() # enforce lower case matching.
187 assert lparttype not in parthandlermapping
191 assert lparttype not in parthandlermapping
188 parthandlermapping[lparttype] = func
192 parthandlermapping[lparttype] = func
189 return func
193 return func
190 return _decorator
194 return _decorator
191
195
192 class unbundlerecords(object):
196 class unbundlerecords(object):
193 """keep record of what happens during an unbundle
197 """keep record of what happens during an unbundle
194
198
195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
196 category of record and obj is an arbitrary object.
200 category of record and obj is an arbitrary object.
197
201
198 `records['cat']` will return all entries of this category 'cat'.
202 `records['cat']` will return all entries of this category 'cat'.
199
203
200 Iterating on the object itself will yield `('category', obj)` tuples
204 Iterating on the object itself will yield `('category', obj)` tuples
201 for all entries.
205 for all entries.
202
206
203 All iterations happen in chronological order.
207 All iterations happen in chronological order.
204 """
208 """
205
209
206 def __init__(self):
210 def __init__(self):
207 self._categories = {}
211 self._categories = {}
208 self._sequences = []
212 self._sequences = []
209 self._replies = {}
213 self._replies = {}
210
214
211 def add(self, category, entry, inreplyto=None):
215 def add(self, category, entry, inreplyto=None):
212 """add a new record of a given category.
216 """add a new record of a given category.
213
217
214 The entry can then be retrieved in the list returned by
218 The entry can then be retrieved in the list returned by
215 self['category']."""
219 self['category']."""
216 self._categories.setdefault(category, []).append(entry)
220 self._categories.setdefault(category, []).append(entry)
217 self._sequences.append((category, entry))
221 self._sequences.append((category, entry))
218 if inreplyto is not None:
222 if inreplyto is not None:
219 self.getreplies(inreplyto).add(category, entry)
223 self.getreplies(inreplyto).add(category, entry)
220
224
221 def getreplies(self, partid):
225 def getreplies(self, partid):
222 """get the subrecords that reply to a specific part"""
226 """get the subrecords that reply to a specific part"""
223 return self._replies.setdefault(partid, unbundlerecords())
227 return self._replies.setdefault(partid, unbundlerecords())
224
228
225 def __getitem__(self, cat):
229 def __getitem__(self, cat):
226 return tuple(self._categories.get(cat, ()))
230 return tuple(self._categories.get(cat, ()))
227
231
228 def __iter__(self):
232 def __iter__(self):
229 return iter(self._sequences)
233 return iter(self._sequences)
230
234
231 def __len__(self):
235 def __len__(self):
232 return len(self._sequences)
236 return len(self._sequences)
233
237
234 def __nonzero__(self):
238 def __nonzero__(self):
235 return bool(self._sequences)
239 return bool(self._sequences)
236
240
237 class bundleoperation(object):
241 class bundleoperation(object):
238 """an object that represents a single bundling process
242 """an object that represents a single bundling process
239
243
240 Its purpose is to carry unbundle-related objects and states.
244 Its purpose is to carry unbundle-related objects and states.
241
245
242 A new object should be created at the beginning of each bundle processing.
246 A new object should be created at the beginning of each bundle processing.
243 The object is to be returned by the processing function.
247 The object is to be returned by the processing function.
244
248
245 The object has very little content now; it will ultimately contain:
249 The object has very little content now; it will ultimately contain:
246 * an access to the repo the bundle is applied to,
250 * an access to the repo the bundle is applied to,
247 * a ui object,
251 * a ui object,
248 * a way to retrieve a transaction to add changes to the repo,
252 * a way to retrieve a transaction to add changes to the repo,
249 * a way to record the result of processing each part,
253 * a way to record the result of processing each part,
250 * a way to construct a bundle response when applicable.
254 * a way to construct a bundle response when applicable.
251 """
255 """
252
256
253 def __init__(self, repo, transactiongetter):
257 def __init__(self, repo, transactiongetter):
254 self.repo = repo
258 self.repo = repo
255 self.ui = repo.ui
259 self.ui = repo.ui
256 self.records = unbundlerecords()
260 self.records = unbundlerecords()
257 self.gettransaction = transactiongetter
261 self.gettransaction = transactiongetter
258 self.reply = None
262 self.reply = None
259
263
260 class TransactionUnavailable(RuntimeError):
264 class TransactionUnavailable(RuntimeError):
261 pass
265 pass
262
266
263 def _notransaction():
267 def _notransaction():
264 """default method to get a transaction while processing a bundle
268 """default method to get a transaction while processing a bundle
265
269
266 Raise an exception to highlight the fact that no transaction was expected
270 Raise an exception to highlight the fact that no transaction was expected
267 to be created"""
271 to be created"""
268 raise TransactionUnavailable()
272 raise TransactionUnavailable()
269
273
270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 """This function processes a bundle, applying its effects to/from a repo
275 """This function processes a bundle, applying its effects to/from a repo
272
276
273 It iterates over each part then searches for and uses the proper handling
277 It iterates over each part then searches for and uses the proper handling
274 code to process the part. Parts are processed in order.
278 code to process the part. Parts are processed in order.
275
279
276 This is a very early version of this function that will be strongly reworked
280 This is a very early version of this function that will be strongly reworked
277 before final usage.
281 before final usage.
278
282
279 An unknown mandatory part will abort the process.
283 An unknown mandatory part will abort the process.
280 """
284 """
281 op = bundleoperation(repo, transactiongetter)
285 op = bundleoperation(repo, transactiongetter)
282 # todo:
286 # todo:
283 # - replace this with an init function soon.
287 # - replace this with an init function soon.
284 # - exception catching
288 # - exception catching
285 unbundler.params
289 unbundler.params
286 iterparts = unbundler.iterparts()
290 iterparts = unbundler.iterparts()
287 part = None
291 part = None
288 try:
292 try:
289 for part in iterparts:
293 for part in iterparts:
290 parttype = part.type
294 parttype = part.type
291 # part keys are matched in lower case
295 # part keys are matched in lower case
292 key = parttype.lower()
296 key = parttype.lower()
293 try:
297 try:
294 handler = parthandlermapping[key]
298 handler = parthandlermapping[key]
295 op.ui.debug('found a handler for part %r\n' % parttype)
299 op.ui.debug('found a handler for part %r\n' % parttype)
296 except KeyError:
300 except KeyError:
297 if key != parttype: # mandatory parts
301 if key != parttype: # mandatory parts
298 # todo:
302 # todo:
299 # - use a more precise exception
303 # - use a more precise exception
300 raise
304 raise UnknownPartError(key)
301 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
302 # consuming the part
306 # consuming the part
303 part.read()
307 part.read()
304 continue
308 continue
305
309
306 # handler is called outside the above try block so that we don't
310 # handler is called outside the above try block so that we don't
307 # risk catching KeyErrors from anything other than the
311 # risk catching KeyErrors from anything other than the
308 # parthandlermapping lookup (any KeyError raised by handler()
312 # parthandlermapping lookup (any KeyError raised by handler()
309 # itself represents a defect of a different variety).
313 # itself represents a defect of a different variety).
310 output = None
314 output = None
311 if op.reply is not None:
315 if op.reply is not None:
312 op.ui.pushbuffer(error=True)
316 op.ui.pushbuffer(error=True)
313 output = ''
317 output = ''
314 try:
318 try:
315 handler(op, part)
319 handler(op, part)
316 finally:
320 finally:
317 if output is not None:
321 if output is not None:
318 output = op.ui.popbuffer()
322 output = op.ui.popbuffer()
319 if output:
323 if output:
320 outpart = bundlepart('b2x:output',
324 outpart = bundlepart('b2x:output',
321 advisoryparams=[('in-reply-to',
325 advisoryparams=[('in-reply-to',
322 str(part.id))],
326 str(part.id))],
323 data=output)
327 data=output)
324 op.reply.addpart(outpart)
328 op.reply.addpart(outpart)
325 part.read()
329 part.read()
326 except Exception, exc:
330 except Exception, exc:
327 if part is not None:
331 if part is not None:
328 # consume the bundle content
332 # consume the bundle content
329 part.read()
333 part.read()
330 for part in iterparts:
334 for part in iterparts:
331 # consume the bundle content
335 # consume the bundle content
332 part.read()
336 part.read()
333 # Small hack to let caller code distinguish exceptions from bundle2
337 # Small hack to let caller code distinguish exceptions from bundle2
334 # processing from the ones from bundle1 processing. This is mostly
338 # processing from the ones from bundle1 processing. This is mostly
335 # needed to handle different return codes to unbundle according to the
339 # needed to handle different return codes to unbundle according to the
336 # type of bundle. We should probably clean up or drop this return code
340 # type of bundle. We should probably clean up or drop this return code
337 # craziness in a future version.
341 # craziness in a future version.
338 exc.duringunbundle2 = True
342 exc.duringunbundle2 = True
339 raise
343 raise
340 return op
344 return op
341
345
342 def decodecaps(blob):
346 def decodecaps(blob):
343 """decode a bundle2 caps bytes blob into a dictionary
347 """decode a bundle2 caps bytes blob into a dictionary
344
348
345 The blob is a list of capabilities (one per line)
349 The blob is a list of capabilities (one per line)
346 Capabilities may have values using a line of the form::
350 Capabilities may have values using a line of the form::
347
351
348 capability=value1,value2,value3
352 capability=value1,value2,value3
349
353
350 The values are always a list."""
354 The values are always a list."""
351 caps = {}
355 caps = {}
352 for line in blob.splitlines():
356 for line in blob.splitlines():
353 if not line:
357 if not line:
354 continue
358 continue
355 if '=' not in line:
359 if '=' not in line:
356 key, vals = line, ()
360 key, vals = line, ()
357 else:
361 else:
358 key, vals = line.split('=', 1)
362 key, vals = line.split('=', 1)
359 vals = vals.split(',')
363 vals = vals.split(',')
360 key = urllib.unquote(key)
364 key = urllib.unquote(key)
361 vals = [urllib.unquote(v) for v in vals]
365 vals = [urllib.unquote(v) for v in vals]
362 caps[key] = vals
366 caps[key] = vals
363 return caps
367 return caps
364
368
365 def encodecaps(caps):
369 def encodecaps(caps):
366 """encode a bundle2 caps dictionary into a bytes blob"""
370 """encode a bundle2 caps dictionary into a bytes blob"""
367 chunks = []
371 chunks = []
368 for ca in sorted(caps):
372 for ca in sorted(caps):
369 vals = caps[ca]
373 vals = caps[ca]
370 ca = urllib.quote(ca)
374 ca = urllib.quote(ca)
371 vals = [urllib.quote(v) for v in vals]
375 vals = [urllib.quote(v) for v in vals]
372 if vals:
376 if vals:
373 ca = "%s=%s" % (ca, ','.join(vals))
377 ca = "%s=%s" % (ca, ','.join(vals))
374 chunks.append(ca)
378 chunks.append(ca)
375 return '\n'.join(chunks)
379 return '\n'.join(chunks)
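Review note: the two capability helpers are meant to be inverses of each other. The sketch below ports them to Python 3, using `urllib.parse.quote` and `unquote` in place of the Python 2 `urllib` functions (an assumption of this example), to show a round trip. The capability names in the usage are made up for illustration.

```python
from urllib.parse import quote, unquote


def encodecaps(caps):
    """Encode a caps dictionary into a newline separated text blob."""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)


def decodecaps(blob):
    """Decode a caps blob; values always come back as a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Because values are quoted before being joined, a comma or an equals sign inside a value survives the round trip.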
376
380
377 class bundle20(object):
381 class bundle20(object):
378 """represent an outgoing bundle2 container
382 """represent an outgoing bundle2 container
379
383
380 Use the `addparam` method to add stream level parameters, and `addpart` to
384 Use the `addparam` method to add stream level parameters, and `addpart` to
381 populate it. Then call `getchunks` to retrieve all the binary chunks of
385 populate it. Then call `getchunks` to retrieve all the binary chunks of
382 data that compose the bundle2 container."""
386 data that compose the bundle2 container."""
383
387
384 def __init__(self, ui, capabilities=()):
388 def __init__(self, ui, capabilities=()):
385 self.ui = ui
389 self.ui = ui
386 self._params = []
390 self._params = []
387 self._parts = []
391 self._parts = []
388 self.capabilities = dict(capabilities)
392 self.capabilities = dict(capabilities)
389
393
390 def addparam(self, name, value=None):
394 def addparam(self, name, value=None):
391 """add a stream level parameter"""
395 """add a stream level parameter"""
392 if not name:
396 if not name:
393 raise ValueError('empty parameter name')
397 raise ValueError('empty parameter name')
394 if name[0] not in string.letters:
398 if name[0] not in string.letters:
395 raise ValueError('non letter first character: %r' % name)
399 raise ValueError('non letter first character: %r' % name)
396 self._params.append((name, value))
400 self._params.append((name, value))
397
401
398 def addpart(self, part):
402 def addpart(self, part):
399 """add a new part to the bundle2 container
403 """add a new part to the bundle2 container
400
404
401 Parts contain the actual applicative payload."""
405 Parts contain the actual applicative payload."""
402 assert part.id is None
406 assert part.id is None
403 part.id = len(self._parts) # very cheap counter
407 part.id = len(self._parts) # very cheap counter
404 self._parts.append(part)
408 self._parts.append(part)
405
409
406 def getchunks(self):
410 def getchunks(self):
407 self.ui.debug('start emission of %s stream\n' % _magicstring)
411 self.ui.debug('start emission of %s stream\n' % _magicstring)
408 yield _magicstring
412 yield _magicstring
409 param = self._paramchunk()
413 param = self._paramchunk()
410 self.ui.debug('bundle parameter: %s\n' % param)
414 self.ui.debug('bundle parameter: %s\n' % param)
411 yield _pack(_fstreamparamsize, len(param))
415 yield _pack(_fstreamparamsize, len(param))
412 if param:
416 if param:
413 yield param
417 yield param
414
418
415 self.ui.debug('start of parts\n')
419 self.ui.debug('start of parts\n')
416 for part in self._parts:
420 for part in self._parts:
417 self.ui.debug('bundle part: "%s"\n' % part.type)
421 self.ui.debug('bundle part: "%s"\n' % part.type)
418 for chunk in part.getchunks():
422 for chunk in part.getchunks():
419 yield chunk
423 yield chunk
420 self.ui.debug('end of bundle\n')
424 self.ui.debug('end of bundle\n')
421 yield '\0\0'
425 yield '\0\0'
422
426
423 def _paramchunk(self):
427 def _paramchunk(self):
424 """return an encoded version of all stream parameters"""
428 """return an encoded version of all stream parameters"""
425 blocks = []
429 blocks = []
426 for par, value in self._params:
430 for par, value in self._params:
427 par = urllib.quote(par)
431 par = urllib.quote(par)
428 if value is not None:
432 if value is not None:
429 value = urllib.quote(value)
433 value = urllib.quote(value)
430 par = '%s=%s' % (par, value)
434 par = '%s=%s' % (par, value)
431 blocks.append(par)
435 blocks.append(par)
432 return ' '.join(blocks)
436 return ' '.join(blocks)
433
437
434 class unpackermixin(object):
438 class unpackermixin(object):
435 """A mixin to extract bytes and struct data from a stream"""
439 """A mixin to extract bytes and struct data from a stream"""
436
440
437 def __init__(self, fp):
441 def __init__(self, fp):
438 self._fp = fp
442 self._fp = fp
439
443
440 def _unpack(self, format):
444 def _unpack(self, format):
441 """unpack this struct format from the stream"""
445 """unpack this struct format from the stream"""
442 data = self._readexact(struct.calcsize(format))
446 data = self._readexact(struct.calcsize(format))
443 return _unpack(format, data)
447 return _unpack(format, data)
444
448
445 def _readexact(self, size):
449 def _readexact(self, size):
446 """read exactly <size> bytes from the stream"""
450 """read exactly <size> bytes from the stream"""
447 return changegroup.readexactly(self._fp, size)
451 return changegroup.readexactly(self._fp, size)
448
452
449
453
450 class unbundle20(unpackermixin):
454 class unbundle20(unpackermixin):
451 """interpret a bundle2 stream
455 """interpret a bundle2 stream
452
456
453 This class is fed with a binary stream and yields parts through its
457 This class is fed with a binary stream and yields parts through its
454 `iterparts` method."""
458 `iterparts` method."""
455
459
456 def __init__(self, ui, fp, header=None):
460 def __init__(self, ui, fp, header=None):
457 """If header is specified, we do not read it out of the stream."""
461 """If header is specified, we do not read it out of the stream."""
458 self.ui = ui
462 self.ui = ui
459 super(unbundle20, self).__init__(fp)
463 super(unbundle20, self).__init__(fp)
460 if header is None:
464 if header is None:
461 header = self._readexact(4)
465 header = self._readexact(4)
462 magic, version = header[0:2], header[2:4]
466 magic, version = header[0:2], header[2:4]
463 if magic != 'HG':
467 if magic != 'HG':
464 raise util.Abort(_('not a Mercurial bundle'))
468 raise util.Abort(_('not a Mercurial bundle'))
465 if version != '2X':
469 if version != '2X':
466 raise util.Abort(_('unknown bundle version %s') % version)
470 raise util.Abort(_('unknown bundle version %s') % version)
467 self.ui.debug('start processing of %s stream\n' % header)
471 self.ui.debug('start processing of %s stream\n' % header)
468
472
469 @util.propertycache
473 @util.propertycache
470 def params(self):
474 def params(self):
471 """dictionary of stream level parameters"""
475 """dictionary of stream level parameters"""
472 self.ui.debug('reading bundle2 stream parameters\n')
476 self.ui.debug('reading bundle2 stream parameters\n')
473 params = {}
477 params = {}
474 paramssize = self._unpack(_fstreamparamsize)[0]
478 paramssize = self._unpack(_fstreamparamsize)[0]
475 if paramssize:
479 if paramssize:
476 for p in self._readexact(paramssize).split(' '):
480 for p in self._readexact(paramssize).split(' '):
477 p = p.split('=', 1)
481 p = p.split('=', 1)
478 p = [urllib.unquote(i) for i in p]
482 p = [urllib.unquote(i) for i in p]
479 if len(p) < 2:
483 if len(p) < 2:
480 p.append(None)
484 p.append(None)
481 self._processparam(*p)
485 self._processparam(*p)
482 params[p[0]] = p[1]
486 params[p[0]] = p[1]
483 return params
487 return params
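Review note: the stream level parameter blob is a space separated list of urlquoted `<name>` or `<name>=<value>` entries. A minimal Python 3 sketch of the round trip follows. The function names are hypothetical, and `urllib.parse` stands in for the Python 2 `urllib` calls used by the module.

```python
from urllib.parse import quote, unquote


def encodeparams(params):
    """Encode (name, value) pairs; value may be None for a bare name."""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)


def decodeparams(blob):
    """Decode a parameter blob into a {name: value-or-None} dictionary."""
    params = {}
    if blob:
        for p in blob.split(' '):
            p = [unquote(i) for i in p.split('=', 1)]
            if len(p) < 2:
                p.append(None)
            params[p[0]] = p[1]
    return params


def ismandatory(name):
    """A parameter whose first letter is upper case is mandatory."""
    return name[0].isupper()
```

Quoting is what makes the simple `split(' ')` and `split('=', 1)` calls safe: spaces and equals signs inside names or values never appear raw in the blob.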
484
488
485 def _processparam(self, name, value):
489 def _processparam(self, name, value):
486 """process a parameter, applying its effect if needed
490 """process a parameter, applying its effect if needed
487
491
488 Parameters starting with a lower case letter are advisory and will be
492 Parameters starting with a lower case letter are advisory and will be
489 ignored when unknown. Those starting with an upper case letter are
493 ignored when unknown. Those starting with an upper case letter are
490 mandatory and this function will raise a KeyError when unknown.
494 mandatory and this function will raise a KeyError when unknown.
491
495
492 Note: no options are currently supported. Any input will be either
496 Note: no options are currently supported. Any input will be either
493 ignored or failing.
497 ignored or failing.
494 """
498 """
495 if not name:
499 if not name:
496 raise ValueError('empty parameter name')
500 raise ValueError('empty parameter name')
497 if name[0] not in string.letters:
501 if name[0] not in string.letters:
498 raise ValueError('non letter first character: %r' % name)
502 raise ValueError('non letter first character: %r' % name)
499 # Some logic will be later added here to try to process the option for
503 # Some logic will be later added here to try to process the option for
500 # a dict of known parameters.
504 # a dict of known parameters.
501 if name[0].islower():
505 if name[0].islower():
502 self.ui.debug("ignoring unknown parameter %r\n" % name)
506 self.ui.debug("ignoring unknown parameter %r\n" % name)
503 else:
507 else:
504 raise KeyError(name)
508 raise KeyError(name)
505
509
506
510
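The naming rule enforced by `_processparam` can be illustrated on its own. This is a sketch under assumptions: `classify_param` is a hypothetical name, and it uses `string.ascii_letters` because the Python 2 `string.letters` used in the listing does not exist on Python 3.

```python
import string


def classify_param(name):
    """Return 'advisory' or 'mandatory' based on the first letter's case.

    Hypothetical helper: the real method either ignores the parameter
    (advisory) or raises KeyError (mandatory) instead of returning a label.
    """
    if not name:
        raise ValueError('empty parameter name')
    if name[0] not in string.ascii_letters:
        raise ValueError('non letter first character: %r' % name)
    return 'advisory' if name[0].islower() else 'mandatory'
```

Encoding the mandatory/advisory distinction in the case of the first letter lets a receiver decide how to treat an unknown parameter without any prior knowledge of it.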
507 def iterparts(self):
511 def iterparts(self):
508 """yield all parts contained in the stream"""
512 """yield all parts contained in the stream"""
509 # make sure params have been loaded
513 # make sure params have been loaded
510 self.params
514 self.params
511 self.ui.debug('start extraction of bundle2 parts\n')
515 self.ui.debug('start extraction of bundle2 parts\n')
512 headerblock = self._readpartheader()
516 headerblock = self._readpartheader()
513 while headerblock is not None:
517 while headerblock is not None:
514 part = unbundlepart(self.ui, headerblock, self._fp)
518 part = unbundlepart(self.ui, headerblock, self._fp)
515 yield part
519 yield part
516 headerblock = self._readpartheader()
520 headerblock = self._readpartheader()
517 self.ui.debug('end of bundle2 stream\n')
521 self.ui.debug('end of bundle2 stream\n')
518
522
519 def _readpartheader(self):
523 def _readpartheader(self):
520 """reads a part header size and returns the header bytes blob
524 """reads a part header size and returns the header bytes blob
521
525
522 returns None if empty"""
526 returns None if empty"""
523 headersize = self._unpack(_fpartheadersize)[0]
527 headersize = self._unpack(_fpartheadersize)[0]
524 self.ui.debug('part header size: %i\n' % headersize)
528 self.ui.debug('part header size: %i\n' % headersize)
525 if headersize:
529 if headersize:
526 return self._readexact(headersize)
530 return self._readexact(headersize)
527 return None
531 return None
528
532
529
533
530 class bundlepart(object):
534 class bundlepart(object):
531 """A bundle2 part contains application level payload
535 """A bundle2 part contains application level payload
532
536
533 The part `type` is used to route the part to the application level
537 The part `type` is used to route the part to the application level
534 handler.
538 handler.
535 """
539 """
536
540
537 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
541 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
538 data=''):
542 data=''):
539 self.id = None
543 self.id = None
540 self.type = parttype
544 self.type = parttype
541 self.data = data
545 self.data = data
542 self.mandatoryparams = mandatoryparams
546 self.mandatoryparams = mandatoryparams
543 self.advisoryparams = advisoryparams
547 self.advisoryparams = advisoryparams
544
548
545 def getchunks(self):
549 def getchunks(self):
546 #### header
550 #### header
547 ## parttype
551 ## parttype
548 header = [_pack(_fparttypesize, len(self.type)),
552 header = [_pack(_fparttypesize, len(self.type)),
549 self.type, _pack(_fpartid, self.id),
553 self.type, _pack(_fpartid, self.id),
550 ]
554 ]
551 ## parameters
555 ## parameters
552 # count
556 # count
553 manpar = self.mandatoryparams
557 manpar = self.mandatoryparams
554 advpar = self.advisoryparams
558 advpar = self.advisoryparams
555 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
559 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
556 # size
560 # size
557 parsizes = []
561 parsizes = []
558 for key, value in manpar:
562 for key, value in manpar:
559 parsizes.append(len(key))
563 parsizes.append(len(key))
560 parsizes.append(len(value))
564 parsizes.append(len(value))
561 for key, value in advpar:
565 for key, value in advpar:
562 parsizes.append(len(key))
566 parsizes.append(len(key))
563 parsizes.append(len(value))
567 parsizes.append(len(value))
564 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
568 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
565 header.append(paramsizes)
569 header.append(paramsizes)
566 # key, value
570 # key, value
567 for key, value in manpar:
571 for key, value in manpar:
568 header.append(key)
572 header.append(key)
569 header.append(value)
573 header.append(value)
570 for key, value in advpar:
574 for key, value in advpar:
571 header.append(key)
575 header.append(key)
572 header.append(value)
576 header.append(value)
573 ## finalize header
577 ## finalize header
574 headerchunk = ''.join(header)
578 headerchunk = ''.join(header)
575 yield _pack(_fpartheadersize, len(headerchunk))
579 yield _pack(_fpartheadersize, len(headerchunk))
576 yield headerchunk
580 yield headerchunk
577 ## payload
581 ## payload
578 for chunk in self._payloadchunks():
582 for chunk in self._payloadchunks():
579 yield _pack(_fpayloadsize, len(chunk))
583 yield _pack(_fpayloadsize, len(chunk))
580 yield chunk
584 yield chunk
581 # end of payload
585 # end of payload
582 yield _pack(_fpayloadsize, 0)
586 yield _pack(_fpayloadsize, 0)
583
587
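The header layout built by `getchunks` can be sketched as a standalone function. The `struct` format strings below are assumptions, since the real `_fpart*` constants are defined outside this excerpt, and `pack_part_header` is a hypothetical name. The sketch uses `//` where the listing uses `/` so that it runs on Python 3.

```python
import struct

# Assumed formats; the real constants live elsewhere in bundle2.py.
FPARTHEADERSIZE = '>H'   # size of the whole header blob
FPARTTYPESIZE = '>B'     # length of the part type string
FPARTID = '>I'           # part id
FPARTPARAMCOUNT = '>BB'  # mandatory count, advisory count


def pack_part_header(parttype, partid, mandatory=(), advisory=()):
    """Return the size-prefixed header for one part (bytes in, bytes out)."""
    params = list(mandatory) + list(advisory)
    header = [
        struct.pack(FPARTTYPESIZE, len(parttype)),
        parttype,
        struct.pack(FPARTID, partid),
        struct.pack(FPARTPARAMCOUNT, len(mandatory), len(advisory)),
    ]
    # One (key size, value size) pair of bytes per parameter.
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    header.append(struct.pack('>' + 'BB' * (len(sizes) // 2), *sizes))
    # Then the keys and values themselves, mandatory parameters first.
    for key, value in params:
        header.extend((key, value))
    blob = b''.join(header)
    return struct.pack(FPARTHEADERSIZE, len(blob)) + blob
```

Because all sizes precede the keys and values, a reader can slice the parameters out of the header blob without any delimiter scanning.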
584 def _payloadchunks(self):
588 def _payloadchunks(self):
585 """yield chunks of the part payload
589 """yield chunks of the part payload
586
590
587 Exists to handle the different methods to provide data to a part."""
591 Exists to handle the different methods to provide data to a part."""
588 # we only support fixed size data now.
592 # we only support fixed size data now.
589 # This will be improved in the future.
593 # This will be improved in the future.
590 if util.safehasattr(self.data, 'next'):
594 if util.safehasattr(self.data, 'next'):
591 buff = util.chunkbuffer(self.data)
595 buff = util.chunkbuffer(self.data)
592 chunk = buff.read(preferedchunksize)
596 chunk = buff.read(preferedchunksize)
593 while chunk:
597 while chunk:
594 yield chunk
598 yield chunk
595 chunk = buff.read(preferedchunksize)
599 chunk = buff.read(preferedchunksize)
596 elif len(self.data):
600 elif len(self.data):
597 yield self.data
601 yield self.data
598
602
599 class unbundlepart(unpackermixin):
603 class unbundlepart(unpackermixin):
600 """a bundle part read from a bundle"""
604 """a bundle part read from a bundle"""
601
605
602 def __init__(self, ui, header, fp):
606 def __init__(self, ui, header, fp):
603 super(unbundlepart, self).__init__(fp)
607 super(unbundlepart, self).__init__(fp)
604 self.ui = ui
608 self.ui = ui
605 # unbundle state attr
609 # unbundle state attr
606 self._headerdata = header
610 self._headerdata = header
607 self._headeroffset = 0
611 self._headeroffset = 0
608 self._initialized = False
612 self._initialized = False
609 self.consumed = False
613 self.consumed = False
610 # part data
614 # part data
611 self.id = None
615 self.id = None
612 self.type = None
616 self.type = None
613 self.mandatoryparams = None
617 self.mandatoryparams = None
614 self.advisoryparams = None
618 self.advisoryparams = None
615 self._payloadstream = None
619 self._payloadstream = None
616 self._readheader()
620 self._readheader()
617
621
618 def _fromheader(self, size):
622 def _fromheader(self, size):
619 """return the next <size> bytes from the header"""
623 """return the next <size> bytes from the header"""
620 offset = self._headeroffset
624 offset = self._headeroffset
621 data = self._headerdata[offset:(offset + size)]
625 data = self._headerdata[offset:(offset + size)]
622 self._headeroffset = offset + size
626 self._headeroffset = offset + size
623 return data
627 return data
624
628
625 def _unpackheader(self, format):
629 def _unpackheader(self, format):
626 """read given format from header
630 """read given format from header
627
631
628 This automatically computes the size of the format to read."""
632 This automatically computes the size of the format to read."""
629 data = self._fromheader(struct.calcsize(format))
633 data = self._fromheader(struct.calcsize(format))
630 return _unpack(format, data)
634 return _unpack(format, data)
631
635
632 def _readheader(self):
636 def _readheader(self):
633 """read the header and setup the object"""
637 """read the header and setup the object"""
634 typesize = self._unpackheader(_fparttypesize)[0]
638 typesize = self._unpackheader(_fparttypesize)[0]
635 self.type = self._fromheader(typesize)
639 self.type = self._fromheader(typesize)
636 self.ui.debug('part type: "%s"\n' % self.type)
640 self.ui.debug('part type: "%s"\n' % self.type)
637 self.id = self._unpackheader(_fpartid)[0]
641 self.id = self._unpackheader(_fpartid)[0]
638 self.ui.debug('part id: "%s"\n' % self.id)
642 self.ui.debug('part id: "%s"\n' % self.id)
639 ## reading parameters
643 ## reading parameters
640 # param count
644 # param count
641 mancount, advcount = self._unpackheader(_fpartparamcount)
645 mancount, advcount = self._unpackheader(_fpartparamcount)
642 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
646 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
643 # param size
647 # param size
644 fparamsizes = _makefpartparamsizes(mancount + advcount)
648 fparamsizes = _makefpartparamsizes(mancount + advcount)
645 paramsizes = self._unpackheader(fparamsizes)
649 paramsizes = self._unpackheader(fparamsizes)
646 # make it a list of (key size, value size) pairs again
650 # make it a list of (key size, value size) pairs again
647 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
651 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
648 # split mandatory from advisory
652 # split mandatory from advisory
649 mansizes = paramsizes[:mancount]
653 mansizes = paramsizes[:mancount]
650 advsizes = paramsizes[mancount:]
654 advsizes = paramsizes[mancount:]
651 # retrieve param values
655 # retrieve param values
652 manparams = []
656 manparams = []
653 for key, value in mansizes:
657 for key, value in mansizes:
654 manparams.append((self._fromheader(key), self._fromheader(value)))
658 manparams.append((self._fromheader(key), self._fromheader(value)))
655 advparams = []
659 advparams = []
656 for key, value in advsizes:
660 for key, value in advsizes:
657 advparams.append((self._fromheader(key), self._fromheader(value)))
661 advparams.append((self._fromheader(key), self._fromheader(value)))
658 self.mandatoryparams = manparams
662 self.mandatoryparams = manparams
659 self.advisoryparams = advparams
663 self.advisoryparams = advparams
660 ## part payload
664 ## part payload
661 def payloadchunks():
665 def payloadchunks():
662 payloadsize = self._unpack(_fpayloadsize)[0]
666 payloadsize = self._unpack(_fpayloadsize)[0]
663 self.ui.debug('payload chunk size: %i\n' % payloadsize)
667 self.ui.debug('payload chunk size: %i\n' % payloadsize)
664 while payloadsize:
668 while payloadsize:
665 yield self._readexact(payloadsize)
669 yield self._readexact(payloadsize)
666 payloadsize = self._unpack(_fpayloadsize)[0]
670 payloadsize = self._unpack(_fpayloadsize)[0]
667 self.ui.debug('payload chunk size: %i\n' % payloadsize)
671 self.ui.debug('payload chunk size: %i\n' % payloadsize)
668 self._payloadstream = util.chunkbuffer(payloadchunks())
672 self._payloadstream = util.chunkbuffer(payloadchunks())
669 # we read the data, tell it
673 # we read the data, tell it
670 self._initialized = True
674 self._initialized = True
671
675
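The payload framing written by `getchunks` and read back by `payloadchunks` can be sketched as a round trip: each chunk is prefixed with its size, and a zero size marks the end of the payload. The `'>I'` format is an assumption about `_fpayloadsize`, and both function names are hypothetical. Plain `fp.read` stands in for `_readexact`, so short reads are not detected here.

```python
import io
import struct

FPAYLOADSIZE = '>I'  # assumed format of _fpayloadsize


def frame_payload(chunks):
    """Size-prefix every non-empty chunk and append the zero terminator."""
    out = []
    for chunk in chunks:
        if chunk:  # an empty chunk would look like the end marker
            out.append(struct.pack(FPAYLOADSIZE, len(chunk)))
            out.append(chunk)
    out.append(struct.pack(FPAYLOADSIZE, 0))
    return b''.join(out)


def iter_payload(fp):
    """Yield chunks from fp until the zero-size end marker is read."""
    sizelen = struct.calcsize(FPAYLOADSIZE)
    size = struct.unpack(FPAYLOADSIZE, fp.read(sizelen))[0]
    while size:
        yield fp.read(size)
        size = struct.unpack(FPAYLOADSIZE, fp.read(sizelen))[0]
```

A quick way to try it is `list(iter_payload(io.BytesIO(frame_payload([b'abc', b'de']))))`, which reads back the same chunks that were framed.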
672 def read(self, size=None):
676 def read(self, size=None):
673 """read payload data"""
677 """read payload data"""
674 if not self._initialized:
678 if not self._initialized:
675 self._readheader()
679 self._readheader()
676 if size is None:
680 if size is None:
677 data = self._payloadstream.read()
681 data = self._payloadstream.read()
678 else:
682 else:
679 data = self._payloadstream.read(size)
683 data = self._payloadstream.read(size)
680 if size is None or len(data) < size:
684 if size is None or len(data) < size:
681 self.consumed = True
685 self.consumed = True
682 return data
686 return data
683
687
684
688
685 @parthandler('b2x:changegroup')
689 @parthandler('b2x:changegroup')
686 def handlechangegroup(op, inpart):
690 def handlechangegroup(op, inpart):
687 """apply a changegroup part on the repo
691 """apply a changegroup part on the repo
688
692
689 This is a very early implementation that will need massive rework before
693 This is a very early implementation that will need massive rework before
690 being inflicted on any end-user.
694 being inflicted on any end-user.
691 """
695 """
692 # Make sure we trigger a transaction creation
696 # Make sure we trigger a transaction creation
693 #
697 #
694 # The addchangegroup function will get a transaction object by itself, but
698 # The addchangegroup function will get a transaction object by itself, but
695 # we need to make sure we trigger the creation of a transaction object used
699 # we need to make sure we trigger the creation of a transaction object used
696 # for the whole processing scope.
700 # for the whole processing scope.
697 op.gettransaction()
701 op.gettransaction()
698 cg = changegroup.unbundle10(inpart, 'UN')
702 cg = changegroup.unbundle10(inpart, 'UN')
699 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
703 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
700 op.records.add('changegroup', {'return': ret})
704 op.records.add('changegroup', {'return': ret})
701 if op.reply is not None:
705 if op.reply is not None:
702 # This is definitely not the final form of this
706 # This is definitely not the final form of this
703 # return. But one needs to start somewhere.
707 # return. But one needs to start somewhere.
704 part = bundlepart('b2x:reply:changegroup', (),
708 part = bundlepart('b2x:reply:changegroup', (),
705 [('in-reply-to', str(inpart.id)),
709 [('in-reply-to', str(inpart.id)),
706 ('return', '%i' % ret)])
710 ('return', '%i' % ret)])
707 op.reply.addpart(part)
711 op.reply.addpart(part)
708 assert not inpart.read()
712 assert not inpart.read()
709
713
710 @parthandler('b2x:reply:changegroup')
714 @parthandler('b2x:reply:changegroup')
711 def handlereplychangegroup(op, inpart):
715 def handlereplychangegroup(op, inpart):
712 p = dict(inpart.advisoryparams)
716 p = dict(inpart.advisoryparams)
713 ret = int(p['return'])
717 ret = int(p['return'])
714 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
718 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
715
719
716 @parthandler('b2x:check:heads')
720 @parthandler('b2x:check:heads')
717 def handlecheckheads(op, inpart):
721 def handlecheckheads(op, inpart):
718 """check that the heads of the repo did not change
722 """check that the heads of the repo did not change
719
723
720 This is used to detect a push race when using unbundle.
724 This is used to detect a push race when using unbundle.
721 This replaces the "heads" argument of unbundle."""
725 This replaces the "heads" argument of unbundle."""
722 h = inpart.read(20)
726 h = inpart.read(20)
723 heads = []
727 heads = []
724 while len(h) == 20:
728 while len(h) == 20:
725 heads.append(h)
729 heads.append(h)
726 h = inpart.read(20)
730 h = inpart.read(20)
727 assert not h
731 assert not h
728 if heads != op.repo.heads():
732 if heads != op.repo.heads():
729 raise exchange.PushRaced()
733 raise exchange.PushRaced()
730
734
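The head-reading loop of the `b2x:check:heads` handler can be sketched against any file-like object. `read_heads` is a hypothetical name, and the sketch raises `ValueError` on a trailing partial node where the handler uses `assert`.

```python
import io


def read_heads(stream, nodelen=20):
    """Read consecutive fixed-size binary node ids until the stream ends."""
    heads = []
    node = stream.read(nodelen)
    while len(node) == nodelen:
        heads.append(node)
        node = stream.read(nodelen)
    if node:
        # Leftover bytes mean the payload was not a multiple of nodelen.
        raise ValueError('trailing partial node: %d bytes' % len(node))
    return heads
```

The caller would then compare the returned list with the repository's current heads and report a push race when they differ, as the handler does with `op.repo.heads()`.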
731 @parthandler('b2x:output')
735 @parthandler('b2x:output')
732 def handleoutput(op, inpart):
736 def handleoutput(op, inpart):
733 """forward output captured on the server to the client"""
737 """forward output captured on the server to the client"""
734 for line in inpart.read().splitlines():
738 for line in inpart.read().splitlines():
735 op.ui.write(('remote: %s\n' % line))
739 op.ui.write(('remote: %s\n' % line))
736
740
737 @parthandler('b2x:replycaps')
741 @parthandler('b2x:replycaps')
738 def handlereplycaps(op, inpart):
742 def handlereplycaps(op, inpart):
739 """Notify that a reply bundle should be created
743 """Notify that a reply bundle should be created
740
744
741 The payload contains the capabilities information for the reply"""
745 The payload contains the capabilities information for the reply"""
742 caps = decodecaps(inpart.read())
746 caps = decodecaps(inpart.read())
743 if op.reply is None:
747 if op.reply is None:
744 op.reply = bundle20(op.ui, caps)
748 op.reply = bundle20(op.ui, caps)
745
749
746 @parthandler('b2x:error:abort')
750 @parthandler('b2x:error:abort')
747 def handleerrorabort(op, inpart):
751 def handleerrorabort(op, inpart):
748 """Used to transmit an abort error over the wire"""
752 """Used to transmit an abort error over the wire"""
749 manargs = dict(inpart.mandatoryparams)
753 manargs = dict(inpart.mandatoryparams)
750 advargs = dict(inpart.advisoryparams)
754 advargs = dict(inpart.advisoryparams)
751 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
755 raise util.Abort(manargs['message'], hint=advargs.get('hint'))