bundle2: extract capabilities decoding...
Pierre-Yves David -
r21138:f469879d default
@@ -1,720 +1,727 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is structured as follows:
17 The format is structured as follows:
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follows:
32 Binary format is as follows:
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are forbidden.
46 Empty names are forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follows:
65 Binary format is as follows:
66
66
67 :header size: (16 bits integer)
67 :header size: (16 bits integer)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route the part to an application level handler that
77 The part type is used to route the part to an application level handler that
78 can interpret the payload.
78 can interpret the payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows:
84 The binary format of the header is as follows:
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says). The payload part is concluded by a zero size chunk.
121 `chunksize` says). The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handlers are registered
129 Each part is processed in order using a "part handler". Handlers are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the parts read after an abort are processed. In the
138 channel usable. But none of the parts read after an abort are processed. In the
139 future, dropping the stream may become an option for channels we do not care to
139 future, dropping the stream may become an option for channels we do not care to
140 preserve.
140 preserve.
141 """
141 """
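The stream level parameter layout described in the docstring above can be sketched as follows. This is a minimal Python 3 illustration, not the module's code: the helper names `encode_stream_params` and `decode_stream_params` are hypothetical, and `urllib.parse.quote`/`unquote` stand in for the Python 2 `urllib.quote`/`urllib.unquote` calls used by the source.

```python
import struct
from urllib.parse import quote, unquote


def encode_stream_params(params):
    """Serialize [(name, value_or_None), ...] as <16-bit size><blob>.

    Hypothetical helper: names and values are urlquoted, joined with
    '=' when a value is present, and separated by single spaces.
    """
    blocks = []
    for name, value in params:
        item = quote(name)
        if value is not None:
            item = '%s=%s' % (item, quote(value))
        blocks.append(item)
    blob = ' '.join(blocks).encode('ascii')
    # The size field is an unsigned big-endian 16-bit integer.
    return struct.pack('>H', len(blob)) + blob


def decode_stream_params(data):
    """Inverse of encode_stream_params; returns a {name: value} dict."""
    (size,) = struct.unpack('>H', data[:2])
    blob = data[2:2 + size].decode('ascii')
    params = {}
    if blob:
        for item in blob.split(' '):
            fields = [unquote(f) for f in item.split('=', 1)]
            if len(fields) < 2:
                fields.append(None)  # parameter without a value
            params[fields[0]] = fields[1]
    return params
```

Because spaces and `=` are percent-encoded by the quoting step, splitting on those characters during decoding is unambiguous.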
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup
148 import changegroup
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG20'
154 _magicstring = 'HG20'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
164
164
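The `<chunksize><chunkdata>` payload framing from the module docstring can be illustrated with a short Python 3 sketch. The function names `frame_payload` and `read_payload` are hypothetical, and splitting the data into several chunks is an assumption made for illustration: per the docstring, the implementation shown here emits at most one chunk.

```python
import io
import struct


def frame_payload(data, chunksize=4096):
    """Yield <chunksize><chunkdata> frames, then a zero-size terminator."""
    for start in range(0, len(data), chunksize):
        chunk = data[start:start + chunksize]
        # Each chunk is preceded by its size as a big-endian 32-bit integer.
        yield struct.pack('>I', len(chunk))
        yield chunk
    # A zero-size chunk concludes the payload.
    yield struct.pack('>I', 0)


def read_payload(fp):
    """Read frames from a binary stream until the zero-size chunk."""
    chunks = []
    while True:
        header = fp.read(4)
        if len(header) != 4:
            raise ValueError('truncated chunk header')
        (size,) = struct.unpack('>I', header)
        if size == 0:
            return b''.join(chunks)
        chunk = fp.read(size)
        if len(chunk) != size:
            raise ValueError('truncated chunk data')
        chunks.append(chunk)
```

A reader written this way does not need to know the total payload size in advance, which is what allows a part to be consumed (and skipped) without understanding its content.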
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number of parameters is variable so we need to build that format
168 The number of parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
172
172
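To show how the dynamically built size format is used, here is a minimal Python 3 sketch of the part parameter block `<mandatory-count><advisory-count><param-sizes><param-data>`. The names `pack_part_params` and `unpack_part_params` are hypothetical, and keys and values are assumed to be `bytes`.

```python
import struct


def pack_part_params(mandatory, advisory):
    """Pack two lists of (key, value) byte pairs into one blob."""
    allparams = list(mandatory) + list(advisory)
    out = [struct.pack('>BB', len(mandatory), len(advisory))]
    sizes = []
    for key, value in allparams:
        sizes.extend((len(key), len(value)))
    # One (key size, value size) byte pair per parameter.
    out.append(struct.pack('>' + 'BB' * len(allparams), *sizes))
    # Mandatory parameters come first, then the advisory ones.
    for key, value in allparams:
        out.extend((key, value))
    return b''.join(out)


def unpack_part_params(data):
    """Return (mandatory, advisory) lists of (key, value) pairs."""
    mancount, advcount = struct.unpack_from('>BB', data, 0)
    total = mancount + advcount
    fmt = '>' + 'BB' * total
    sizes = struct.unpack_from(fmt, data, 2)
    offset = 2 + struct.calcsize(fmt)
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    return params[:mancount], params[mancount:]
```

Since every size is stored in a single byte, keys and values are limited to 255 bytes each, and `struct.pack` raises `struct.error` for anything longer.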
173 parthandlermapping = {}
173 parthandlermapping = {}
174
174
175 def parthandler(parttype):
175 def parthandler(parttype):
176 """decorator that registers a function as a bundle2 part handler
176 """decorator that registers a function as a bundle2 part handler
177
177
178 eg::
178 eg::
179
179
180 @parthandler('myparttype')
180 @parthandler('myparttype')
181 def myparttypehandler(...):
181 def myparttypehandler(...):
182 '''process a part of type "my part".'''
182 '''process a part of type "my part".'''
183 ...
183 ...
184 """
184 """
185 def _decorator(func):
185 def _decorator(func):
186 lparttype = parttype.lower() # enforce lower case matching.
186 lparttype = parttype.lower() # enforce lower case matching.
187 assert lparttype not in parthandlermapping
187 assert lparttype not in parthandlermapping
188 parthandlermapping[lparttype] = func
188 parthandlermapping[lparttype] = func
189 return func
189 return func
190 return _decorator
190 return _decorator
191
191
192 class unbundlerecords(object):
192 class unbundlerecords(object):
193 """keep record of what happens during an unbundle
193 """keep record of what happens during an unbundle
194
194
195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
196 category of record and obj is an arbitrary object.
196 category of record and obj is an arbitrary object.
197
197
198 `records['cat']` will return all entries of this category 'cat'.
198 `records['cat']` will return all entries of this category 'cat'.
199
199
200 Iterating on the object itself will yield `('category', obj)` tuples
200 Iterating on the object itself will yield `('category', obj)` tuples
201 for all entries.
201 for all entries.
202
202
203 All iterations happen in chronological order.
203 All iterations happen in chronological order.
204 """
204 """
205
205
206 def __init__(self):
206 def __init__(self):
207 self._categories = {}
207 self._categories = {}
208 self._sequences = []
208 self._sequences = []
209 self._replies = {}
209 self._replies = {}
210
210
211 def add(self, category, entry, inreplyto=None):
211 def add(self, category, entry, inreplyto=None):
212 """add a new record of a given category.
212 """add a new record of a given category.
213
213
214 The entry can then be retrieved in the list returned by
214 The entry can then be retrieved in the list returned by
215 self['category']."""
215 self['category']."""
216 self._categories.setdefault(category, []).append(entry)
216 self._categories.setdefault(category, []).append(entry)
217 self._sequences.append((category, entry))
217 self._sequences.append((category, entry))
218 if inreplyto is not None:
218 if inreplyto is not None:
219 self.getreplies(inreplyto).add(category, entry)
219 self.getreplies(inreplyto).add(category, entry)
220
220
221 def getreplies(self, partid):
221 def getreplies(self, partid):
222 """get the subrecords that reply to a specific part"""
222 """get the subrecords that reply to a specific part"""
223 return self._replies.setdefault(partid, unbundlerecords())
223 return self._replies.setdefault(partid, unbundlerecords())
224
224
225 def __getitem__(self, cat):
225 def __getitem__(self, cat):
226 return tuple(self._categories.get(cat, ()))
226 return tuple(self._categories.get(cat, ()))
227
227
228 def __iter__(self):
228 def __iter__(self):
229 return iter(self._sequences)
229 return iter(self._sequences)
230
230
231 def __len__(self):
231 def __len__(self):
232 return len(self._sequences)
232 return len(self._sequences)
233
233
234 def __nonzero__(self):
234 def __nonzero__(self):
235 return bool(self._sequences)
235 return bool(self._sequences)
236
236
237 class bundleoperation(object):
237 class bundleoperation(object):
238 """an object that represents a single bundling process
238 """an object that represents a single bundling process
239
239
240 Its purpose is to carry unbundle-related objects and states.
240 Its purpose is to carry unbundle-related objects and states.
241
241
242 A new object should be created at the beginning of each bundle processing.
242 A new object should be created at the beginning of each bundle processing.
243 The object is to be returned by the processing function.
243 The object is to be returned by the processing function.
244
244
245 The object has very little content now; it will ultimately contain:
245 The object has very little content now; it will ultimately contain:
246 * an access to the repo the bundle is applied to,
246 * an access to the repo the bundle is applied to,
247 * a ui object,
247 * a ui object,
248 * a way to retrieve a transaction to add changes to the repo,
248 * a way to retrieve a transaction to add changes to the repo,
249 * a way to record the result of processing each part,
249 * a way to record the result of processing each part,
250 * a way to construct a bundle response when applicable.
250 * a way to construct a bundle response when applicable.
251 """
251 """
252
252
253 def __init__(self, repo, transactiongetter):
253 def __init__(self, repo, transactiongetter):
254 self.repo = repo
254 self.repo = repo
255 self.ui = repo.ui
255 self.ui = repo.ui
256 self.records = unbundlerecords()
256 self.records = unbundlerecords()
257 self.gettransaction = transactiongetter
257 self.gettransaction = transactiongetter
258 self.reply = None
258 self.reply = None
259
259
260 class TransactionUnavailable(RuntimeError):
260 class TransactionUnavailable(RuntimeError):
261 pass
261 pass
262
262
263 def _notransaction():
263 def _notransaction():
264 """default method to get a transaction while processing a bundle
264 """default method to get a transaction while processing a bundle
265
265
266 Raise an exception to highlight the fact that no transaction was expected
266 Raise an exception to highlight the fact that no transaction was expected
267 to be created"""
267 to be created"""
268 raise TransactionUnavailable()
268 raise TransactionUnavailable()
269
269
270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 """This function processes a bundle and applies its effects to/from a repo
271 """This function processes a bundle and applies its effects to/from a repo
272
272
273 It iterates over each part then searches for and uses the proper handling
273 It iterates over each part then searches for and uses the proper handling
274 code to process the part. Parts are processed in order.
274 code to process the part. Parts are processed in order.
275
275
276 This is a very early version of this function that will be strongly reworked
276 This is a very early version of this function that will be strongly reworked
277 before final usage.
277 before final usage.
278
278
279 An unknown mandatory part will abort the process.
279 An unknown mandatory part will abort the process.
280 """
280 """
281 op = bundleoperation(repo, transactiongetter)
281 op = bundleoperation(repo, transactiongetter)
282 # todo:
282 # todo:
283 # - replace this with an init function soon.
283 # - replace this with an init function soon.
284 # - exception catching
284 # - exception catching
285 unbundler.params
285 unbundler.params
286 iterparts = unbundler.iterparts()
286 iterparts = unbundler.iterparts()
287 part = None
287 part = None
288 try:
288 try:
289 for part in iterparts:
289 for part in iterparts:
290 parttype = part.type
290 parttype = part.type
291 # part keys are matched in lower case
291 # part keys are matched in lower case
292 key = parttype.lower()
292 key = parttype.lower()
293 try:
293 try:
294 handler = parthandlermapping[key]
294 handler = parthandlermapping[key]
295 op.ui.debug('found a handler for part %r\n' % parttype)
295 op.ui.debug('found a handler for part %r\n' % parttype)
296 except KeyError:
296 except KeyError:
297 if key != parttype: # mandatory parts
297 if key != parttype: # mandatory parts
298 # todo:
298 # todo:
299 # - use a more precise exception
299 # - use a more precise exception
300 raise
300 raise
301 op.ui.debug('ignoring unknown advisory part %r\n' % key)
301 op.ui.debug('ignoring unknown advisory part %r\n' % key)
302 # consuming the part
302 # consuming the part
303 part.read()
303 part.read()
304 continue
304 continue
305
305
306 # handler is called outside the above try block so that we don't
306 # handler is called outside the above try block so that we don't
307 # risk catching KeyErrors from anything other than the
307 # risk catching KeyErrors from anything other than the
308 # parthandlermapping lookup (any KeyError raised by handler()
308 # parthandlermapping lookup (any KeyError raised by handler()
309 # itself represents a defect of a different variety).
309 # itself represents a defect of a different variety).
310 output = None
310 output = None
311 if op.reply is not None:
311 if op.reply is not None:
312 op.ui.pushbuffer(error=True)
312 op.ui.pushbuffer(error=True)
313 output = ''
313 output = ''
314 try:
314 try:
315 handler(op, part)
315 handler(op, part)
316 finally:
316 finally:
317 if output is not None:
317 if output is not None:
318 output = op.ui.popbuffer()
318 output = op.ui.popbuffer()
319 if output:
319 if output:
320 outpart = bundlepart('output',
320 outpart = bundlepart('output',
321 advisoryparams=[('in-reply-to',
321 advisoryparams=[('in-reply-to',
322 str(part.id))],
322 str(part.id))],
323 data=output)
323 data=output)
324 op.reply.addpart(outpart)
324 op.reply.addpart(outpart)
325 part.read()
325 part.read()
326 except Exception:
326 except Exception:
327 if part is not None:
327 if part is not None:
328 # consume the bundle content
328 # consume the bundle content
329 part.read()
329 part.read()
330 for part in iterparts:
330 for part in iterparts:
331 # consume the bundle content
331 # consume the bundle content
332 part.read()
332 part.read()
333 raise
333 raise
334 return op
334 return op
335
335
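The mandatory/advisory dispatch rule used by `processbundle` can be isolated in a small Python 3 sketch. Everything here is illustrative: the `dispatch` function, the `UnknownMandatoryPart` exception, and the `test:ping` part type are hypothetical, and handlers take only a payload, whereas the source calls `handler(op, part)` and re-raises the plain `KeyError`.

```python
handlers = {}


def parthandler(parttype):
    """Register a handler under the lower-cased part type."""
    def decorator(func):
        key = parttype.lower()
        assert key not in handlers
        handlers[key] = func
        return func
    return decorator


class UnknownMandatoryPart(KeyError):
    """Hypothetical exception for a mandatory part with no handler."""


def dispatch(parttype, payload):
    """Route a part to its handler; matching is case insensitive."""
    key = parttype.lower()
    handler = handlers.get(key)
    if handler is None:
        if key != parttype:
            # Any uppercase character marks the part as mandatory.
            raise UnknownMandatoryPart(parttype)
        # Unknown advisory parts are silently ignored.
        return None
    return handler(payload)


@parthandler('test:ping')
def handleping(payload):
    return 'pong'
```

With this registry, `dispatch('test:ping', b'')` and `dispatch('TEST:PING', b'')` reach the same handler, while an unregistered type is ignored or rejected depending only on its case.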
336 def decodecaps(blob):
337 """decode a bundle2 caps bytes blob into a dictionary
338
339 The blob is a list of capabilities (one per line)
340 Capabilities may have values using a line of the form::
341
342 capability=value1,value2,value3
343
344 The values are always a list."""
345 caps = {}
346 for line in blob.splitlines():
347 if not line:
348 continue
349 if '=' not in line:
350 key, vals = line, ()
351 else:
352 key, vals = line.split('=', 1)
353 vals = vals.split(',')
354 key = urllib.unquote(key)
355 vals = [urllib.unquote(v) for v in vals]
356 caps[key] = vals
357 return caps
358
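For readers who want to try the new `decodecaps` function outside Mercurial, here is a Python 3 port as a sketch. Two assumptions differ from the source: `urllib.parse.unquote` replaces the Python 2 `urllib.unquote`, and the blob is a `str` rather than a byte string. The capability names in the usage note are made up.

```python
from urllib.parse import unquote


def decodecaps(blob):
    """Decode a capabilities blob (one capability per line) into a dict.

    Each value is a list; a capability without '=' maps to an empty list.
    """
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue  # skip empty lines
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        # Unquote after splitting so that quoted ',' and '=' survive.
        key = unquote(key)
        vals = [unquote(v) for v in vals]
        caps[key] = vals
    return caps
```

For example, `decodecaps('alpha\nbeta=one,two%2Cthree\n')` yields a dict where `alpha` maps to an empty list and `beta` maps to the two values `one` and `two,three`. Note that a line such as `gamma=` produces a list holding one empty string, because splitting an empty string on `,` returns `['']`.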
336 class bundle20(object):
359 class bundle20(object):
337 """represent an outgoing bundle2 container
360 """represent an outgoing bundle2 container
338
361
339 Use the `addparam` method to add stream level parameters and `addpart` to
362 Use the `addparam` method to add stream level parameters and `addpart` to
340 populate it. Then call `getchunks` to retrieve all the binary chunks of
363 populate it. Then call `getchunks` to retrieve all the binary chunks of
341 data that compose the bundle2 container."""
364 data that compose the bundle2 container."""
342
365
343 def __init__(self, ui, capabilities=()):
366 def __init__(self, ui, capabilities=()):
344 self.ui = ui
367 self.ui = ui
345 self._params = []
368 self._params = []
346 self._parts = []
369 self._parts = []
347 self.capabilities = dict(capabilities)
370 self.capabilities = dict(capabilities)
348
371
349 def addparam(self, name, value=None):
372 def addparam(self, name, value=None):
350 """add a stream level parameter"""
373 """add a stream level parameter"""
351 if not name:
374 if not name:
352 raise ValueError('empty parameter name')
375 raise ValueError('empty parameter name')
353 if name[0] not in string.letters:
376 if name[0] not in string.letters:
354 raise ValueError('non letter first character: %r' % name)
377 raise ValueError('non letter first character: %r' % name)
355 self._params.append((name, value))
378 self._params.append((name, value))
356
379
357 def addpart(self, part):
380 def addpart(self, part):
358 """add a new part to the bundle2 container
381 """add a new part to the bundle2 container
359
382
360 Parts contain the actual applicative payload."""
383 Parts contain the actual applicative payload."""
361 assert part.id is None
384 assert part.id is None
362 part.id = len(self._parts) # very cheap counter
385 part.id = len(self._parts) # very cheap counter
363 self._parts.append(part)
386 self._parts.append(part)
364
387
365 def getchunks(self):
388 def getchunks(self):
366 self.ui.debug('start emission of %s stream\n' % _magicstring)
389 self.ui.debug('start emission of %s stream\n' % _magicstring)
367 yield _magicstring
390 yield _magicstring
368 param = self._paramchunk()
391 param = self._paramchunk()
369 self.ui.debug('bundle parameter: %s\n' % param)
392 self.ui.debug('bundle parameter: %s\n' % param)
370 yield _pack(_fstreamparamsize, len(param))
393 yield _pack(_fstreamparamsize, len(param))
371 if param:
394 if param:
372 yield param
395 yield param
373
396
374 self.ui.debug('start of parts\n')
397 self.ui.debug('start of parts\n')
375 for part in self._parts:
398 for part in self._parts:
376 self.ui.debug('bundle part: "%s"\n' % part.type)
399 self.ui.debug('bundle part: "%s"\n' % part.type)
377 for chunk in part.getchunks():
400 for chunk in part.getchunks():
378 yield chunk
401 yield chunk
379 self.ui.debug('end of bundle\n')
402 self.ui.debug('end of bundle\n')
380 yield '\0\0'
403 yield '\0\0'
381
404
382 def _paramchunk(self):
405 def _paramchunk(self):
383 """return an encoded version of all stream parameters"""
406 """return an encoded version of all stream parameters"""
384 blocks = []
407 blocks = []
385 for par, value in self._params:
408 for par, value in self._params:
386 par = urllib.quote(par)
409 par = urllib.quote(par)
387 if value is not None:
410 if value is not None:
388 value = urllib.quote(value)
411 value = urllib.quote(value)
389 par = '%s=%s' % (par, value)
412 par = '%s=%s' % (par, value)
390 blocks.append(par)
413 blocks.append(par)
391 return ' '.join(blocks)
414 return ' '.join(blocks)
392
415
393 class unpackermixin(object):
416 class unpackermixin(object):
394 """A mixin to extract bytes and struct data from a stream"""
417 """A mixin to extract bytes and struct data from a stream"""
395
418
396 def __init__(self, fp):
419 def __init__(self, fp):
397 self._fp = fp
420 self._fp = fp
398
421
399 def _unpack(self, format):
422 def _unpack(self, format):
400 """unpack this struct format from the stream"""
423 """unpack this struct format from the stream"""
401 data = self._readexact(struct.calcsize(format))
424 data = self._readexact(struct.calcsize(format))
402 return _unpack(format, data)
425 return _unpack(format, data)
403
426
404 def _readexact(self, size):
427 def _readexact(self, size):
405 """read exactly <size> bytes from the stream"""
428 """read exactly <size> bytes from the stream"""
406 return changegroup.readexactly(self._fp, size)
429 return changegroup.readexactly(self._fp, size)
407
430
408
431
409 class unbundle20(unpackermixin):
432 class unbundle20(unpackermixin):
410 """interpret a bundle2 stream
433 """interpret a bundle2 stream
411
434
412 This class is fed with a binary stream and yields parts through its
435 This class is fed with a binary stream and yields parts through its
413 `iterparts` methods."""
436 `iterparts` methods."""
414
437
415 def __init__(self, ui, fp, header=None):
438 def __init__(self, ui, fp, header=None):
416 """If header is specified, we do not read it out of the stream."""
439 """If header is specified, we do not read it out of the stream."""
417 self.ui = ui
440 self.ui = ui
418 super(unbundle20, self).__init__(fp)
441 super(unbundle20, self).__init__(fp)
419 if header is None:
442 if header is None:
420 header = self._readexact(4)
443 header = self._readexact(4)
421 magic, version = header[0:2], header[2:4]
444 magic, version = header[0:2], header[2:4]
422 if magic != 'HG':
445 if magic != 'HG':
423 raise util.Abort(_('not a Mercurial bundle'))
446 raise util.Abort(_('not a Mercurial bundle'))
424 if version != '20':
447 if version != '20':
425 raise util.Abort(_('unknown bundle version %s') % version)
448 raise util.Abort(_('unknown bundle version %s') % version)
426 self.ui.debug('start processing of %s stream\n' % header)
449 self.ui.debug('start processing of %s stream\n' % header)
427
450
428 @util.propertycache
451 @util.propertycache
429 def params(self):
452 def params(self):
430 """dictionary of stream level parameters"""
453 """dictionary of stream level parameters"""
431 self.ui.debug('reading bundle2 stream parameters\n')
454 self.ui.debug('reading bundle2 stream parameters\n')
432 params = {}
455 params = {}
433 paramssize = self._unpack(_fstreamparamsize)[0]
456 paramssize = self._unpack(_fstreamparamsize)[0]
434 if paramssize:
457 if paramssize:
435 for p in self._readexact(paramssize).split(' '):
458 for p in self._readexact(paramssize).split(' '):
436 p = p.split('=', 1)
459 p = p.split('=', 1)
437 p = [urllib.unquote(i) for i in p]
460 p = [urllib.unquote(i) for i in p]
438 if len(p) < 2:
461 if len(p) < 2:
439 p.append(None)
462 p.append(None)
440 self._processparam(*p)
463 self._processparam(*p)
441 params[p[0]] = p[1]
464 params[p[0]] = p[1]
442 return params
465 return params
443
466
444 def _processparam(self, name, value):
467 def _processparam(self, name, value):
445 """process a parameter, applying its effect if needed
468 """process a parameter, applying its effect if needed
446
469
447 Parameter starting with a lower case letter are advisory and will be
470 Parameter starting with a lower case letter are advisory and will be
448 ignored when unknown. Those starting with an upper case letter are
471 ignored when unknown. Those starting with an upper case letter are
449 mandatory; this function will raise a KeyError when they are unknown.
472 mandatory; this function will raise a KeyError when they are unknown.
450
473
451 Note: no options are currently supported. Any input will either be
474 Note: no options are currently supported. Any input will either be
452 ignored or fail.
475 ignored or fail.
453 """
476 """
454 if not name:
477 if not name:
455 raise ValueError('empty parameter name')
478 raise ValueError('empty parameter name')
456 if name[0] not in string.letters:
479 if name[0] not in string.letters:
457 raise ValueError('non letter first character: %r' % name)
480 raise ValueError('non letter first character: %r' % name)
458 # Some logic will be later added here to try to process the option for
481 # Some logic will be later added here to try to process the option for
459 # a dict of known parameter.
482 # a dict of known parameter.
460 if name[0].islower():
483 if name[0].islower():
461 self.ui.debug("ignoring unknown parameter %r\n" % name)
484 self.ui.debug("ignoring unknown parameter %r\n" % name)
462 else:
485 else:
463 raise KeyError(name)
486 raise KeyError(name)
464
487
465
488
466 def iterparts(self):
489 def iterparts(self):
467 """yield all parts contained in the stream"""
490 """yield all parts contained in the stream"""
468 # make sure param have been loaded
491 # make sure param have been loaded
469 self.params
492 self.params
470 self.ui.debug('start extraction of bundle2 parts\n')
493 self.ui.debug('start extraction of bundle2 parts\n')
471 headerblock = self._readpartheader()
494 headerblock = self._readpartheader()
472 while headerblock is not None:
495 while headerblock is not None:
473 part = unbundlepart(self.ui, headerblock, self._fp)
496 part = unbundlepart(self.ui, headerblock, self._fp)
474 yield part
497 yield part
475 headerblock = self._readpartheader()
498 headerblock = self._readpartheader()
476 self.ui.debug('end of bundle2 stream\n')
499 self.ui.debug('end of bundle2 stream\n')
477
500
478 def _readpartheader(self):
501 def _readpartheader(self):
479 """reads a part header size and return the bytes blob
502 """reads a part header size and return the bytes blob
480
503
481 returns None if empty"""
504 returns None if empty"""
482 headersize = self._unpack(_fpartheadersize)[0]
505 headersize = self._unpack(_fpartheadersize)[0]
483 self.ui.debug('part header size: %i\n' % headersize)
506 self.ui.debug('part header size: %i\n' % headersize)
484 if headersize:
507 if headersize:
485 return self._readexact(headersize)
508 return self._readexact(headersize)
486 return None
509 return None
487
510
488
511
489 class bundlepart(object):
512 class bundlepart(object):
490 """A bundle2 part contains application level payload
513 """A bundle2 part contains application level payload
491
514
492 The part `type` is used to route the part to the application level
515 The part `type` is used to route the part to the application level
493 handler.
516 handler.
494 """
517 """
495
518
496 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
519 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
497 data=''):
520 data=''):
498 self.id = None
521 self.id = None
499 self.type = parttype
522 self.type = parttype
500 self.data = data
523 self.data = data
501 self.mandatoryparams = mandatoryparams
524 self.mandatoryparams = mandatoryparams
502 self.advisoryparams = advisoryparams
525 self.advisoryparams = advisoryparams
503
526
504 def getchunks(self):
527 def getchunks(self):
505 #### header
528 #### header
506 ## parttype
529 ## parttype
507 header = [_pack(_fparttypesize, len(self.type)),
530 header = [_pack(_fparttypesize, len(self.type)),
508 self.type, _pack(_fpartid, self.id),
531 self.type, _pack(_fpartid, self.id),
509 ]
532 ]
510 ## parameters
533 ## parameters
511 # count
534 # count
512 manpar = self.mandatoryparams
535 manpar = self.mandatoryparams
513 advpar = self.advisoryparams
536 advpar = self.advisoryparams
514 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
537 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
515 # size
538 # size
516 parsizes = []
539 parsizes = []
517 for key, value in manpar:
540 for key, value in manpar:
518 parsizes.append(len(key))
541 parsizes.append(len(key))
519 parsizes.append(len(value))
542 parsizes.append(len(value))
520 for key, value in advpar:
543 for key, value in advpar:
521 parsizes.append(len(key))
544 parsizes.append(len(key))
522 parsizes.append(len(value))
545 parsizes.append(len(value))
523 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
546 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
524 header.append(paramsizes)
547 header.append(paramsizes)
525 # key, value
548 # key, value
526 for key, value in manpar:
549 for key, value in manpar:
527 header.append(key)
550 header.append(key)
528 header.append(value)
551 header.append(value)
529 for key, value in advpar:
552 for key, value in advpar:
530 header.append(key)
553 header.append(key)
531 header.append(value)
554 header.append(value)
532 ## finalize header
555 ## finalize header
533 headerchunk = ''.join(header)
556 headerchunk = ''.join(header)
534 yield _pack(_fpartheadersize, len(headerchunk))
557 yield _pack(_fpartheadersize, len(headerchunk))
535 yield headerchunk
558 yield headerchunk
536 ## payload
559 ## payload
537 for chunk in self._payloadchunks():
560 for chunk in self._payloadchunks():
538 yield _pack(_fpayloadsize, len(chunk))
561 yield _pack(_fpayloadsize, len(chunk))
539 yield chunk
562 yield chunk
540 # end of payload
563 # end of payload
541 yield _pack(_fpayloadsize, 0)
564 yield _pack(_fpayloadsize, 0)
542
565
543 def _payloadchunks(self):
566 def _payloadchunks(self):
544 """yield chunks of the part payload
567 """yield chunks of the part payload
545
568
546 Exists to handle the different methods to provide data to a part."""
569 Exists to handle the different methods to provide data to a part."""
547 # we only support fixed size data now.
570 # we only support fixed size data now.
548 # This will be improved in the future.
571 # This will be improved in the future.
549 if util.safehasattr(self.data, 'next'):
572 if util.safehasattr(self.data, 'next'):
550 buff = util.chunkbuffer(self.data)
573 buff = util.chunkbuffer(self.data)
551 chunk = buff.read(preferedchunksize)
574 chunk = buff.read(preferedchunksize)
552 while chunk:
575 while chunk:
553 yield chunk
576 yield chunk
554 chunk = buff.read(preferedchunksize)
577 chunk = buff.read(preferedchunksize)
555 elif len(self.data):
578 elif len(self.data):
556 yield self.data
579 yield self.data
557
580
558 class unbundlepart(unpackermixin):
581 class unbundlepart(unpackermixin):
559 """a bundle part read from a bundle"""
582 """a bundle part read from a bundle"""
560
583
561 def __init__(self, ui, header, fp):
584 def __init__(self, ui, header, fp):
562 super(unbundlepart, self).__init__(fp)
585 super(unbundlepart, self).__init__(fp)
563 self.ui = ui
586 self.ui = ui
564 # unbundle state attr
587 # unbundle state attr
565 self._headerdata = header
588 self._headerdata = header
566 self._headeroffset = 0
589 self._headeroffset = 0
567 self._initialized = False
590 self._initialized = False
568 self.consumed = False
591 self.consumed = False
569 # part data
592 # part data
570 self.id = None
593 self.id = None
571 self.type = None
594 self.type = None
572 self.mandatoryparams = None
595 self.mandatoryparams = None
573 self.advisoryparams = None
596 self.advisoryparams = None
574 self._payloadstream = None
597 self._payloadstream = None
575 self._readheader()
598 self._readheader()
576
599
577 def _fromheader(self, size):
600 def _fromheader(self, size):
578 """return the next <size> bytes from the header"""
601 """return the next <size> bytes from the header"""
579 offset = self._headeroffset
602 offset = self._headeroffset
580 data = self._headerdata[offset:(offset + size)]
603 data = self._headerdata[offset:(offset + size)]
581 self._headeroffset = offset + size
604 self._headeroffset = offset + size
582 return data
605 return data
583
606
584 def _unpackheader(self, format):
607 def _unpackheader(self, format):
585 """read given format from header
608 """read given format from header
586
609
587 This automatically computes the size of the format to read."""
610 This automatically computes the size of the format to read."""
588 data = self._fromheader(struct.calcsize(format))
611 data = self._fromheader(struct.calcsize(format))
589 return _unpack(format, data)
612 return _unpack(format, data)
590
613
591 def _readheader(self):
614 def _readheader(self):
592 """read the header and setup the object"""
615 """read the header and setup the object"""
593 typesize = self._unpackheader(_fparttypesize)[0]
616 typesize = self._unpackheader(_fparttypesize)[0]
594 self.type = self._fromheader(typesize)
617 self.type = self._fromheader(typesize)
595 self.ui.debug('part type: "%s"\n' % self.type)
618 self.ui.debug('part type: "%s"\n' % self.type)
596 self.id = self._unpackheader(_fpartid)[0]
619 self.id = self._unpackheader(_fpartid)[0]
597 self.ui.debug('part id: "%s"\n' % self.id)
620 self.ui.debug('part id: "%s"\n' % self.id)
598 ## reading parameters
621 ## reading parameters
599 # param count
622 # param count
600 mancount, advcount = self._unpackheader(_fpartparamcount)
623 mancount, advcount = self._unpackheader(_fpartparamcount)
601 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
624 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
602 # param size
625 # param size
603 fparamsizes = _makefpartparamsizes(mancount + advcount)
626 fparamsizes = _makefpartparamsizes(mancount + advcount)
604 paramsizes = self._unpackheader(fparamsizes)
627 paramsizes = self._unpackheader(fparamsizes)
605 # make it a list of pairs again
628 # make it a list of pairs again
606 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
629 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
607 # split mandatory from advisory
630 # split mandatory from advisory
608 mansizes = paramsizes[:mancount]
631 mansizes = paramsizes[:mancount]
609 advsizes = paramsizes[mancount:]
632 advsizes = paramsizes[mancount:]
610 # retrieve param values
633 # retrieve param values
611 manparams = []
634 manparams = []
612 for key, value in mansizes:
635 for key, value in mansizes:
613 manparams.append((self._fromheader(key), self._fromheader(value)))
636 manparams.append((self._fromheader(key), self._fromheader(value)))
614 advparams = []
637 advparams = []
615 for key, value in advsizes:
638 for key, value in advsizes:
616 advparams.append((self._fromheader(key), self._fromheader(value)))
639 advparams.append((self._fromheader(key), self._fromheader(value)))
617 self.mandatoryparams = manparams
640 self.mandatoryparams = manparams
618 self.advisoryparams = advparams
641 self.advisoryparams = advparams
619 ## part payload
642 ## part payload
620 def payloadchunks():
643 def payloadchunks():
621 payloadsize = self._unpack(_fpayloadsize)[0]
644 payloadsize = self._unpack(_fpayloadsize)[0]
622 self.ui.debug('payload chunk size: %i\n' % payloadsize)
645 self.ui.debug('payload chunk size: %i\n' % payloadsize)
623 while payloadsize:
646 while payloadsize:
624 yield self._readexact(payloadsize)
647 yield self._readexact(payloadsize)
625 payloadsize = self._unpack(_fpayloadsize)[0]
648 payloadsize = self._unpack(_fpayloadsize)[0]
626 self.ui.debug('payload chunk size: %i\n' % payloadsize)
649 self.ui.debug('payload chunk size: %i\n' % payloadsize)
627 self._payloadstream = util.chunkbuffer(payloadchunks())
650 self._payloadstream = util.chunkbuffer(payloadchunks())
628 # the header has been read, record it
651 # the header has been read, record it
629 self._initialized = True
652 self._initialized = True
630
653
631 def read(self, size=None):
654 def read(self, size=None):
632 """read payload data"""
655 """read payload data"""
633 if not self._initialized:
656 if not self._initialized:
634 self._readheader()
657 self._readheader()
635 if size is None:
658 if size is None:
636 data = self._payloadstream.read()
659 data = self._payloadstream.read()
637 else:
660 else:
638 data = self._payloadstream.read(size)
661 data = self._payloadstream.read(size)
639 if size is None or len(data) < size:
662 if size is None or len(data) < size:
640 self.consumed = True
663 self.consumed = True
641 return data
664 return data
642
665
643
666
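The payload framing used by `getchunks` and `payloadchunks` above (each chunk prefixed by its size, with a zero size closing the payload) can be sketched in isolation. This is a minimal Python 3 illustration, not the module's code: the names `PAYLOAD_SIZE_FORMAT`, `frame_payload` and `read_payload` are hypothetical, and the `'>I'` format is an assumption, since this diff only shows the name `_fpayloadsize` and not its value.

```python
import io
import struct

# Assumption: a 4-byte unsigned big-endian size field. The diff only shows
# the name `_fpayloadsize`, not the format string behind it.
PAYLOAD_SIZE_FORMAT = '>I'


def frame_payload(chunks):
    """Yield each non-empty chunk prefixed by its size, then a zero size."""
    for chunk in chunks:
        # An empty chunk would be indistinguishable from the end marker,
        # so it is skipped here.
        if chunk:
            yield struct.pack(PAYLOAD_SIZE_FORMAT, len(chunk))
            yield chunk
    # A zero size marks the end of the payload.
    yield struct.pack(PAYLOAD_SIZE_FORMAT, 0)


def read_payload(fp):
    """Yield payload chunks from `fp` until the zero-size marker.

    A real stream may return short reads; the module uses a `_readexact`
    helper for that, which this sketch does not reproduce.
    """
    sizelen = struct.calcsize(PAYLOAD_SIZE_FORMAT)
    size = struct.unpack(PAYLOAD_SIZE_FORMAT, fp.read(sizelen))[0]
    while size:
        yield fp.read(size)
        size = struct.unpack(PAYLOAD_SIZE_FORMAT, fp.read(sizelen))[0]


# Round trip: frame two chunks, then read them back.
stream = io.BytesIO(b''.join(frame_payload([b'hello ', b'world'])))
payload = b''.join(read_payload(stream))
```

Because the reader stops at the first zero size, a part can be streamed without knowing its total length in advance.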
644 @parthandler('changegroup')
667 @parthandler('changegroup')
645 def handlechangegroup(op, inpart):
668 def handlechangegroup(op, inpart):
646 """apply a changegroup part on the repo
669 """apply a changegroup part on the repo
647
670
648 This is a very early implementation that will need massive rework before
671 This is a very early implementation that will need massive rework before
649 being inflicted on any end-user.
672 being inflicted on any end-user.
650 """
673 """
651 # Make sure we trigger a transaction creation
674 # Make sure we trigger a transaction creation
652 #
675 #
653 # The addchangegroup function will get a transaction object by itself, but
676 # The addchangegroup function will get a transaction object by itself, but
654 # we need to make sure we trigger the creation of a transaction object used
677 # we need to make sure we trigger the creation of a transaction object used
655 # for the whole processing scope.
678 # for the whole processing scope.
656 op.gettransaction()
679 op.gettransaction()
657 cg = changegroup.unbundle10(inpart, 'UN')
680 cg = changegroup.unbundle10(inpart, 'UN')
658 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
681 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
659 op.records.add('changegroup', {'return': ret})
682 op.records.add('changegroup', {'return': ret})
660 if op.reply is not None:
683 if op.reply is not None:
661 # This is definitely not the final form of this
684 # This is definitely not the final form of this
662 # return. But one needs to start somewhere.
685 # return. But one needs to start somewhere.
663 part = bundlepart('reply:changegroup', (),
686 part = bundlepart('reply:changegroup', (),
664 [('in-reply-to', str(inpart.id)),
687 [('in-reply-to', str(inpart.id)),
665 ('return', '%i' % ret)])
688 ('return', '%i' % ret)])
666 op.reply.addpart(part)
689 op.reply.addpart(part)
667 assert not inpart.read()
690 assert not inpart.read()
668
691
669 @parthandler('reply:changegroup')
692 @parthandler('reply:changegroup')
670 def handlechangegroup(op, inpart):
693 def handlechangegroup(op, inpart):
671 p = dict(inpart.advisoryparams)
694 p = dict(inpart.advisoryparams)
672 ret = int(p['return'])
695 ret = int(p['return'])
673 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
696 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
674
697
675 @parthandler('check:heads')
698 @parthandler('check:heads')
676 def handlechangegroup(op, inpart):
699 def handlechangegroup(op, inpart):
677 """check that the heads of the repo did not change
700 """check that the heads of the repo did not change
678
701
679 This is used to detect a push race when using unbundle.
702 This is used to detect a push race when using unbundle.
680 This replaces the "heads" argument of unbundle."""
703 This replaces the "heads" argument of unbundle."""
681 h = inpart.read(20)
704 h = inpart.read(20)
682 heads = []
705 heads = []
683 while len(h) == 20:
706 while len(h) == 20:
684 heads.append(h)
707 heads.append(h)
685 h = inpart.read(20)
708 h = inpart.read(20)
686 assert not h
709 assert not h
687 if heads != op.repo.heads():
710 if heads != op.repo.heads():
688 raise exchange.PushRaced()
711 raise exchange.PushRaced()
689
712
690 @parthandler('output')
713 @parthandler('output')
691 def handleoutput(op, inpart):
714 def handleoutput(op, inpart):
692 """forward output captured on the server to the client"""
715 """forward output captured on the server to the client"""
693 for line in inpart.read().splitlines():
716 for line in inpart.read().splitlines():
694 op.ui.write(('remote: %s\n' % line))
717 op.ui.write(('remote: %s\n' % line))
695
718
696 @parthandler('replycaps')
719 @parthandler('replycaps')
697 def handlereplycaps(op, inpart):
720 def handlereplycaps(op, inpart):
698 """Notify that a reply bundle should be created
721 """Notify that a reply bundle should be created
699
722
700 The part payload is a list of capabilities (one per line)
723 The payload contains the capabilities information for the reply"""
701 Capabilities may have values using a line of form::
724 caps = decodecaps(inpart.read())
702
703 capability=value1,value2,value3
704
705 The values are always a list."""
706 caps = {}
707 for line in inpart.read().splitlines():
708 if not line:
709 continue
710 if '=' not in line:
711 key, vals = line, ()
712 else:
713 key, vals = line.split('=', 1)
714 vals = vals.split(',')
715 key = urllib.unquote(key)
716 vals = [urllib.unquote(v) for v in vals]
717 caps[key] = vals
718 if op.reply is None:
725 if op.reply is None:
719 op.reply = bundle20(op.ui, caps)
726 op.reply = bundle20(op.ui, caps)
720
727
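The last hunk replaces the inline parsing in `handlereplycaps` with a call to `decodecaps`, whose body is not visible in this part of the diff. The following is a sketch only, assuming `decodecaps` simply wraps the removed lines. It is written for Python 3, so `urllib.parse.unquote` stands in for the Python 2 `urllib.unquote`, and it takes a `str` rather than the raw part payload. The name `decodecaps_sketch` and the sample capability names are hypothetical.

```python
from urllib.parse import unquote


def decodecaps_sketch(blob):
    """Decode a capabilities blob into a dict mapping name -> list of values.

    One capability per line; values, if any, follow '=' and are
    comma-separated. Names and values are URL-quoted.
    """
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = unquote(key)
        vals = [unquote(v) for v in vals]
        caps[key] = vals
    return caps


# Hypothetical capability names, chosen only to exercise the quoting rules.
caps = decodecaps_sketch('simple\nencoded%3Dkey=one,two%2Cthree\n\n')
# caps == {'simple': [], 'encoded=key': ['one', 'two,three']}
```

Splitting on the first `=` and on `,` before unquoting is what allows names and values to contain those characters in their quoted form.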