##// END OF EJS Templates
bundle2: adds a capabilities attribute on bundler20...
Pierre-Yves David -
r21134:2f8c4fa2 default
parent child Browse files
Show More
@@ -1,702 +1,703
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup
148 import changegroup
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG20'
154 _magicstring = 'HG20'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
164
164
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number parameters is variable so we need to build that format
168 The number parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
172
172
173 parthandlermapping = {}
173 parthandlermapping = {}
174
174
175 def parthandler(parttype):
175 def parthandler(parttype):
176 """decorator that register a function as a bundle2 part handler
176 """decorator that register a function as a bundle2 part handler
177
177
178 eg::
178 eg::
179
179
180 @parthandler('myparttype')
180 @parthandler('myparttype')
181 def myparttypehandler(...):
181 def myparttypehandler(...):
182 '''process a part of type "my part".'''
182 '''process a part of type "my part".'''
183 ...
183 ...
184 """
184 """
185 def _decorator(func):
185 def _decorator(func):
186 lparttype = parttype.lower() # enforce lower case matching.
186 lparttype = parttype.lower() # enforce lower case matching.
187 assert lparttype not in parthandlermapping
187 assert lparttype not in parthandlermapping
188 parthandlermapping[lparttype] = func
188 parthandlermapping[lparttype] = func
189 return func
189 return func
190 return _decorator
190 return _decorator
191
191
192 class unbundlerecords(object):
192 class unbundlerecords(object):
193 """keep record of what happens during and unbundle
193 """keep record of what happens during and unbundle
194
194
195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
195 New records are added using `records.add('cat', obj)`. Where 'cat' is a
196 category of record and obj is an arbitrary object.
196 category of record and obj is an arbitrary object.
197
197
198 `records['cat']` will return all entries of this category 'cat'.
198 `records['cat']` will return all entries of this category 'cat'.
199
199
200 Iterating on the object itself will yield `('category', obj)` tuples
200 Iterating on the object itself will yield `('category', obj)` tuples
201 for all entries.
201 for all entries.
202
202
203 All iterations happens in chronological order.
203 All iterations happens in chronological order.
204 """
204 """
205
205
206 def __init__(self):
206 def __init__(self):
207 self._categories = {}
207 self._categories = {}
208 self._sequences = []
208 self._sequences = []
209 self._replies = {}
209 self._replies = {}
210
210
211 def add(self, category, entry, inreplyto=None):
211 def add(self, category, entry, inreplyto=None):
212 """add a new record of a given category.
212 """add a new record of a given category.
213
213
214 The entry can then be retrieved in the list returned by
214 The entry can then be retrieved in the list returned by
215 self['category']."""
215 self['category']."""
216 self._categories.setdefault(category, []).append(entry)
216 self._categories.setdefault(category, []).append(entry)
217 self._sequences.append((category, entry))
217 self._sequences.append((category, entry))
218 if inreplyto is not None:
218 if inreplyto is not None:
219 self.getreplies(inreplyto).add(category, entry)
219 self.getreplies(inreplyto).add(category, entry)
220
220
221 def getreplies(self, partid):
221 def getreplies(self, partid):
222 """get the subrecords that replies to a specific part"""
222 """get the subrecords that replies to a specific part"""
223 return self._replies.setdefault(partid, unbundlerecords())
223 return self._replies.setdefault(partid, unbundlerecords())
224
224
225 def __getitem__(self, cat):
225 def __getitem__(self, cat):
226 return tuple(self._categories.get(cat, ()))
226 return tuple(self._categories.get(cat, ()))
227
227
228 def __iter__(self):
228 def __iter__(self):
229 return iter(self._sequences)
229 return iter(self._sequences)
230
230
231 def __len__(self):
231 def __len__(self):
232 return len(self._sequences)
232 return len(self._sequences)
233
233
234 def __nonzero__(self):
234 def __nonzero__(self):
235 return bool(self._sequences)
235 return bool(self._sequences)
236
236
237 class bundleoperation(object):
237 class bundleoperation(object):
238 """an object that represents a single bundling process
238 """an object that represents a single bundling process
239
239
240 Its purpose is to carry unbundle-related objects and states.
240 Its purpose is to carry unbundle-related objects and states.
241
241
242 A new object should be created at the beginning of each bundle processing.
242 A new object should be created at the beginning of each bundle processing.
243 The object is to be returned by the processing function.
243 The object is to be returned by the processing function.
244
244
245 The object has very little content now it will ultimately contain:
245 The object has very little content now it will ultimately contain:
246 * an access to the repo the bundle is applied to,
246 * an access to the repo the bundle is applied to,
247 * a ui object,
247 * a ui object,
248 * a way to retrieve a transaction to add changes to the repo,
248 * a way to retrieve a transaction to add changes to the repo,
249 * a way to record the result of processing each part,
249 * a way to record the result of processing each part,
250 * a way to construct a bundle response when applicable.
250 * a way to construct a bundle response when applicable.
251 """
251 """
252
252
253 def __init__(self, repo, transactiongetter):
253 def __init__(self, repo, transactiongetter):
254 self.repo = repo
254 self.repo = repo
255 self.ui = repo.ui
255 self.ui = repo.ui
256 self.records = unbundlerecords()
256 self.records = unbundlerecords()
257 self.gettransaction = transactiongetter
257 self.gettransaction = transactiongetter
258 self.reply = None
258 self.reply = None
259
259
260 class TransactionUnavailable(RuntimeError):
260 class TransactionUnavailable(RuntimeError):
261 pass
261 pass
262
262
263 def _notransaction():
263 def _notransaction():
264 """default method to get a transaction while processing a bundle
264 """default method to get a transaction while processing a bundle
265
265
266 Raise an exception to highlight the fact that no transaction was expected
266 Raise an exception to highlight the fact that no transaction was expected
267 to be created"""
267 to be created"""
268 raise TransactionUnavailable()
268 raise TransactionUnavailable()
269
269
270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
270 def processbundle(repo, unbundler, transactiongetter=_notransaction):
271 """This function process a bundle, apply effect to/from a repo
271 """This function process a bundle, apply effect to/from a repo
272
272
273 It iterates over each part then searches for and uses the proper handling
273 It iterates over each part then searches for and uses the proper handling
274 code to process the part. Parts are processed in order.
274 code to process the part. Parts are processed in order.
275
275
276 This is very early version of this function that will be strongly reworked
276 This is very early version of this function that will be strongly reworked
277 before final usage.
277 before final usage.
278
278
279 Unknown Mandatory part will abort the process.
279 Unknown Mandatory part will abort the process.
280 """
280 """
281 op = bundleoperation(repo, transactiongetter)
281 op = bundleoperation(repo, transactiongetter)
282 # todo:
282 # todo:
283 # - replace this is a init function soon.
283 # - replace this is a init function soon.
284 # - exception catching
284 # - exception catching
285 unbundler.params
285 unbundler.params
286 iterparts = unbundler.iterparts()
286 iterparts = unbundler.iterparts()
287 part = None
287 part = None
288 try:
288 try:
289 for part in iterparts:
289 for part in iterparts:
290 parttype = part.type
290 parttype = part.type
291 # part key are matched lower case
291 # part key are matched lower case
292 key = parttype.lower()
292 key = parttype.lower()
293 try:
293 try:
294 handler = parthandlermapping[key]
294 handler = parthandlermapping[key]
295 op.ui.debug('found a handler for part %r\n' % parttype)
295 op.ui.debug('found a handler for part %r\n' % parttype)
296 except KeyError:
296 except KeyError:
297 if key != parttype: # mandatory parts
297 if key != parttype: # mandatory parts
298 # todo:
298 # todo:
299 # - use a more precise exception
299 # - use a more precise exception
300 raise
300 raise
301 op.ui.debug('ignoring unknown advisory part %r\n' % key)
301 op.ui.debug('ignoring unknown advisory part %r\n' % key)
302 # consuming the part
302 # consuming the part
303 part.read()
303 part.read()
304 continue
304 continue
305
305
306 # handler is called outside the above try block so that we don't
306 # handler is called outside the above try block so that we don't
307 # risk catching KeyErrors from anything other than the
307 # risk catching KeyErrors from anything other than the
308 # parthandlermapping lookup (any KeyError raised by handler()
308 # parthandlermapping lookup (any KeyError raised by handler()
309 # itself represents a defect of a different variety).
309 # itself represents a defect of a different variety).
310 output = None
310 output = None
311 if op.reply is not None:
311 if op.reply is not None:
312 op.ui.pushbuffer(error=True)
312 op.ui.pushbuffer(error=True)
313 output = ''
313 output = ''
314 try:
314 try:
315 handler(op, part)
315 handler(op, part)
316 finally:
316 finally:
317 if output is not None:
317 if output is not None:
318 output = op.ui.popbuffer()
318 output = op.ui.popbuffer()
319 if output:
319 if output:
320 outpart = bundlepart('output',
320 outpart = bundlepart('output',
321 advisoryparams=[('in-reply-to',
321 advisoryparams=[('in-reply-to',
322 str(part.id))],
322 str(part.id))],
323 data=output)
323 data=output)
324 op.reply.addpart(outpart)
324 op.reply.addpart(outpart)
325 part.read()
325 part.read()
326 except Exception:
326 except Exception:
327 if part is not None:
327 if part is not None:
328 # consume the bundle content
328 # consume the bundle content
329 part.read()
329 part.read()
330 for part in iterparts:
330 for part in iterparts:
331 # consume the bundle content
331 # consume the bundle content
332 part.read()
332 part.read()
333 raise
333 raise
334 return op
334 return op
335
335
336 class bundle20(object):
336 class bundle20(object):
337 """represent an outgoing bundle2 container
337 """represent an outgoing bundle2 container
338
338
339 Use the `addparam` method to add stream level parameter. and `addpart` to
339 Use the `addparam` method to add stream level parameter. and `addpart` to
340 populate it. Then call `getchunks` to retrieve all the binary chunks of
340 populate it. Then call `getchunks` to retrieve all the binary chunks of
341 data that compose the bundle2 container."""
341 data that compose the bundle2 container."""
342
342
343 def __init__(self, ui):
343 def __init__(self, ui, capabilities=()):
344 self.ui = ui
344 self.ui = ui
345 self._params = []
345 self._params = []
346 self._parts = []
346 self._parts = []
347 self.capabilities = set(capabilities)
347
348
348 def addparam(self, name, value=None):
349 def addparam(self, name, value=None):
349 """add a stream level parameter"""
350 """add a stream level parameter"""
350 if not name:
351 if not name:
351 raise ValueError('empty parameter name')
352 raise ValueError('empty parameter name')
352 if name[0] not in string.letters:
353 if name[0] not in string.letters:
353 raise ValueError('non letter first character: %r' % name)
354 raise ValueError('non letter first character: %r' % name)
354 self._params.append((name, value))
355 self._params.append((name, value))
355
356
356 def addpart(self, part):
357 def addpart(self, part):
357 """add a new part to the bundle2 container
358 """add a new part to the bundle2 container
358
359
359 Parts contains the actual applicative payload."""
360 Parts contains the actual applicative payload."""
360 assert part.id is None
361 assert part.id is None
361 part.id = len(self._parts) # very cheap counter
362 part.id = len(self._parts) # very cheap counter
362 self._parts.append(part)
363 self._parts.append(part)
363
364
364 def getchunks(self):
365 def getchunks(self):
365 self.ui.debug('start emission of %s stream\n' % _magicstring)
366 self.ui.debug('start emission of %s stream\n' % _magicstring)
366 yield _magicstring
367 yield _magicstring
367 param = self._paramchunk()
368 param = self._paramchunk()
368 self.ui.debug('bundle parameter: %s\n' % param)
369 self.ui.debug('bundle parameter: %s\n' % param)
369 yield _pack(_fstreamparamsize, len(param))
370 yield _pack(_fstreamparamsize, len(param))
370 if param:
371 if param:
371 yield param
372 yield param
372
373
373 self.ui.debug('start of parts\n')
374 self.ui.debug('start of parts\n')
374 for part in self._parts:
375 for part in self._parts:
375 self.ui.debug('bundle part: "%s"\n' % part.type)
376 self.ui.debug('bundle part: "%s"\n' % part.type)
376 for chunk in part.getchunks():
377 for chunk in part.getchunks():
377 yield chunk
378 yield chunk
378 self.ui.debug('end of bundle\n')
379 self.ui.debug('end of bundle\n')
379 yield '\0\0'
380 yield '\0\0'
380
381
381 def _paramchunk(self):
382 def _paramchunk(self):
382 """return a encoded version of all stream parameters"""
383 """return a encoded version of all stream parameters"""
383 blocks = []
384 blocks = []
384 for par, value in self._params:
385 for par, value in self._params:
385 par = urllib.quote(par)
386 par = urllib.quote(par)
386 if value is not None:
387 if value is not None:
387 value = urllib.quote(value)
388 value = urllib.quote(value)
388 par = '%s=%s' % (par, value)
389 par = '%s=%s' % (par, value)
389 blocks.append(par)
390 blocks.append(par)
390 return ' '.join(blocks)
391 return ' '.join(blocks)
391
392
392 class unpackermixin(object):
393 class unpackermixin(object):
393 """A mixin to extract bytes and struct data from a stream"""
394 """A mixin to extract bytes and struct data from a stream"""
394
395
395 def __init__(self, fp):
396 def __init__(self, fp):
396 self._fp = fp
397 self._fp = fp
397
398
398 def _unpack(self, format):
399 def _unpack(self, format):
399 """unpack this struct format from the stream"""
400 """unpack this struct format from the stream"""
400 data = self._readexact(struct.calcsize(format))
401 data = self._readexact(struct.calcsize(format))
401 return _unpack(format, data)
402 return _unpack(format, data)
402
403
403 def _readexact(self, size):
404 def _readexact(self, size):
404 """read exactly <size> bytes from the stream"""
405 """read exactly <size> bytes from the stream"""
405 return changegroup.readexactly(self._fp, size)
406 return changegroup.readexactly(self._fp, size)
406
407
407
408
408 class unbundle20(unpackermixin):
409 class unbundle20(unpackermixin):
409 """interpret a bundle2 stream
410 """interpret a bundle2 stream
410
411
411 This class is fed with a binary stream and yields parts through its
412 This class is fed with a binary stream and yields parts through its
412 `iterparts` methods."""
413 `iterparts` methods."""
413
414
414 def __init__(self, ui, fp, header=None):
415 def __init__(self, ui, fp, header=None):
415 """If header is specified, we do not read it out of the stream."""
416 """If header is specified, we do not read it out of the stream."""
416 self.ui = ui
417 self.ui = ui
417 super(unbundle20, self).__init__(fp)
418 super(unbundle20, self).__init__(fp)
418 if header is None:
419 if header is None:
419 header = self._readexact(4)
420 header = self._readexact(4)
420 magic, version = header[0:2], header[2:4]
421 magic, version = header[0:2], header[2:4]
421 if magic != 'HG':
422 if magic != 'HG':
422 raise util.Abort(_('not a Mercurial bundle'))
423 raise util.Abort(_('not a Mercurial bundle'))
423 if version != '20':
424 if version != '20':
424 raise util.Abort(_('unknown bundle version %s') % version)
425 raise util.Abort(_('unknown bundle version %s') % version)
425 self.ui.debug('start processing of %s stream\n' % header)
426 self.ui.debug('start processing of %s stream\n' % header)
426
427
427 @util.propertycache
428 @util.propertycache
428 def params(self):
429 def params(self):
429 """dictionary of stream level parameters"""
430 """dictionary of stream level parameters"""
430 self.ui.debug('reading bundle2 stream parameters\n')
431 self.ui.debug('reading bundle2 stream parameters\n')
431 params = {}
432 params = {}
432 paramssize = self._unpack(_fstreamparamsize)[0]
433 paramssize = self._unpack(_fstreamparamsize)[0]
433 if paramssize:
434 if paramssize:
434 for p in self._readexact(paramssize).split(' '):
435 for p in self._readexact(paramssize).split(' '):
435 p = p.split('=', 1)
436 p = p.split('=', 1)
436 p = [urllib.unquote(i) for i in p]
437 p = [urllib.unquote(i) for i in p]
437 if len(p) < 2:
438 if len(p) < 2:
438 p.append(None)
439 p.append(None)
439 self._processparam(*p)
440 self._processparam(*p)
440 params[p[0]] = p[1]
441 params[p[0]] = p[1]
441 return params
442 return params
442
443
443 def _processparam(self, name, value):
444 def _processparam(self, name, value):
444 """process a parameter, applying its effect if needed
445 """process a parameter, applying its effect if needed
445
446
446 Parameter starting with a lower case letter are advisory and will be
447 Parameter starting with a lower case letter are advisory and will be
447 ignored when unknown. Those starting with an upper case letter are
448 ignored when unknown. Those starting with an upper case letter are
448 mandatory and will this function will raise a KeyError when unknown.
449 mandatory and will this function will raise a KeyError when unknown.
449
450
450 Note: no option are currently supported. Any input will be either
451 Note: no option are currently supported. Any input will be either
451 ignored or failing.
452 ignored or failing.
452 """
453 """
453 if not name:
454 if not name:
454 raise ValueError('empty parameter name')
455 raise ValueError('empty parameter name')
455 if name[0] not in string.letters:
456 if name[0] not in string.letters:
456 raise ValueError('non letter first character: %r' % name)
457 raise ValueError('non letter first character: %r' % name)
457 # Some logic will be later added here to try to process the option for
458 # Some logic will be later added here to try to process the option for
458 # a dict of known parameter.
459 # a dict of known parameter.
459 if name[0].islower():
460 if name[0].islower():
460 self.ui.debug("ignoring unknown parameter %r\n" % name)
461 self.ui.debug("ignoring unknown parameter %r\n" % name)
461 else:
462 else:
462 raise KeyError(name)
463 raise KeyError(name)
463
464
464
465
465 def iterparts(self):
466 def iterparts(self):
466 """yield all parts contained in the stream"""
467 """yield all parts contained in the stream"""
467 # make sure param have been loaded
468 # make sure param have been loaded
468 self.params
469 self.params
469 self.ui.debug('start extraction of bundle2 parts\n')
470 self.ui.debug('start extraction of bundle2 parts\n')
470 headerblock = self._readpartheader()
471 headerblock = self._readpartheader()
471 while headerblock is not None:
472 while headerblock is not None:
472 part = unbundlepart(self.ui, headerblock, self._fp)
473 part = unbundlepart(self.ui, headerblock, self._fp)
473 yield part
474 yield part
474 headerblock = self._readpartheader()
475 headerblock = self._readpartheader()
475 self.ui.debug('end of bundle2 stream\n')
476 self.ui.debug('end of bundle2 stream\n')
476
477
477 def _readpartheader(self):
478 def _readpartheader(self):
478 """reads a part header size and return the bytes blob
479 """reads a part header size and return the bytes blob
479
480
480 returns None if empty"""
481 returns None if empty"""
481 headersize = self._unpack(_fpartheadersize)[0]
482 headersize = self._unpack(_fpartheadersize)[0]
482 self.ui.debug('part header size: %i\n' % headersize)
483 self.ui.debug('part header size: %i\n' % headersize)
483 if headersize:
484 if headersize:
484 return self._readexact(headersize)
485 return self._readexact(headersize)
485 return None
486 return None
486
487
487
488
488 class bundlepart(object):
489 class bundlepart(object):
489 """A bundle2 part contains application level payload
490 """A bundle2 part contains application level payload
490
491
491 The part `type` is used to route the part to the application level
492 The part `type` is used to route the part to the application level
492 handler.
493 handler.
493 """
494 """
494
495
495 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
496 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
496 data=''):
497 data=''):
497 self.id = None
498 self.id = None
498 self.type = parttype
499 self.type = parttype
499 self.data = data
500 self.data = data
500 self.mandatoryparams = mandatoryparams
501 self.mandatoryparams = mandatoryparams
501 self.advisoryparams = advisoryparams
502 self.advisoryparams = advisoryparams
502
503
503 def getchunks(self):
504 def getchunks(self):
504 #### header
505 #### header
505 ## parttype
506 ## parttype
506 header = [_pack(_fparttypesize, len(self.type)),
507 header = [_pack(_fparttypesize, len(self.type)),
507 self.type, _pack(_fpartid, self.id),
508 self.type, _pack(_fpartid, self.id),
508 ]
509 ]
509 ## parameters
510 ## parameters
510 # count
511 # count
511 manpar = self.mandatoryparams
512 manpar = self.mandatoryparams
512 advpar = self.advisoryparams
513 advpar = self.advisoryparams
513 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
514 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
514 # size
515 # size
515 parsizes = []
516 parsizes = []
516 for key, value in manpar:
517 for key, value in manpar:
517 parsizes.append(len(key))
518 parsizes.append(len(key))
518 parsizes.append(len(value))
519 parsizes.append(len(value))
519 for key, value in advpar:
520 for key, value in advpar:
520 parsizes.append(len(key))
521 parsizes.append(len(key))
521 parsizes.append(len(value))
522 parsizes.append(len(value))
522 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
523 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
523 header.append(paramsizes)
524 header.append(paramsizes)
524 # key, value
525 # key, value
525 for key, value in manpar:
526 for key, value in manpar:
526 header.append(key)
527 header.append(key)
527 header.append(value)
528 header.append(value)
528 for key, value in advpar:
529 for key, value in advpar:
529 header.append(key)
530 header.append(key)
530 header.append(value)
531 header.append(value)
531 ## finalize header
532 ## finalize header
532 headerchunk = ''.join(header)
533 headerchunk = ''.join(header)
533 yield _pack(_fpartheadersize, len(headerchunk))
534 yield _pack(_fpartheadersize, len(headerchunk))
534 yield headerchunk
535 yield headerchunk
535 ## payload
536 ## payload
536 for chunk in self._payloadchunks():
537 for chunk in self._payloadchunks():
537 yield _pack(_fpayloadsize, len(chunk))
538 yield _pack(_fpayloadsize, len(chunk))
538 yield chunk
539 yield chunk
539 # end of payload
540 # end of payload
540 yield _pack(_fpayloadsize, 0)
541 yield _pack(_fpayloadsize, 0)
541
542
542 def _payloadchunks(self):
543 def _payloadchunks(self):
543 """yield chunks of a the part payload
544 """yield chunks of a the part payload
544
545
545 Exists to handle the different methods to provide data to a part."""
546 Exists to handle the different methods to provide data to a part."""
546 # we only support fixed size data now.
547 # we only support fixed size data now.
547 # This will be improved in the future.
548 # This will be improved in the future.
548 if util.safehasattr(self.data, 'next'):
549 if util.safehasattr(self.data, 'next'):
549 buff = util.chunkbuffer(self.data)
550 buff = util.chunkbuffer(self.data)
550 chunk = buff.read(preferedchunksize)
551 chunk = buff.read(preferedchunksize)
551 while chunk:
552 while chunk:
552 yield chunk
553 yield chunk
553 chunk = buff.read(preferedchunksize)
554 chunk = buff.read(preferedchunksize)
554 elif len(self.data):
555 elif len(self.data):
555 yield self.data
556 yield self.data
556
557
557 class unbundlepart(unpackermixin):
558 class unbundlepart(unpackermixin):
558 """a bundle part read from a bundle"""
559 """a bundle part read from a bundle"""
559
560
560 def __init__(self, ui, header, fp):
561 def __init__(self, ui, header, fp):
561 super(unbundlepart, self).__init__(fp)
562 super(unbundlepart, self).__init__(fp)
562 self.ui = ui
563 self.ui = ui
563 # unbundle state attr
564 # unbundle state attr
564 self._headerdata = header
565 self._headerdata = header
565 self._headeroffset = 0
566 self._headeroffset = 0
566 self._initialized = False
567 self._initialized = False
567 self.consumed = False
568 self.consumed = False
568 # part data
569 # part data
569 self.id = None
570 self.id = None
570 self.type = None
571 self.type = None
571 self.mandatoryparams = None
572 self.mandatoryparams = None
572 self.advisoryparams = None
573 self.advisoryparams = None
573 self._payloadstream = None
574 self._payloadstream = None
574 self._readheader()
575 self._readheader()
575
576
576 def _fromheader(self, size):
577 def _fromheader(self, size):
577 """return the next <size> byte from the header"""
578 """return the next <size> byte from the header"""
578 offset = self._headeroffset
579 offset = self._headeroffset
579 data = self._headerdata[offset:(offset + size)]
580 data = self._headerdata[offset:(offset + size)]
580 self._headeroffset = offset + size
581 self._headeroffset = offset + size
581 return data
582 return data
582
583
583 def _unpackheader(self, format):
584 def _unpackheader(self, format):
584 """read given format from header
585 """read given format from header
585
586
586 This automatically compute the size of the format to read."""
587 This automatically compute the size of the format to read."""
587 data = self._fromheader(struct.calcsize(format))
588 data = self._fromheader(struct.calcsize(format))
588 return _unpack(format, data)
589 return _unpack(format, data)
589
590
590 def _readheader(self):
591 def _readheader(self):
591 """read the header and setup the object"""
592 """read the header and setup the object"""
592 typesize = self._unpackheader(_fparttypesize)[0]
593 typesize = self._unpackheader(_fparttypesize)[0]
593 self.type = self._fromheader(typesize)
594 self.type = self._fromheader(typesize)
594 self.ui.debug('part type: "%s"\n' % self.type)
595 self.ui.debug('part type: "%s"\n' % self.type)
595 self.id = self._unpackheader(_fpartid)[0]
596 self.id = self._unpackheader(_fpartid)[0]
596 self.ui.debug('part id: "%s"\n' % self.id)
597 self.ui.debug('part id: "%s"\n' % self.id)
597 ## reading parameters
598 ## reading parameters
598 # param count
599 # param count
599 mancount, advcount = self._unpackheader(_fpartparamcount)
600 mancount, advcount = self._unpackheader(_fpartparamcount)
600 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
601 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
601 # param size
602 # param size
602 fparamsizes = _makefpartparamsizes(mancount + advcount)
603 fparamsizes = _makefpartparamsizes(mancount + advcount)
603 paramsizes = self._unpackheader(fparamsizes)
604 paramsizes = self._unpackheader(fparamsizes)
604 # make it a list of couple again
605 # make it a list of couple again
605 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
606 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
606 # split mandatory from advisory
607 # split mandatory from advisory
607 mansizes = paramsizes[:mancount]
608 mansizes = paramsizes[:mancount]
608 advsizes = paramsizes[mancount:]
609 advsizes = paramsizes[mancount:]
609 # retrive param value
610 # retrive param value
610 manparams = []
611 manparams = []
611 for key, value in mansizes:
612 for key, value in mansizes:
612 manparams.append((self._fromheader(key), self._fromheader(value)))
613 manparams.append((self._fromheader(key), self._fromheader(value)))
613 advparams = []
614 advparams = []
614 for key, value in advsizes:
615 for key, value in advsizes:
615 advparams.append((self._fromheader(key), self._fromheader(value)))
616 advparams.append((self._fromheader(key), self._fromheader(value)))
616 self.mandatoryparams = manparams
617 self.mandatoryparams = manparams
617 self.advisoryparams = advparams
618 self.advisoryparams = advparams
618 ## part payload
619 ## part payload
619 def payloadchunks():
620 def payloadchunks():
620 payloadsize = self._unpack(_fpayloadsize)[0]
621 payloadsize = self._unpack(_fpayloadsize)[0]
621 self.ui.debug('payload chunk size: %i\n' % payloadsize)
622 self.ui.debug('payload chunk size: %i\n' % payloadsize)
622 while payloadsize:
623 while payloadsize:
623 yield self._readexact(payloadsize)
624 yield self._readexact(payloadsize)
624 payloadsize = self._unpack(_fpayloadsize)[0]
625 payloadsize = self._unpack(_fpayloadsize)[0]
625 self.ui.debug('payload chunk size: %i\n' % payloadsize)
626 self.ui.debug('payload chunk size: %i\n' % payloadsize)
626 self._payloadstream = util.chunkbuffer(payloadchunks())
627 self._payloadstream = util.chunkbuffer(payloadchunks())
627 # we read the data, tell it
628 # we read the data, tell it
628 self._initialized = True
629 self._initialized = True
629
630
630 def read(self, size=None):
631 def read(self, size=None):
631 """read payload data"""
632 """read payload data"""
632 if not self._initialized:
633 if not self._initialized:
633 self._readheader()
634 self._readheader()
634 if size is None:
635 if size is None:
635 data = self._payloadstream.read()
636 data = self._payloadstream.read()
636 else:
637 else:
637 data = self._payloadstream.read(size)
638 data = self._payloadstream.read(size)
638 if size is None or len(data) < size:
639 if size is None or len(data) < size:
639 self.consumed = True
640 self.consumed = True
640 return data
641 return data
641
642
642
643
643 @parthandler('changegroup')
644 @parthandler('changegroup')
644 def handlechangegroup(op, inpart):
645 def handlechangegroup(op, inpart):
645 """apply a changegroup part on the repo
646 """apply a changegroup part on the repo
646
647
647 This is a very early implementation that will massive rework before being
648 This is a very early implementation that will massive rework before being
648 inflicted to any end-user.
649 inflicted to any end-user.
649 """
650 """
650 # Make sure we trigger a transaction creation
651 # Make sure we trigger a transaction creation
651 #
652 #
652 # The addchangegroup function will get a transaction object by itself, but
653 # The addchangegroup function will get a transaction object by itself, but
653 # we need to make sure we trigger the creation of a transaction object used
654 # we need to make sure we trigger the creation of a transaction object used
654 # for the whole processing scope.
655 # for the whole processing scope.
655 op.gettransaction()
656 op.gettransaction()
656 cg = changegroup.unbundle10(inpart, 'UN')
657 cg = changegroup.unbundle10(inpart, 'UN')
657 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
658 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
658 op.records.add('changegroup', {'return': ret})
659 op.records.add('changegroup', {'return': ret})
659 if op.reply is not None:
660 if op.reply is not None:
660 # This is definitly not the final form of this
661 # This is definitly not the final form of this
661 # return. But one need to start somewhere.
662 # return. But one need to start somewhere.
662 part = bundlepart('reply:changegroup', (),
663 part = bundlepart('reply:changegroup', (),
663 [('in-reply-to', str(inpart.id)),
664 [('in-reply-to', str(inpart.id)),
664 ('return', '%i' % ret)])
665 ('return', '%i' % ret)])
665 op.reply.addpart(part)
666 op.reply.addpart(part)
666 assert not inpart.read()
667 assert not inpart.read()
667
668
668 @parthandler('reply:changegroup')
669 @parthandler('reply:changegroup')
669 def handlechangegroup(op, inpart):
670 def handlechangegroup(op, inpart):
670 p = dict(inpart.advisoryparams)
671 p = dict(inpart.advisoryparams)
671 ret = int(p['return'])
672 ret = int(p['return'])
672 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
673 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
673
674
674 @parthandler('check:heads')
675 @parthandler('check:heads')
675 def handlechangegroup(op, inpart):
676 def handlechangegroup(op, inpart):
676 """check that head of the repo did not change
677 """check that head of the repo did not change
677
678
678 This is used to detect a push race when using unbundle.
679 This is used to detect a push race when using unbundle.
679 This replaces the "heads" argument of unbundle."""
680 This replaces the "heads" argument of unbundle."""
680 h = inpart.read(20)
681 h = inpart.read(20)
681 heads = []
682 heads = []
682 while len(h) == 20:
683 while len(h) == 20:
683 heads.append(h)
684 heads.append(h)
684 h = inpart.read(20)
685 h = inpart.read(20)
685 assert not h
686 assert not h
686 if heads != op.repo.heads():
687 if heads != op.repo.heads():
687 raise exchange.PushRaced()
688 raise exchange.PushRaced()
688
689
689 @parthandler('output')
690 @parthandler('output')
690 def handleoutput(op, inpart):
691 def handleoutput(op, inpart):
691 """forward output captured on the server to the client"""
692 """forward output captured on the server to the client"""
692 for line in inpart.read().splitlines():
693 for line in inpart.read().splitlines():
693 op.ui.write(('remote: %s\n' % line))
694 op.ui.write(('remote: %s\n' % line))
694
695
695 @parthandler('replycaps')
696 @parthandler('replycaps')
696 def handlereplycaps(op, inpart):
697 def handlereplycaps(op, inpart):
697 """Notify that a reply bundle should be created
698 """Notify that a reply bundle should be created
698
699
699 Will convey bundle capability at some point too."""
700 Will convey bundle capability at some point too."""
700 if op.reply is None:
701 if op.reply is None:
701 op.reply = bundle20(op.ui)
702 op.reply = bundle20(op.ui)
702
703
General Comments 0
You need to be logged in to leave comments. Login now