##// END OF EJS Templates
bundle2: warn about error during initialization in ``newpart`` docstring...
Pierre-Yves David -
r21602:cc33ae50 default
parent child Browse files
Show More
@@ -1,784 +1,791 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup, error
148 import changegroup, error
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG2X'
154 _magicstring = 'HG2X'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
164
164
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number parameters is variable so we need to build that format
168 The number parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
172
172
173 class UnknownPartError(KeyError):
173 class UnknownPartError(KeyError):
174 """error raised when no handler is found for a Mandatory part"""
174 """error raised when no handler is found for a Mandatory part"""
175 pass
175 pass
176
176
177 parthandlermapping = {}
177 parthandlermapping = {}
178
178
179 def parthandler(parttype):
179 def parthandler(parttype):
180 """decorator that register a function as a bundle2 part handler
180 """decorator that register a function as a bundle2 part handler
181
181
182 eg::
182 eg::
183
183
184 @parthandler('myparttype')
184 @parthandler('myparttype')
185 def myparttypehandler(...):
185 def myparttypehandler(...):
186 '''process a part of type "my part".'''
186 '''process a part of type "my part".'''
187 ...
187 ...
188 """
188 """
189 def _decorator(func):
189 def _decorator(func):
190 lparttype = parttype.lower() # enforce lower case matching.
190 lparttype = parttype.lower() # enforce lower case matching.
191 assert lparttype not in parthandlermapping
191 assert lparttype not in parthandlermapping
192 parthandlermapping[lparttype] = func
192 parthandlermapping[lparttype] = func
193 return func
193 return func
194 return _decorator
194 return _decorator
195
195
196 class unbundlerecords(object):
196 class unbundlerecords(object):
197 """keep record of what happens during and unbundle
197 """keep record of what happens during and unbundle
198
198
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 category of record and obj is an arbitrary object.
200 category of record and obj is an arbitrary object.
201
201
202 `records['cat']` will return all entries of this category 'cat'.
202 `records['cat']` will return all entries of this category 'cat'.
203
203
204 Iterating on the object itself will yield `('category', obj)` tuples
204 Iterating on the object itself will yield `('category', obj)` tuples
205 for all entries.
205 for all entries.
206
206
207 All iterations happens in chronological order.
207 All iterations happens in chronological order.
208 """
208 """
209
209
210 def __init__(self):
210 def __init__(self):
211 self._categories = {}
211 self._categories = {}
212 self._sequences = []
212 self._sequences = []
213 self._replies = {}
213 self._replies = {}
214
214
215 def add(self, category, entry, inreplyto=None):
215 def add(self, category, entry, inreplyto=None):
216 """add a new record of a given category.
216 """add a new record of a given category.
217
217
218 The entry can then be retrieved in the list returned by
218 The entry can then be retrieved in the list returned by
219 self['category']."""
219 self['category']."""
220 self._categories.setdefault(category, []).append(entry)
220 self._categories.setdefault(category, []).append(entry)
221 self._sequences.append((category, entry))
221 self._sequences.append((category, entry))
222 if inreplyto is not None:
222 if inreplyto is not None:
223 self.getreplies(inreplyto).add(category, entry)
223 self.getreplies(inreplyto).add(category, entry)
224
224
225 def getreplies(self, partid):
225 def getreplies(self, partid):
226 """get the subrecords that replies to a specific part"""
226 """get the subrecords that replies to a specific part"""
227 return self._replies.setdefault(partid, unbundlerecords())
227 return self._replies.setdefault(partid, unbundlerecords())
228
228
229 def __getitem__(self, cat):
229 def __getitem__(self, cat):
230 return tuple(self._categories.get(cat, ()))
230 return tuple(self._categories.get(cat, ()))
231
231
232 def __iter__(self):
232 def __iter__(self):
233 return iter(self._sequences)
233 return iter(self._sequences)
234
234
235 def __len__(self):
235 def __len__(self):
236 return len(self._sequences)
236 return len(self._sequences)
237
237
238 def __nonzero__(self):
238 def __nonzero__(self):
239 return bool(self._sequences)
239 return bool(self._sequences)
240
240
241 class bundleoperation(object):
241 class bundleoperation(object):
242 """an object that represents a single bundling process
242 """an object that represents a single bundling process
243
243
244 Its purpose is to carry unbundle-related objects and states.
244 Its purpose is to carry unbundle-related objects and states.
245
245
246 A new object should be created at the beginning of each bundle processing.
246 A new object should be created at the beginning of each bundle processing.
247 The object is to be returned by the processing function.
247 The object is to be returned by the processing function.
248
248
249 The object has very little content now it will ultimately contain:
249 The object has very little content now it will ultimately contain:
250 * an access to the repo the bundle is applied to,
250 * an access to the repo the bundle is applied to,
251 * a ui object,
251 * a ui object,
252 * a way to retrieve a transaction to add changes to the repo,
252 * a way to retrieve a transaction to add changes to the repo,
253 * a way to record the result of processing each part,
253 * a way to record the result of processing each part,
254 * a way to construct a bundle response when applicable.
254 * a way to construct a bundle response when applicable.
255 """
255 """
256
256
257 def __init__(self, repo, transactiongetter):
257 def __init__(self, repo, transactiongetter):
258 self.repo = repo
258 self.repo = repo
259 self.ui = repo.ui
259 self.ui = repo.ui
260 self.records = unbundlerecords()
260 self.records = unbundlerecords()
261 self.gettransaction = transactiongetter
261 self.gettransaction = transactiongetter
262 self.reply = None
262 self.reply = None
263
263
264 class TransactionUnavailable(RuntimeError):
264 class TransactionUnavailable(RuntimeError):
265 pass
265 pass
266
266
267 def _notransaction():
267 def _notransaction():
268 """default method to get a transaction while processing a bundle
268 """default method to get a transaction while processing a bundle
269
269
270 Raise an exception to highlight the fact that no transaction was expected
270 Raise an exception to highlight the fact that no transaction was expected
271 to be created"""
271 to be created"""
272 raise TransactionUnavailable()
272 raise TransactionUnavailable()
273
273
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 """This function process a bundle, apply effect to/from a repo
275 """This function process a bundle, apply effect to/from a repo
276
276
277 It iterates over each part then searches for and uses the proper handling
277 It iterates over each part then searches for and uses the proper handling
278 code to process the part. Parts are processed in order.
278 code to process the part. Parts are processed in order.
279
279
280 This is very early version of this function that will be strongly reworked
280 This is very early version of this function that will be strongly reworked
281 before final usage.
281 before final usage.
282
282
283 Unknown Mandatory part will abort the process.
283 Unknown Mandatory part will abort the process.
284 """
284 """
285 op = bundleoperation(repo, transactiongetter)
285 op = bundleoperation(repo, transactiongetter)
286 # todo:
286 # todo:
287 # - replace this is a init function soon.
287 # - replace this is a init function soon.
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = unbundler.iterparts()
290 iterparts = unbundler.iterparts()
291 part = None
291 part = None
292 try:
292 try:
293 for part in iterparts:
293 for part in iterparts:
294 parttype = part.type
294 parttype = part.type
295 # part key are matched lower case
295 # part key are matched lower case
296 key = parttype.lower()
296 key = parttype.lower()
297 try:
297 try:
298 handler = parthandlermapping[key]
298 handler = parthandlermapping[key]
299 op.ui.debug('found a handler for part %r\n' % parttype)
299 op.ui.debug('found a handler for part %r\n' % parttype)
300 except KeyError:
300 except KeyError:
301 if key != parttype: # mandatory parts
301 if key != parttype: # mandatory parts
302 # todo:
302 # todo:
303 # - use a more precise exception
303 # - use a more precise exception
304 raise UnknownPartError(key)
304 raise UnknownPartError(key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 # consuming the part
306 # consuming the part
307 part.read()
307 part.read()
308 continue
308 continue
309
309
310 # handler is called outside the above try block so that we don't
310 # handler is called outside the above try block so that we don't
311 # risk catching KeyErrors from anything other than the
311 # risk catching KeyErrors from anything other than the
312 # parthandlermapping lookup (any KeyError raised by handler()
312 # parthandlermapping lookup (any KeyError raised by handler()
313 # itself represents a defect of a different variety).
313 # itself represents a defect of a different variety).
314 output = None
314 output = None
315 if op.reply is not None:
315 if op.reply is not None:
316 op.ui.pushbuffer(error=True)
316 op.ui.pushbuffer(error=True)
317 output = ''
317 output = ''
318 try:
318 try:
319 handler(op, part)
319 handler(op, part)
320 finally:
320 finally:
321 if output is not None:
321 if output is not None:
322 output = op.ui.popbuffer()
322 output = op.ui.popbuffer()
323 if output:
323 if output:
324 op.reply.newpart('b2x:output',
324 op.reply.newpart('b2x:output',
325 advisoryparams=[('in-reply-to',
325 advisoryparams=[('in-reply-to',
326 str(part.id))],
326 str(part.id))],
327 data=output)
327 data=output)
328 part.read()
328 part.read()
329 except Exception, exc:
329 except Exception, exc:
330 if part is not None:
330 if part is not None:
331 # consume the bundle content
331 # consume the bundle content
332 part.read()
332 part.read()
333 for part in iterparts:
333 for part in iterparts:
334 # consume the bundle content
334 # consume the bundle content
335 part.read()
335 part.read()
336 # Small hack to let caller code distinguish exceptions from bundle2
336 # Small hack to let caller code distinguish exceptions from bundle2
337 # processing fron the ones from bundle1 processing. This is mostly
337 # processing fron the ones from bundle1 processing. This is mostly
338 # needed to handle different return codes to unbundle according to the
338 # needed to handle different return codes to unbundle according to the
339 # type of bundle. We should probably clean up or drop this return code
339 # type of bundle. We should probably clean up or drop this return code
340 # craziness in a future version.
340 # craziness in a future version.
341 exc.duringunbundle2 = True
341 exc.duringunbundle2 = True
342 raise
342 raise
343 return op
343 return op
344
344
345 def decodecaps(blob):
345 def decodecaps(blob):
346 """decode a bundle2 caps bytes blob into a dictionnary
346 """decode a bundle2 caps bytes blob into a dictionnary
347
347
348 The blob is a list of capabilities (one per line)
348 The blob is a list of capabilities (one per line)
349 Capabilities may have values using a line of the form::
349 Capabilities may have values using a line of the form::
350
350
351 capability=value1,value2,value3
351 capability=value1,value2,value3
352
352
353 The values are always a list."""
353 The values are always a list."""
354 caps = {}
354 caps = {}
355 for line in blob.splitlines():
355 for line in blob.splitlines():
356 if not line:
356 if not line:
357 continue
357 continue
358 if '=' not in line:
358 if '=' not in line:
359 key, vals = line, ()
359 key, vals = line, ()
360 else:
360 else:
361 key, vals = line.split('=', 1)
361 key, vals = line.split('=', 1)
362 vals = vals.split(',')
362 vals = vals.split(',')
363 key = urllib.unquote(key)
363 key = urllib.unquote(key)
364 vals = [urllib.unquote(v) for v in vals]
364 vals = [urllib.unquote(v) for v in vals]
365 caps[key] = vals
365 caps[key] = vals
366 return caps
366 return caps
367
367
368 def encodecaps(caps):
368 def encodecaps(caps):
369 """encode a bundle2 caps dictionary into a bytes blob"""
369 """encode a bundle2 caps dictionary into a bytes blob"""
370 chunks = []
370 chunks = []
371 for ca in sorted(caps):
371 for ca in sorted(caps):
372 vals = caps[ca]
372 vals = caps[ca]
373 ca = urllib.quote(ca)
373 ca = urllib.quote(ca)
374 vals = [urllib.quote(v) for v in vals]
374 vals = [urllib.quote(v) for v in vals]
375 if vals:
375 if vals:
376 ca = "%s=%s" % (ca, ','.join(vals))
376 ca = "%s=%s" % (ca, ','.join(vals))
377 chunks.append(ca)
377 chunks.append(ca)
378 return '\n'.join(chunks)
378 return '\n'.join(chunks)
379
379
380 class bundle20(object):
380 class bundle20(object):
381 """represent an outgoing bundle2 container
381 """represent an outgoing bundle2 container
382
382
383 Use the `addparam` method to add stream level parameter. and `newpart` to
383 Use the `addparam` method to add stream level parameter. and `newpart` to
384 populate it. Then call `getchunks` to retrieve all the binary chunks of
384 populate it. Then call `getchunks` to retrieve all the binary chunks of
385 data that compose the bundle2 container."""
385 data that compose the bundle2 container."""
386
386
387 def __init__(self, ui, capabilities=()):
387 def __init__(self, ui, capabilities=()):
388 self.ui = ui
388 self.ui = ui
389 self._params = []
389 self._params = []
390 self._parts = []
390 self._parts = []
391 self.capabilities = dict(capabilities)
391 self.capabilities = dict(capabilities)
392
392
393 # methods used to defines the bundle2 content
393 # methods used to defines the bundle2 content
394 def addparam(self, name, value=None):
394 def addparam(self, name, value=None):
395 """add a stream level parameter"""
395 """add a stream level parameter"""
396 if not name:
396 if not name:
397 raise ValueError('empty parameter name')
397 raise ValueError('empty parameter name')
398 if name[0] not in string.letters:
398 if name[0] not in string.letters:
399 raise ValueError('non letter first character: %r' % name)
399 raise ValueError('non letter first character: %r' % name)
400 self._params.append((name, value))
400 self._params.append((name, value))
401
401
402 def addpart(self, part):
402 def addpart(self, part):
403 """add a new part to the bundle2 container
403 """add a new part to the bundle2 container
404
404
405 Parts contains the actual applicative payload."""
405 Parts contains the actual applicative payload."""
406 assert part.id is None
406 assert part.id is None
407 part.id = len(self._parts) # very cheap counter
407 part.id = len(self._parts) # very cheap counter
408 self._parts.append(part)
408 self._parts.append(part)
409
409
410 def newpart(self, typeid, *args, **kwargs):
410 def newpart(self, typeid, *args, **kwargs):
411 """create a new part and add it to the containers"""
411 """create a new part and add it to the containers
412
413 As the part is directly added to the containers. For now, this means
414 that any failure to properly initialize the part after calling
415 ``newpart`` should result in a failure of the whole bundling process.
416
417 You can still fall back to manually create and add if you need better
418 control."""
412 part = bundlepart(typeid, *args, **kwargs)
419 part = bundlepart(typeid, *args, **kwargs)
413 self.addpart(part)
420 self.addpart(part)
414 return part
421 return part
415
422
416 # methods used to generate the bundle2 stream
423 # methods used to generate the bundle2 stream
417 def getchunks(self):
424 def getchunks(self):
418 self.ui.debug('start emission of %s stream\n' % _magicstring)
425 self.ui.debug('start emission of %s stream\n' % _magicstring)
419 yield _magicstring
426 yield _magicstring
420 param = self._paramchunk()
427 param = self._paramchunk()
421 self.ui.debug('bundle parameter: %s\n' % param)
428 self.ui.debug('bundle parameter: %s\n' % param)
422 yield _pack(_fstreamparamsize, len(param))
429 yield _pack(_fstreamparamsize, len(param))
423 if param:
430 if param:
424 yield param
431 yield param
425
432
426 self.ui.debug('start of parts\n')
433 self.ui.debug('start of parts\n')
427 for part in self._parts:
434 for part in self._parts:
428 self.ui.debug('bundle part: "%s"\n' % part.type)
435 self.ui.debug('bundle part: "%s"\n' % part.type)
429 for chunk in part.getchunks():
436 for chunk in part.getchunks():
430 yield chunk
437 yield chunk
431 self.ui.debug('end of bundle\n')
438 self.ui.debug('end of bundle\n')
432 yield '\0\0'
439 yield '\0\0'
433
440
434 def _paramchunk(self):
441 def _paramchunk(self):
435 """return a encoded version of all stream parameters"""
442 """return a encoded version of all stream parameters"""
436 blocks = []
443 blocks = []
437 for par, value in self._params:
444 for par, value in self._params:
438 par = urllib.quote(par)
445 par = urllib.quote(par)
439 if value is not None:
446 if value is not None:
440 value = urllib.quote(value)
447 value = urllib.quote(value)
441 par = '%s=%s' % (par, value)
448 par = '%s=%s' % (par, value)
442 blocks.append(par)
449 blocks.append(par)
443 return ' '.join(blocks)
450 return ' '.join(blocks)
444
451
445 class unpackermixin(object):
452 class unpackermixin(object):
446 """A mixin to extract bytes and struct data from a stream"""
453 """A mixin to extract bytes and struct data from a stream"""
447
454
448 def __init__(self, fp):
455 def __init__(self, fp):
449 self._fp = fp
456 self._fp = fp
450
457
451 def _unpack(self, format):
458 def _unpack(self, format):
452 """unpack this struct format from the stream"""
459 """unpack this struct format from the stream"""
453 data = self._readexact(struct.calcsize(format))
460 data = self._readexact(struct.calcsize(format))
454 return _unpack(format, data)
461 return _unpack(format, data)
455
462
456 def _readexact(self, size):
463 def _readexact(self, size):
457 """read exactly <size> bytes from the stream"""
464 """read exactly <size> bytes from the stream"""
458 return changegroup.readexactly(self._fp, size)
465 return changegroup.readexactly(self._fp, size)
459
466
460
467
461 class unbundle20(unpackermixin):
468 class unbundle20(unpackermixin):
462 """interpret a bundle2 stream
469 """interpret a bundle2 stream
463
470
464 This class is fed with a binary stream and yields parts through its
471 This class is fed with a binary stream and yields parts through its
465 `iterparts` methods."""
472 `iterparts` methods."""
466
473
467 def __init__(self, ui, fp, header=None):
474 def __init__(self, ui, fp, header=None):
468 """If header is specified, we do not read it out of the stream."""
475 """If header is specified, we do not read it out of the stream."""
469 self.ui = ui
476 self.ui = ui
470 super(unbundle20, self).__init__(fp)
477 super(unbundle20, self).__init__(fp)
471 if header is None:
478 if header is None:
472 header = self._readexact(4)
479 header = self._readexact(4)
473 magic, version = header[0:2], header[2:4]
480 magic, version = header[0:2], header[2:4]
474 if magic != 'HG':
481 if magic != 'HG':
475 raise util.Abort(_('not a Mercurial bundle'))
482 raise util.Abort(_('not a Mercurial bundle'))
476 if version != '2X':
483 if version != '2X':
477 raise util.Abort(_('unknown bundle version %s') % version)
484 raise util.Abort(_('unknown bundle version %s') % version)
478 self.ui.debug('start processing of %s stream\n' % header)
485 self.ui.debug('start processing of %s stream\n' % header)
479
486
480 @util.propertycache
487 @util.propertycache
481 def params(self):
488 def params(self):
482 """dictionary of stream level parameters"""
489 """dictionary of stream level parameters"""
483 self.ui.debug('reading bundle2 stream parameters\n')
490 self.ui.debug('reading bundle2 stream parameters\n')
484 params = {}
491 params = {}
485 paramssize = self._unpack(_fstreamparamsize)[0]
492 paramssize = self._unpack(_fstreamparamsize)[0]
486 if paramssize:
493 if paramssize:
487 for p in self._readexact(paramssize).split(' '):
494 for p in self._readexact(paramssize).split(' '):
488 p = p.split('=', 1)
495 p = p.split('=', 1)
489 p = [urllib.unquote(i) for i in p]
496 p = [urllib.unquote(i) for i in p]
490 if len(p) < 2:
497 if len(p) < 2:
491 p.append(None)
498 p.append(None)
492 self._processparam(*p)
499 self._processparam(*p)
493 params[p[0]] = p[1]
500 params[p[0]] = p[1]
494 return params
501 return params
495
502
496 def _processparam(self, name, value):
503 def _processparam(self, name, value):
497 """process a parameter, applying its effect if needed
504 """process a parameter, applying its effect if needed
498
505
499 Parameter starting with a lower case letter are advisory and will be
506 Parameter starting with a lower case letter are advisory and will be
500 ignored when unknown. Those starting with an upper case letter are
507 ignored when unknown. Those starting with an upper case letter are
501 mandatory and will this function will raise a KeyError when unknown.
508 mandatory and will this function will raise a KeyError when unknown.
502
509
503 Note: no option are currently supported. Any input will be either
510 Note: no option are currently supported. Any input will be either
504 ignored or failing.
511 ignored or failing.
505 """
512 """
506 if not name:
513 if not name:
507 raise ValueError('empty parameter name')
514 raise ValueError('empty parameter name')
508 if name[0] not in string.letters:
515 if name[0] not in string.letters:
509 raise ValueError('non letter first character: %r' % name)
516 raise ValueError('non letter first character: %r' % name)
510 # Some logic will be later added here to try to process the option for
517 # Some logic will be later added here to try to process the option for
511 # a dict of known parameter.
518 # a dict of known parameter.
512 if name[0].islower():
519 if name[0].islower():
513 self.ui.debug("ignoring unknown parameter %r\n" % name)
520 self.ui.debug("ignoring unknown parameter %r\n" % name)
514 else:
521 else:
515 raise KeyError(name)
522 raise KeyError(name)
516
523
517
524
518 def iterparts(self):
525 def iterparts(self):
519 """yield all parts contained in the stream"""
526 """yield all parts contained in the stream"""
520 # make sure param have been loaded
527 # make sure param have been loaded
521 self.params
528 self.params
522 self.ui.debug('start extraction of bundle2 parts\n')
529 self.ui.debug('start extraction of bundle2 parts\n')
523 headerblock = self._readpartheader()
530 headerblock = self._readpartheader()
524 while headerblock is not None:
531 while headerblock is not None:
525 part = unbundlepart(self.ui, headerblock, self._fp)
532 part = unbundlepart(self.ui, headerblock, self._fp)
526 yield part
533 yield part
527 headerblock = self._readpartheader()
534 headerblock = self._readpartheader()
528 self.ui.debug('end of bundle2 stream\n')
535 self.ui.debug('end of bundle2 stream\n')
529
536
530 def _readpartheader(self):
537 def _readpartheader(self):
531 """reads a part header size and return the bytes blob
538 """reads a part header size and return the bytes blob
532
539
533 returns None if empty"""
540 returns None if empty"""
534 headersize = self._unpack(_fpartheadersize)[0]
541 headersize = self._unpack(_fpartheadersize)[0]
535 self.ui.debug('part header size: %i\n' % headersize)
542 self.ui.debug('part header size: %i\n' % headersize)
536 if headersize:
543 if headersize:
537 return self._readexact(headersize)
544 return self._readexact(headersize)
538 return None
545 return None
539
546
540
547
541 class bundlepart(object):
548 class bundlepart(object):
542 """A bundle2 part contains application level payload
549 """A bundle2 part contains application level payload
543
550
544 The part `type` is used to route the part to the application level
551 The part `type` is used to route the part to the application level
545 handler.
552 handler.
546 """
553 """
547
554
548 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
555 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
549 data=''):
556 data=''):
550 self.id = None
557 self.id = None
551 self.type = parttype
558 self.type = parttype
552 self.data = data
559 self.data = data
553 self.mandatoryparams = mandatoryparams
560 self.mandatoryparams = mandatoryparams
554 self.advisoryparams = advisoryparams
561 self.advisoryparams = advisoryparams
555 # status of the part's generation:
562 # status of the part's generation:
556 # - None: not started,
563 # - None: not started,
557 # - False: currently generated,
564 # - False: currently generated,
558 # - True: generation done.
565 # - True: generation done.
559 self._generated = None
566 self._generated = None
560
567
561 # methods used to generates the bundle2 stream
568 # methods used to generates the bundle2 stream
562 def getchunks(self):
569 def getchunks(self):
563 if self._generated is not None:
570 if self._generated is not None:
564 raise RuntimeError('part can only be consumed once')
571 raise RuntimeError('part can only be consumed once')
565 self._generated = False
572 self._generated = False
566 #### header
573 #### header
567 ## parttype
574 ## parttype
568 header = [_pack(_fparttypesize, len(self.type)),
575 header = [_pack(_fparttypesize, len(self.type)),
569 self.type, _pack(_fpartid, self.id),
576 self.type, _pack(_fpartid, self.id),
570 ]
577 ]
571 ## parameters
578 ## parameters
572 # count
579 # count
573 manpar = self.mandatoryparams
580 manpar = self.mandatoryparams
574 advpar = self.advisoryparams
581 advpar = self.advisoryparams
575 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
582 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
576 # size
583 # size
577 parsizes = []
584 parsizes = []
578 for key, value in manpar:
585 for key, value in manpar:
579 parsizes.append(len(key))
586 parsizes.append(len(key))
580 parsizes.append(len(value))
587 parsizes.append(len(value))
581 for key, value in advpar:
588 for key, value in advpar:
582 parsizes.append(len(key))
589 parsizes.append(len(key))
583 parsizes.append(len(value))
590 parsizes.append(len(value))
584 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
591 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
585 header.append(paramsizes)
592 header.append(paramsizes)
586 # key, value
593 # key, value
587 for key, value in manpar:
594 for key, value in manpar:
588 header.append(key)
595 header.append(key)
589 header.append(value)
596 header.append(value)
590 for key, value in advpar:
597 for key, value in advpar:
591 header.append(key)
598 header.append(key)
592 header.append(value)
599 header.append(value)
593 ## finalize header
600 ## finalize header
594 headerchunk = ''.join(header)
601 headerchunk = ''.join(header)
595 yield _pack(_fpartheadersize, len(headerchunk))
602 yield _pack(_fpartheadersize, len(headerchunk))
596 yield headerchunk
603 yield headerchunk
597 ## payload
604 ## payload
598 for chunk in self._payloadchunks():
605 for chunk in self._payloadchunks():
599 yield _pack(_fpayloadsize, len(chunk))
606 yield _pack(_fpayloadsize, len(chunk))
600 yield chunk
607 yield chunk
601 # end of payload
608 # end of payload
602 yield _pack(_fpayloadsize, 0)
609 yield _pack(_fpayloadsize, 0)
603 self._generated = True
610 self._generated = True
604
611
605 def _payloadchunks(self):
612 def _payloadchunks(self):
606 """yield chunks of a the part payload
613 """yield chunks of a the part payload
607
614
608 Exists to handle the different methods to provide data to a part."""
615 Exists to handle the different methods to provide data to a part."""
609 # we only support fixed size data now.
616 # we only support fixed size data now.
610 # This will be improved in the future.
617 # This will be improved in the future.
611 if util.safehasattr(self.data, 'next'):
618 if util.safehasattr(self.data, 'next'):
612 buff = util.chunkbuffer(self.data)
619 buff = util.chunkbuffer(self.data)
613 chunk = buff.read(preferedchunksize)
620 chunk = buff.read(preferedchunksize)
614 while chunk:
621 while chunk:
615 yield chunk
622 yield chunk
616 chunk = buff.read(preferedchunksize)
623 chunk = buff.read(preferedchunksize)
617 elif len(self.data):
624 elif len(self.data):
618 yield self.data
625 yield self.data
619
626
620 class unbundlepart(unpackermixin):
627 class unbundlepart(unpackermixin):
621 """a bundle part read from a bundle"""
628 """a bundle part read from a bundle"""
622
629
623 def __init__(self, ui, header, fp):
630 def __init__(self, ui, header, fp):
624 super(unbundlepart, self).__init__(fp)
631 super(unbundlepart, self).__init__(fp)
625 self.ui = ui
632 self.ui = ui
626 # unbundle state attr
633 # unbundle state attr
627 self._headerdata = header
634 self._headerdata = header
628 self._headeroffset = 0
635 self._headeroffset = 0
629 self._initialized = False
636 self._initialized = False
630 self.consumed = False
637 self.consumed = False
631 # part data
638 # part data
632 self.id = None
639 self.id = None
633 self.type = None
640 self.type = None
634 self.mandatoryparams = None
641 self.mandatoryparams = None
635 self.advisoryparams = None
642 self.advisoryparams = None
636 self._payloadstream = None
643 self._payloadstream = None
637 self._readheader()
644 self._readheader()
638
645
639 def _fromheader(self, size):
646 def _fromheader(self, size):
640 """return the next <size> byte from the header"""
647 """return the next <size> byte from the header"""
641 offset = self._headeroffset
648 offset = self._headeroffset
642 data = self._headerdata[offset:(offset + size)]
649 data = self._headerdata[offset:(offset + size)]
643 self._headeroffset = offset + size
650 self._headeroffset = offset + size
644 return data
651 return data
645
652
646 def _unpackheader(self, format):
653 def _unpackheader(self, format):
647 """read given format from header
654 """read given format from header
648
655
649 This automatically compute the size of the format to read."""
656 This automatically compute the size of the format to read."""
650 data = self._fromheader(struct.calcsize(format))
657 data = self._fromheader(struct.calcsize(format))
651 return _unpack(format, data)
658 return _unpack(format, data)
652
659
653 def _readheader(self):
660 def _readheader(self):
654 """read the header and setup the object"""
661 """read the header and setup the object"""
655 typesize = self._unpackheader(_fparttypesize)[0]
662 typesize = self._unpackheader(_fparttypesize)[0]
656 self.type = self._fromheader(typesize)
663 self.type = self._fromheader(typesize)
657 self.ui.debug('part type: "%s"\n' % self.type)
664 self.ui.debug('part type: "%s"\n' % self.type)
658 self.id = self._unpackheader(_fpartid)[0]
665 self.id = self._unpackheader(_fpartid)[0]
659 self.ui.debug('part id: "%s"\n' % self.id)
666 self.ui.debug('part id: "%s"\n' % self.id)
660 ## reading parameters
667 ## reading parameters
661 # param count
668 # param count
662 mancount, advcount = self._unpackheader(_fpartparamcount)
669 mancount, advcount = self._unpackheader(_fpartparamcount)
663 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
670 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
664 # param size
671 # param size
665 fparamsizes = _makefpartparamsizes(mancount + advcount)
672 fparamsizes = _makefpartparamsizes(mancount + advcount)
666 paramsizes = self._unpackheader(fparamsizes)
673 paramsizes = self._unpackheader(fparamsizes)
667 # make it a list of couple again
674 # make it a list of couple again
668 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
675 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
669 # split mandatory from advisory
676 # split mandatory from advisory
670 mansizes = paramsizes[:mancount]
677 mansizes = paramsizes[:mancount]
671 advsizes = paramsizes[mancount:]
678 advsizes = paramsizes[mancount:]
672 # retrive param value
679 # retrive param value
673 manparams = []
680 manparams = []
674 for key, value in mansizes:
681 for key, value in mansizes:
675 manparams.append((self._fromheader(key), self._fromheader(value)))
682 manparams.append((self._fromheader(key), self._fromheader(value)))
676 advparams = []
683 advparams = []
677 for key, value in advsizes:
684 for key, value in advsizes:
678 advparams.append((self._fromheader(key), self._fromheader(value)))
685 advparams.append((self._fromheader(key), self._fromheader(value)))
679 self.mandatoryparams = manparams
686 self.mandatoryparams = manparams
680 self.advisoryparams = advparams
687 self.advisoryparams = advparams
681 ## part payload
688 ## part payload
682 def payloadchunks():
689 def payloadchunks():
683 payloadsize = self._unpack(_fpayloadsize)[0]
690 payloadsize = self._unpack(_fpayloadsize)[0]
684 self.ui.debug('payload chunk size: %i\n' % payloadsize)
691 self.ui.debug('payload chunk size: %i\n' % payloadsize)
685 while payloadsize:
692 while payloadsize:
686 yield self._readexact(payloadsize)
693 yield self._readexact(payloadsize)
687 payloadsize = self._unpack(_fpayloadsize)[0]
694 payloadsize = self._unpack(_fpayloadsize)[0]
688 self.ui.debug('payload chunk size: %i\n' % payloadsize)
695 self.ui.debug('payload chunk size: %i\n' % payloadsize)
689 self._payloadstream = util.chunkbuffer(payloadchunks())
696 self._payloadstream = util.chunkbuffer(payloadchunks())
690 # we read the data, tell it
697 # we read the data, tell it
691 self._initialized = True
698 self._initialized = True
692
699
693 def read(self, size=None):
700 def read(self, size=None):
694 """read payload data"""
701 """read payload data"""
695 if not self._initialized:
702 if not self._initialized:
696 self._readheader()
703 self._readheader()
697 if size is None:
704 if size is None:
698 data = self._payloadstream.read()
705 data = self._payloadstream.read()
699 else:
706 else:
700 data = self._payloadstream.read(size)
707 data = self._payloadstream.read(size)
701 if size is None or len(data) < size:
708 if size is None or len(data) < size:
702 self.consumed = True
709 self.consumed = True
703 return data
710 return data
704
711
705
712
706 @parthandler('b2x:changegroup')
713 @parthandler('b2x:changegroup')
707 def handlechangegroup(op, inpart):
714 def handlechangegroup(op, inpart):
708 """apply a changegroup part on the repo
715 """apply a changegroup part on the repo
709
716
710 This is a very early implementation that will massive rework before being
717 This is a very early implementation that will massive rework before being
711 inflicted to any end-user.
718 inflicted to any end-user.
712 """
719 """
713 # Make sure we trigger a transaction creation
720 # Make sure we trigger a transaction creation
714 #
721 #
715 # The addchangegroup function will get a transaction object by itself, but
722 # The addchangegroup function will get a transaction object by itself, but
716 # we need to make sure we trigger the creation of a transaction object used
723 # we need to make sure we trigger the creation of a transaction object used
717 # for the whole processing scope.
724 # for the whole processing scope.
718 op.gettransaction()
725 op.gettransaction()
719 cg = changegroup.unbundle10(inpart, 'UN')
726 cg = changegroup.unbundle10(inpart, 'UN')
720 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
727 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
721 op.records.add('changegroup', {'return': ret})
728 op.records.add('changegroup', {'return': ret})
722 if op.reply is not None:
729 if op.reply is not None:
723 # This is definitly not the final form of this
730 # This is definitly not the final form of this
724 # return. But one need to start somewhere.
731 # return. But one need to start somewhere.
725 op.reply.newpart('b2x:reply:changegroup', (),
732 op.reply.newpart('b2x:reply:changegroup', (),
726 [('in-reply-to', str(inpart.id)),
733 [('in-reply-to', str(inpart.id)),
727 ('return', '%i' % ret)])
734 ('return', '%i' % ret)])
728 assert not inpart.read()
735 assert not inpart.read()
729
736
730 @parthandler('b2x:reply:changegroup')
737 @parthandler('b2x:reply:changegroup')
731 def handlechangegroup(op, inpart):
738 def handlechangegroup(op, inpart):
732 p = dict(inpart.advisoryparams)
739 p = dict(inpart.advisoryparams)
733 ret = int(p['return'])
740 ret = int(p['return'])
734 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
741 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
735
742
736 @parthandler('b2x:check:heads')
743 @parthandler('b2x:check:heads')
737 def handlechangegroup(op, inpart):
744 def handlechangegroup(op, inpart):
738 """check that head of the repo did not change
745 """check that head of the repo did not change
739
746
740 This is used to detect a push race when using unbundle.
747 This is used to detect a push race when using unbundle.
741 This replaces the "heads" argument of unbundle."""
748 This replaces the "heads" argument of unbundle."""
742 h = inpart.read(20)
749 h = inpart.read(20)
743 heads = []
750 heads = []
744 while len(h) == 20:
751 while len(h) == 20:
745 heads.append(h)
752 heads.append(h)
746 h = inpart.read(20)
753 h = inpart.read(20)
747 assert not h
754 assert not h
748 if heads != op.repo.heads():
755 if heads != op.repo.heads():
749 raise error.PushRaced('repository changed while pushing - '
756 raise error.PushRaced('repository changed while pushing - '
750 'please try again')
757 'please try again')
751
758
752 @parthandler('b2x:output')
759 @parthandler('b2x:output')
753 def handleoutput(op, inpart):
760 def handleoutput(op, inpart):
754 """forward output captured on the server to the client"""
761 """forward output captured on the server to the client"""
755 for line in inpart.read().splitlines():
762 for line in inpart.read().splitlines():
756 op.ui.write(('remote: %s\n' % line))
763 op.ui.write(('remote: %s\n' % line))
757
764
758 @parthandler('b2x:replycaps')
765 @parthandler('b2x:replycaps')
759 def handlereplycaps(op, inpart):
766 def handlereplycaps(op, inpart):
760 """Notify that a reply bundle should be created
767 """Notify that a reply bundle should be created
761
768
762 The payload contains the capabilities information for the reply"""
769 The payload contains the capabilities information for the reply"""
763 caps = decodecaps(inpart.read())
770 caps = decodecaps(inpart.read())
764 if op.reply is None:
771 if op.reply is None:
765 op.reply = bundle20(op.ui, caps)
772 op.reply = bundle20(op.ui, caps)
766
773
767 @parthandler('b2x:error:abort')
774 @parthandler('b2x:error:abort')
768 def handlereplycaps(op, inpart):
775 def handlereplycaps(op, inpart):
769 """Used to transmit abort error over the wire"""
776 """Used to transmit abort error over the wire"""
770 manargs = dict(inpart.mandatoryparams)
777 manargs = dict(inpart.mandatoryparams)
771 advargs = dict(inpart.advisoryparams)
778 advargs = dict(inpart.advisoryparams)
772 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
779 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
773
780
774 @parthandler('b2x:error:unknownpart')
781 @parthandler('b2x:error:unknownpart')
775 def handlereplycaps(op, inpart):
782 def handlereplycaps(op, inpart):
776 """Used to transmit unknown part error over the wire"""
783 """Used to transmit unknown part error over the wire"""
777 manargs = dict(inpart.mandatoryparams)
784 manargs = dict(inpart.mandatoryparams)
778 raise UnknownPartError(manargs['parttype'])
785 raise UnknownPartError(manargs['parttype'])
779
786
780 @parthandler('b2x:error:pushraced')
787 @parthandler('b2x:error:pushraced')
781 def handlereplycaps(op, inpart):
788 def handlereplycaps(op, inpart):
782 """Used to transmit push race error over the wire"""
789 """Used to transmit push race error over the wire"""
783 manargs = dict(inpart.mandatoryparams)
790 manargs = dict(inpart.mandatoryparams)
784 raise error.ResponseError(_('push failed:'), manargs['message'])
791 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now