bundle2: have ``newpart`` automatically add the part to the bundle...
Pierre-Yves David -
r21599:57cd844d default
@@ -1,775 +1,775
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is structured as follows
17 The format is structured as follows
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follows
32 Binary format is as follows
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty names are forbidden.
46 Empty names are forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
51 stop if it is not able to process it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follows
65 Binary format is as follows
66
66
67 :header size: (16 bits integer)
67 :header size: (16 bits integer)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two pieces of
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is as follows
84 The binary format of the header is as follows
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couples of bytes, where N is the total number of parameters. Each
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters come first, then the advisory ones.
114 Mandatory parameters come first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as many as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as many as
121 `chunksize` says). The payload part is concluded by a zero size chunk.
121 `chunksize` says). The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handlers are registered
129 Each part is processed in order using a "part handler". Handlers are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the parts read after an abort are processed. In the
138 channel usable. But none of the parts read after an abort are processed. In the
139 future, dropping the stream may become an option for channels we do not care to
139 future, dropping the stream may become an option for channels we do not care to
140 preserve.
140 preserve.
141 """
141 """
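Reviewer sketch (not part of the changeset): the payload framing described in the docstring above, a series of `<chunksize><chunkdata>` pairs closed by a zero size chunk, can be illustrated as follows. The helper names `framepayload` and `unframepayload` are hypothetical, and the sketch is written for Python 3 while the module itself targets Python 2. Unlike the implementation under review, which emits at most one chunk, this sketch splits the data into several chunks.

```python
import struct

_fpayloadsize = '>I'  # 32 bits unsigned big-endian, same format string as the module


def framepayload(data, chunksize=4096):
    """Frame ``data`` (bytes) as <chunksize><chunkdata>... plus a zero size chunk.

    Hypothetical helper; the default chunk size mirrors ``preferedchunksize``.
    """
    out = []
    for start in range(0, len(data), chunksize):
        chunk = data[start:start + chunksize]
        out.append(struct.pack(_fpayloadsize, len(chunk)))
        out.append(chunk)
    # a zero size chunk concludes the payload
    out.append(struct.pack(_fpayloadsize, 0))
    return b''.join(out)


def unframepayload(blob):
    """Read chunks back until the zero size chunk and return the joined data."""
    pieces = []
    offset = 0
    while True:
        (size,) = struct.unpack_from(_fpayloadsize, blob, offset)
        offset += struct.calcsize(_fpayloadsize)
        if size == 0:
            break
        pieces.append(blob[offset:offset + size])
        offset += size
    return b''.join(pieces)
```

An empty payload is therefore just the four zero bytes of the terminating chunk.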
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup, error
148 import changegroup, error
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG2X'
154 _magicstring = 'HG2X'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
164
164
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number of parameters is variable so we need to build that format
168 The number of parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
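Reviewer sketch (not part of the changeset): the dynamic format returned by `_makefpartparamsizes` fits the `<mandatory-count><advisory-count><param-sizes><param-data>` layout from the module docstring. The helpers `packpartparams` and `unpackpartparams` below are hypothetical names used only for illustration, written for Python 3 with byte strings; they are not the module's own serialization code.

```python
import struct


def packpartparams(mandatory, advisory):
    """Pack two lists of (key, value) byte pairs into the documented layout."""
    allparams = list(mandatory) + list(advisory)  # mandatory parameters come first
    sizes = []
    for key, value in allparams:
        sizes.append(len(key))
        sizes.append(len(value))
    header = struct.pack('>BB', len(mandatory), len(advisory))
    # one (size-of-key, size-of-value) couple per parameter
    header += struct.pack('>' + 'BB' * len(allparams), *sizes)
    data = b''.join(key + value for key, value in allparams)
    return header + data


def unpackpartparams(blob):
    """Return (mandatory, advisory) lists of (key, value) byte pairs."""
    mancount, advcount = struct.unpack_from('>BB', blob, 0)
    total = mancount + advcount
    fmt = '>' + 'BB' * total
    sizes = struct.unpack_from(fmt, blob, 2)
    offset = 2 + struct.calcsize(fmt)
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    return params[:mancount], params[mancount:]
```

Because every size is stored in a single byte, keys and values in this layout cannot exceed 255 bytes each.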
172
172
173 class UnknownPartError(KeyError):
173 class UnknownPartError(KeyError):
174 """error raised when no handler is found for a Mandatory part"""
174 """error raised when no handler is found for a Mandatory part"""
175 pass
175 pass
176
176
177 parthandlermapping = {}
177 parthandlermapping = {}
178
178
179 def parthandler(parttype):
179 def parthandler(parttype):
180 """decorator that registers a function as a bundle2 part handler
180 """decorator that registers a function as a bundle2 part handler
181
181
182 eg::
182 eg::
183
183
184 @parthandler('myparttype')
184 @parthandler('myparttype')
185 def myparttypehandler(...):
185 def myparttypehandler(...):
186 '''process a part of type "my part".'''
186 '''process a part of type "my part".'''
187 ...
187 ...
188 """
188 """
189 def _decorator(func):
189 def _decorator(func):
190 lparttype = parttype.lower() # enforce lower case matching.
190 lparttype = parttype.lower() # enforce lower case matching.
191 assert lparttype not in parthandlermapping
191 assert lparttype not in parthandlermapping
192 parthandlermapping[lparttype] = func
192 parthandlermapping[lparttype] = func
193 return func
193 return func
194 return _decorator
194 return _decorator
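Reviewer sketch (not part of the changeset): a standalone illustration of how a registry built by such a decorator combines with the mandatory/advisory rule applied later in `processbundle`. The `handlers` dict, `findhandler` function and the `test:ping` part type are hypothetical stand-ins; only the lower-casing and the "any uppercase character means mandatory" rule come from the code under review.

```python
handlers = {}  # stand-in for parthandlermapping


class UnknownPartError(KeyError):
    """raised when no handler is found for a mandatory part"""


def parthandler(parttype):
    def _decorator(func):
        key = parttype.lower()  # handlers are always registered lower case
        assert key not in handlers
        handlers[key] = func
        return func
    return _decorator


def findhandler(parttype):
    """Return the handler for ``parttype`` or None for an unknown advisory part."""
    key = parttype.lower()
    try:
        return handlers[key]
    except KeyError:
        if key != parttype:  # any uppercase character makes the part mandatory
            raise UnknownPartError(key)
        return None


@parthandler('test:ping')
def handleping(op, part):
    return 'pong'
```

With this registry, `findhandler('TEST:PING')` resolves to the same function as `findhandler('test:ping')`, an unknown lower case type is skipped, and an unknown type containing an uppercase character raises `UnknownPartError`.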
195
195
196 class unbundlerecords(object):
196 class unbundlerecords(object):
197 """keep record of what happens during an unbundle
197 """keep record of what happens during an unbundle
198
198
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 category of record and obj is an arbitrary object.
200 category of record and obj is an arbitrary object.
201
201
202 `records['cat']` will return all entries of this category 'cat'.
202 `records['cat']` will return all entries of this category 'cat'.
203
203
204 Iterating on the object itself will yield `('category', obj)` tuples
204 Iterating on the object itself will yield `('category', obj)` tuples
205 for all entries.
205 for all entries.
206
206
207 All iterations happen in chronological order.
207 All iterations happen in chronological order.
208 """
208 """
209
209
210 def __init__(self):
210 def __init__(self):
211 self._categories = {}
211 self._categories = {}
212 self._sequences = []
212 self._sequences = []
213 self._replies = {}
213 self._replies = {}
214
214
215 def add(self, category, entry, inreplyto=None):
215 def add(self, category, entry, inreplyto=None):
216 """add a new record of a given category.
216 """add a new record of a given category.
217
217
218 The entry can then be retrieved in the list returned by
218 The entry can then be retrieved in the list returned by
219 self['category']."""
219 self['category']."""
220 self._categories.setdefault(category, []).append(entry)
220 self._categories.setdefault(category, []).append(entry)
221 self._sequences.append((category, entry))
221 self._sequences.append((category, entry))
222 if inreplyto is not None:
222 if inreplyto is not None:
223 self.getreplies(inreplyto).add(category, entry)
223 self.getreplies(inreplyto).add(category, entry)
224
224
225 def getreplies(self, partid):
225 def getreplies(self, partid):
226 """get the subrecords that reply to a specific part"""
226 """get the subrecords that reply to a specific part"""
227 return self._replies.setdefault(partid, unbundlerecords())
227 return self._replies.setdefault(partid, unbundlerecords())
228
228
229 def __getitem__(self, cat):
229 def __getitem__(self, cat):
230 return tuple(self._categories.get(cat, ()))
230 return tuple(self._categories.get(cat, ()))
231
231
232 def __iter__(self):
232 def __iter__(self):
233 return iter(self._sequences)
233 return iter(self._sequences)
234
234
235 def __len__(self):
235 def __len__(self):
236 return len(self._sequences)
236 return len(self._sequences)
237
237
238 def __nonzero__(self):
238 def __nonzero__(self):
239 return bool(self._sequences)
239 return bool(self._sequences)
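Reviewer sketch (not part of the changeset): a condensed Python 3 copy of `unbundlerecords` with a short usage run, to show how categories, chronological iteration and per-part replies relate. The category names and entries used here (`changegroup`, `output`, the dict payloads) are made up for the example.

```python
class unbundlerecords(object):
    """condensed copy of the class above, kept only for the usage example"""

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            # the entry is also filed under the part it replies to
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)


records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'remote: ok', inreplyto=0)
records.add('changegroup', {'return': 0})
```

After these calls, `records['changegroup']` holds both changegroup entries in insertion order, while `records.getreplies(0)['output']` holds only the entry recorded as a reply to part 0.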
240
240
241 class bundleoperation(object):
241 class bundleoperation(object):
242 """an object that represents a single bundling process
242 """an object that represents a single bundling process
243
243
244 Its purpose is to carry unbundle-related objects and states.
244 Its purpose is to carry unbundle-related objects and states.
245
245
246 A new object should be created at the beginning of each bundle processing.
246 A new object should be created at the beginning of each bundle processing.
247 The object is to be returned by the processing function.
247 The object is to be returned by the processing function.
248
248
249 The object has very little content now it will ultimately contain:
249 The object has very little content now it will ultimately contain:
250 * an access to the repo the bundle is applied to,
250 * an access to the repo the bundle is applied to,
251 * a ui object,
251 * a ui object,
252 * a way to retrieve a transaction to add changes to the repo,
252 * a way to retrieve a transaction to add changes to the repo,
253 * a way to record the result of processing each part,
253 * a way to record the result of processing each part,
254 * a way to construct a bundle response when applicable.
254 * a way to construct a bundle response when applicable.
255 """
255 """
256
256
257 def __init__(self, repo, transactiongetter):
257 def __init__(self, repo, transactiongetter):
258 self.repo = repo
258 self.repo = repo
259 self.ui = repo.ui
259 self.ui = repo.ui
260 self.records = unbundlerecords()
260 self.records = unbundlerecords()
261 self.gettransaction = transactiongetter
261 self.gettransaction = transactiongetter
262 self.reply = None
262 self.reply = None
263
263
264 class TransactionUnavailable(RuntimeError):
264 class TransactionUnavailable(RuntimeError):
265 pass
265 pass
266
266
267 def _notransaction():
267 def _notransaction():
268 """default method to get a transaction while processing a bundle
268 """default method to get a transaction while processing a bundle
269
269
270 Raise an exception to highlight the fact that no transaction was expected
270 Raise an exception to highlight the fact that no transaction was expected
271 to be created"""
271 to be created"""
272 raise TransactionUnavailable()
272 raise TransactionUnavailable()
273
273
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 """This function processes a bundle and applies its effects to/from a repo
275 """This function processes a bundle and applies its effects to/from a repo
276
276
277 It iterates over each part then searches for and uses the proper handling
277 It iterates over each part then searches for and uses the proper handling
278 code to process the part. Parts are processed in order.
278 code to process the part. Parts are processed in order.
279
279
280 This is a very early version of this function that will be strongly reworked
280 This is a very early version of this function that will be strongly reworked
281 before final usage.
281 before final usage.
282
282
283 Unknown Mandatory part will abort the process.
283 Unknown Mandatory part will abort the process.
284 """
284 """
285 op = bundleoperation(repo, transactiongetter)
285 op = bundleoperation(repo, transactiongetter)
286 # todo:
286 # todo:
287 # - replace this with an init function soon.
287 # - replace this with an init function soon.
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = unbundler.iterparts()
290 iterparts = unbundler.iterparts()
291 part = None
291 part = None
292 try:
292 try:
293 for part in iterparts:
293 for part in iterparts:
294 parttype = part.type
294 parttype = part.type
295 # part keys are matched lower case
295 # part keys are matched lower case
296 key = parttype.lower()
296 key = parttype.lower()
297 try:
297 try:
298 handler = parthandlermapping[key]
298 handler = parthandlermapping[key]
299 op.ui.debug('found a handler for part %r\n' % parttype)
299 op.ui.debug('found a handler for part %r\n' % parttype)
300 except KeyError:
300 except KeyError:
301 if key != parttype: # mandatory parts
301 if key != parttype: # mandatory parts
302 # todo:
302 # todo:
303 # - use a more precise exception
303 # - use a more precise exception
304 raise UnknownPartError(key)
304 raise UnknownPartError(key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 # consuming the part
306 # consuming the part
307 part.read()
307 part.read()
308 continue
308 continue
309
309
310 # handler is called outside the above try block so that we don't
310 # handler is called outside the above try block so that we don't
311 # risk catching KeyErrors from anything other than the
311 # risk catching KeyErrors from anything other than the
312 # parthandlermapping lookup (any KeyError raised by handler()
312 # parthandlermapping lookup (any KeyError raised by handler()
313 # itself represents a defect of a different variety).
313 # itself represents a defect of a different variety).
314 output = None
314 output = None
315 if op.reply is not None:
315 if op.reply is not None:
316 op.ui.pushbuffer(error=True)
316 op.ui.pushbuffer(error=True)
317 output = ''
317 output = ''
318 try:
318 try:
319 handler(op, part)
319 handler(op, part)
320 finally:
320 finally:
321 if output is not None:
321 if output is not None:
322 output = op.ui.popbuffer()
322 output = op.ui.popbuffer()
323 if output:
323 if output:
324 outpart = bundlepart('b2x:output',
324 outpart = bundlepart('b2x:output',
325 advisoryparams=[('in-reply-to',
325 advisoryparams=[('in-reply-to',
326 str(part.id))],
326 str(part.id))],
327 data=output)
327 data=output)
328 op.reply.addpart(outpart)
328 op.reply.addpart(outpart)
329 part.read()
329 part.read()
330 except Exception, exc:
330 except Exception, exc:
331 if part is not None:
331 if part is not None:
332 # consume the bundle content
332 # consume the bundle content
333 part.read()
333 part.read()
334 for part in iterparts:
334 for part in iterparts:
335 # consume the bundle content
335 # consume the bundle content
336 part.read()
336 part.read()
337 # Small hack to let caller code distinguish exceptions from bundle2
337 # Small hack to let caller code distinguish exceptions from bundle2
338 # processing from the ones from bundle1 processing. This is mostly
338 # processing from the ones from bundle1 processing. This is mostly
339 # needed to handle different return codes to unbundle according to the
339 # needed to handle different return codes to unbundle according to the
340 # type of bundle. We should probably clean up or drop this return code
340 # type of bundle. We should probably clean up or drop this return code
341 # craziness in a future version.
341 # craziness in a future version.
342 exc.duringunbundle2 = True
342 exc.duringunbundle2 = True
343 raise
343 raise
344 return op
344 return op
345
345
346 def decodecaps(blob):
346 def decodecaps(blob):
347 """decode a bundle2 caps bytes blob into a dictionary
347 """decode a bundle2 caps bytes blob into a dictionary
348
348
349 The blob is a list of capabilities (one per line)
349 The blob is a list of capabilities (one per line)
350 Capabilities may have values using a line of the form::
350 Capabilities may have values using a line of the form::
351
351
352 capability=value1,value2,value3
352 capability=value1,value2,value3
353
353
354 The values are always a list."""
354 The values are always a list."""
355 caps = {}
355 caps = {}
356 for line in blob.splitlines():
356 for line in blob.splitlines():
357 if not line:
357 if not line:
358 continue
358 continue
359 if '=' not in line:
359 if '=' not in line:
360 key, vals = line, ()
360 key, vals = line, ()
361 else:
361 else:
362 key, vals = line.split('=', 1)
362 key, vals = line.split('=', 1)
363 vals = vals.split(',')
363 vals = vals.split(',')
364 key = urllib.unquote(key)
364 key = urllib.unquote(key)
365 vals = [urllib.unquote(v) for v in vals]
365 vals = [urllib.unquote(v) for v in vals]
366 caps[key] = vals
366 caps[key] = vals
367 return caps
367 return caps
368
368
369 def encodecaps(caps):
369 def encodecaps(caps):
370 """encode a bundle2 caps dictionary into a bytes blob"""
370 """encode a bundle2 caps dictionary into a bytes blob"""
371 chunks = []
371 chunks = []
372 for ca in sorted(caps):
372 for ca in sorted(caps):
373 vals = caps[ca]
373 vals = caps[ca]
374 ca = urllib.quote(ca)
374 ca = urllib.quote(ca)
375 vals = [urllib.quote(v) for v in vals]
375 vals = [urllib.quote(v) for v in vals]
376 if vals:
376 if vals:
377 ca = "%s=%s" % (ca, ','.join(vals))
377 ca = "%s=%s" % (ca, ','.join(vals))
378 chunks.append(ca)
378 chunks.append(ca)
379 return '\n'.join(chunks)
379 return '\n'.join(chunks)
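Reviewer sketch (not part of the changeset): a Python 3 port of the two capability helpers above, useful for checking that encoding and decoding round-trip. It assumes `urllib.parse.quote` and `urllib.parse.unquote` as the Python 3 equivalents of the Python 2 `urllib.quote` and `urllib.unquote` calls used in the module; the capability names in the usage comment are illustrative.

```python
from urllib.parse import quote, unquote


def encodecaps(caps):
    """encode a caps dictionary (name -> list of values) into a text blob"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)


def decodecaps(blob):
    """decode a caps blob (one capability per line) into a dictionary"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        # values are always stored as a list, even when empty
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Quoting is what keeps the format unambiguous: a value containing `,` or `=` is percent-encoded before being joined, so `decodecaps(encodecaps(caps))` gives back the original dictionary.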
380
380
381 class bundle20(object):
381 class bundle20(object):
382 """represent an outgoing bundle2 container
382 """represent an outgoing bundle2 container
383
383
384 Use the `addparam` method to add stream level parameter. and `addpart` to
384 Use the `addparam` method to add stream level parameter. and `newpart` to
385 populate it. Then call `getchunks` to retrieve all the binary chunks of
385 populate it. Then call `getchunks` to retrieve all the binary chunks of
386 data that compose the bundle2 container."""
386 data that compose the bundle2 container."""
387
387
388 def __init__(self, ui, capabilities=()):
388 def __init__(self, ui, capabilities=()):
389 self.ui = ui
389 self.ui = ui
390 self._params = []
390 self._params = []
391 self._parts = []
391 self._parts = []
392 self.capabilities = dict(capabilities)
392 self.capabilities = dict(capabilities)
393
393
394 # methods used to define the bundle2 content
394 # methods used to define the bundle2 content
395 def addparam(self, name, value=None):
395 def addparam(self, name, value=None):
396 """add a stream level parameter"""
396 """add a stream level parameter"""
397 if not name:
397 if not name:
398 raise ValueError('empty parameter name')
398 raise ValueError('empty parameter name')
399 if name[0] not in string.letters:
399 if name[0] not in string.letters:
400 raise ValueError('non letter first character: %r' % name)
400 raise ValueError('non letter first character: %r' % name)
401 self._params.append((name, value))
401 self._params.append((name, value))
402
402
403 def addpart(self, part):
403 def addpart(self, part):
404 """add a new part to the bundle2 container
404 """add a new part to the bundle2 container
405
405
406 Parts contain the actual applicative payload."""
406 Parts contain the actual applicative payload."""
407 assert part.id is None
407 assert part.id is None
408 part.id = len(self._parts) # very cheap counter
408 part.id = len(self._parts) # very cheap counter
409 self._parts.append(part)
409 self._parts.append(part)
410
410
411 def newpart(self, typeid, *args, **kwargs):
411 def newpart(self, typeid, *args, **kwargs):
412 """create a new part for the containers"""
412 """create a new part for the containers"""
413 part = bundlepart(typeid, *args, **kwargs)
413 part = bundlepart(typeid, *args, **kwargs)
414 self.addpart(part)
414 return part
415 return part
415
416
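Reviewer sketch (not part of the changeset): the behavioural change in this revision is that `newpart` now calls `addpart` itself, so callers no longer need a separate `addpart` call. The classes below are minimal stand-ins that keep only the attributes needed to show the effect; the real `bundlepart` takes more arguments than this hypothetical one.

```python
class bundlepart(object):
    """stand-in for the real bundlepart, reduced to what the sketch needs"""

    def __init__(self, parttype, data=''):
        self.id = None
        self.type = parttype
        self.data = data


class bundle20(object):
    """stand-in for the real bundle20, reduced to part bookkeeping"""

    def __init__(self):
        self._parts = []

    def addpart(self, part):
        assert part.id is None
        part.id = len(self._parts)  # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)  # new in this revision: the part is registered here
        return part


bundler = bundle20()
part = bundler.newpart('b2x:output', data='hello')
```

After the call, `part` already has an id and sits in `bundler._parts`. Passing it to `addpart` again would trip the `part.id is None` assertion, so callers written against the old behaviour need their explicit `addpart` call removed.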
416 # methods used to generate the bundle2 stream
417 # methods used to generate the bundle2 stream
417 def getchunks(self):
418 def getchunks(self):
418 self.ui.debug('start emission of %s stream\n' % _magicstring)
419 self.ui.debug('start emission of %s stream\n' % _magicstring)
419 yield _magicstring
420 yield _magicstring
420 param = self._paramchunk()
421 param = self._paramchunk()
421 self.ui.debug('bundle parameter: %s\n' % param)
422 self.ui.debug('bundle parameter: %s\n' % param)
422 yield _pack(_fstreamparamsize, len(param))
423 yield _pack(_fstreamparamsize, len(param))
423 if param:
424 if param:
424 yield param
425 yield param
425
426
426 self.ui.debug('start of parts\n')
427 self.ui.debug('start of parts\n')
427 for part in self._parts:
428 for part in self._parts:
428 self.ui.debug('bundle part: "%s"\n' % part.type)
429 self.ui.debug('bundle part: "%s"\n' % part.type)
429 for chunk in part.getchunks():
430 for chunk in part.getchunks():
430 yield chunk
431 yield chunk
431 self.ui.debug('end of bundle\n')
432 self.ui.debug('end of bundle\n')
432 yield '\0\0'
433 yield '\0\0'
433
434
434 def _paramchunk(self):
435 def _paramchunk(self):
435 """return an encoded version of all stream parameters"""
436 """return an encoded version of all stream parameters"""
436 blocks = []
437 blocks = []
437 for par, value in self._params:
438 for par, value in self._params:
438 par = urllib.quote(par)
439 par = urllib.quote(par)
439 if value is not None:
440 if value is not None:
440 value = urllib.quote(value)
441 value = urllib.quote(value)
441 par = '%s=%s' % (par, value)
442 par = '%s=%s' % (par, value)
442 blocks.append(par)
443 blocks.append(par)
443 return ' '.join(blocks)
444 return ' '.join(blocks)
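Reviewer sketch (not part of the changeset): how the stream level parameter blob produced by `_paramchunk` combines with its 16 bits size prefix, and how the reading side can split it again. The function names `encodestreamparams` and `decodestreamparams` and the sample parameters are hypothetical; the sketch targets Python 3 and assumes `urllib.parse.quote`/`unquote` in place of the Python 2 `urllib` calls.

```python
import struct
from urllib.parse import quote, unquote

_fstreamparamsize = '>H'  # same format string as the module


def encodestreamparams(params):
    """Return <params size><params value> for a list of (name, value) pairs."""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    blob = ' '.join(blocks).encode('ascii')
    return struct.pack(_fstreamparamsize, len(blob)) + blob


def decodestreamparams(data):
    """Parse the bytes produced by encodestreamparams back into a dict."""
    (size,) = struct.unpack_from(_fstreamparamsize, data, 0)
    params = {}
    if size:
        start = struct.calcsize(_fstreamparamsize)
        for block in data[start:start + size].decode('ascii').split(' '):
            pieces = [unquote(p) for p in block.split('=', 1)]
            if len(pieces) < 2:
                pieces.append(None)  # parameter without a value
            params[pieces[0]] = pieces[1]
    return params
```

Since spaces separate parameters, a space inside a value is sent as `%20`; a stream without parameters is just the two zero bytes of the size field.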
444
445
445 class unpackermixin(object):
446 class unpackermixin(object):
446 """A mixin to extract bytes and struct data from a stream"""
447 """A mixin to extract bytes and struct data from a stream"""
447
448
448 def __init__(self, fp):
449 def __init__(self, fp):
449 self._fp = fp
450 self._fp = fp
450
451
451 def _unpack(self, format):
452 def _unpack(self, format):
452 """unpack this struct format from the stream"""
453 """unpack this struct format from the stream"""
453 data = self._readexact(struct.calcsize(format))
454 data = self._readexact(struct.calcsize(format))
454 return _unpack(format, data)
455 return _unpack(format, data)
455
456
456 def _readexact(self, size):
457 def _readexact(self, size):
457 """read exactly <size> bytes from the stream"""
458 """read exactly <size> bytes from the stream"""
458 return changegroup.readexactly(self._fp, size)
459 return changegroup.readexactly(self._fp, size)
459
460
460
461
461 class unbundle20(unpackermixin):
462 class unbundle20(unpackermixin):
462 """interpret a bundle2 stream
463 """interpret a bundle2 stream
463
464
464 This class is fed with a binary stream and yields parts through its
465 This class is fed with a binary stream and yields parts through its
465 `iterparts` methods."""
466 `iterparts` methods."""
466
467
467 def __init__(self, ui, fp, header=None):
468 def __init__(self, ui, fp, header=None):
468 """If header is specified, we do not read it out of the stream."""
469 """If header is specified, we do not read it out of the stream."""
469 self.ui = ui
470 self.ui = ui
470 super(unbundle20, self).__init__(fp)
471 super(unbundle20, self).__init__(fp)
471 if header is None:
472 if header is None:
472 header = self._readexact(4)
473 header = self._readexact(4)
473 magic, version = header[0:2], header[2:4]
474 magic, version = header[0:2], header[2:4]
474 if magic != 'HG':
475 if magic != 'HG':
475 raise util.Abort(_('not a Mercurial bundle'))
476 raise util.Abort(_('not a Mercurial bundle'))
476 if version != '2X':
477 if version != '2X':
477 raise util.Abort(_('unknown bundle version %s') % version)
478 raise util.Abort(_('unknown bundle version %s') % version)
478 self.ui.debug('start processing of %s stream\n' % header)
479 self.ui.debug('start processing of %s stream\n' % header)
479
480
480 @util.propertycache
481 @util.propertycache
481 def params(self):
482 def params(self):
482 """dictionary of stream level parameters"""
483 """dictionary of stream level parameters"""
483 self.ui.debug('reading bundle2 stream parameters\n')
484 self.ui.debug('reading bundle2 stream parameters\n')
484 params = {}
485 params = {}
485 paramssize = self._unpack(_fstreamparamsize)[0]
486 paramssize = self._unpack(_fstreamparamsize)[0]
486 if paramssize:
487 if paramssize:
487 for p in self._readexact(paramssize).split(' '):
488 for p in self._readexact(paramssize).split(' '):
488 p = p.split('=', 1)
489 p = p.split('=', 1)
489 p = [urllib.unquote(i) for i in p]
490 p = [urllib.unquote(i) for i in p]
490 if len(p) < 2:
491 if len(p) < 2:
491 p.append(None)
492 p.append(None)
492 self._processparam(*p)
493 self._processparam(*p)
493 params[p[0]] = p[1]
494 params[p[0]] = p[1]
494 return params
495 return params
495
496
496 def _processparam(self, name, value):
497 def _processparam(self, name, value):
497 """process a parameter, applying its effect if needed
498 """process a parameter, applying its effect if needed
498
499
499 Parameter starting with a lower case letter are advisory and will be
500 Parameter starting with a lower case letter are advisory and will be
500 ignored when unknown. Those starting with an upper case letter are
501 ignored when unknown. Those starting with an upper case letter are
501 mandatory and this function will raise a KeyError when unknown.
502 mandatory and this function will raise a KeyError when unknown.
502
503
503 Note: no option are currently supported. Any input will be either
504 Note: no option are currently supported. Any input will be either
504 ignored or failing.
505 ignored or failing.
505 """
506 """
506 if not name:
507 if not name:
507 raise ValueError('empty parameter name')
508 raise ValueError('empty parameter name')
508 if name[0] not in string.letters:
509 if name[0] not in string.letters:
509 raise ValueError('non letter first character: %r' % name)
510 raise ValueError('non letter first character: %r' % name)
510 # Some logic will later be added here to try to process the option from
511 # Some logic will later be added here to try to process the option from
511 # a dict of known parameters.
512 # a dict of known parameters.
512 if name[0].islower():
513 if name[0].islower():
513 self.ui.debug("ignoring unknown parameter %r\n" % name)
514 self.ui.debug("ignoring unknown parameter %r\n" % name)
514 else:
515 else:
515 raise KeyError(name)
516 raise KeyError(name)
516
517
517
518
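The stream-level parameter handling in `params` and `_processparam` above can be sketched in isolation. This is a Python 3 illustration under stated assumptions, not the Mercurial code: `parse_stream_params` and its `known` argument are hypothetical names, and `urllib.parse.unquote` stands in for the Python 2 `urllib.unquote` used in the source.

```python
import string
from urllib.parse import unquote


def parse_stream_params(blob, known=()):
    """Parse a space-separated block of URL-quoted ``name=value`` items.

    Hypothetical helper; ``known`` is an assumed set of supported names
    (the source currently supports none).
    """
    params = {}
    if not blob:
        return params
    for item in blob.split(' '):
        pair = [unquote(i) for i in item.split('=', 1)]
        if len(pair) < 2:
            pair.append(None)  # a bare name carries no value
        name, value = pair
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.ascii_letters:
            raise ValueError('non letter first character: %r' % name)
        # Upper case first letter: mandatory, so unknown names are fatal.
        # Lower case first letter: advisory, unknown names are tolerated.
        if name not in known and not name[0].islower():
            raise KeyError(name)
        # As in the source, tolerated parameters are still recorded.
        params[name] = value
    return params
```

For instance, `parse_stream_params('foo=bar%20baz simple')` records both advisory names, while an unknown name such as `Mandatory` raises `KeyError`.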
518 def iterparts(self):
519 def iterparts(self):
519 """yield all parts contained in the stream"""
520 """yield all parts contained in the stream"""
520 # make sure params have been loaded
521 # make sure params have been loaded
521 self.params
522 self.params
522 self.ui.debug('start extraction of bundle2 parts\n')
523 self.ui.debug('start extraction of bundle2 parts\n')
523 headerblock = self._readpartheader()
524 headerblock = self._readpartheader()
524 while headerblock is not None:
525 while headerblock is not None:
525 part = unbundlepart(self.ui, headerblock, self._fp)
526 part = unbundlepart(self.ui, headerblock, self._fp)
526 yield part
527 yield part
527 headerblock = self._readpartheader()
528 headerblock = self._readpartheader()
528 self.ui.debug('end of bundle2 stream\n')
529 self.ui.debug('end of bundle2 stream\n')
529
530
530 def _readpartheader(self):
531 def _readpartheader(self):
531 """read a part header size and return the header bytes blob
532 """read a part header size and return the header bytes blob
532
533
533 returns None if empty"""
534 returns None if empty"""
534 headersize = self._unpack(_fpartheadersize)[0]
535 headersize = self._unpack(_fpartheadersize)[0]
535 self.ui.debug('part header size: %i\n' % headersize)
536 self.ui.debug('part header size: %i\n' % headersize)
536 if headersize:
537 if headersize:
537 return self._readexact(headersize)
538 return self._readexact(headersize)
538 return None
539 return None
539
540
540
541
541 class bundlepart(object):
542 class bundlepart(object):
542 """A bundle2 part contains application level payload
543 """A bundle2 part contains application level payload
543
544
544 The part `type` is used to route the part to the application level
545 The part `type` is used to route the part to the application level
545 handler.
546 handler.
546 """
547 """
547
548
548 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
549 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
549 data=''):
550 data=''):
550 self.id = None
551 self.id = None
551 self.type = parttype
552 self.type = parttype
552 self.data = data
553 self.data = data
553 self.mandatoryparams = mandatoryparams
554 self.mandatoryparams = mandatoryparams
554 self.advisoryparams = advisoryparams
555 self.advisoryparams = advisoryparams
555
556
556 def getchunks(self):
557 def getchunks(self):
557 #### header
558 #### header
558 ## parttype
559 ## parttype
559 header = [_pack(_fparttypesize, len(self.type)),
560 header = [_pack(_fparttypesize, len(self.type)),
560 self.type, _pack(_fpartid, self.id),
561 self.type, _pack(_fpartid, self.id),
561 ]
562 ]
562 ## parameters
563 ## parameters
563 # count
564 # count
564 manpar = self.mandatoryparams
565 manpar = self.mandatoryparams
565 advpar = self.advisoryparams
566 advpar = self.advisoryparams
566 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
567 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
567 # size
568 # size
568 parsizes = []
569 parsizes = []
569 for key, value in manpar:
570 for key, value in manpar:
570 parsizes.append(len(key))
571 parsizes.append(len(key))
571 parsizes.append(len(value))
572 parsizes.append(len(value))
572 for key, value in advpar:
573 for key, value in advpar:
573 parsizes.append(len(key))
574 parsizes.append(len(key))
574 parsizes.append(len(value))
575 parsizes.append(len(value))
575 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
576 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
576 header.append(paramsizes)
577 header.append(paramsizes)
577 # key, value
578 # key, value
578 for key, value in manpar:
579 for key, value in manpar:
579 header.append(key)
580 header.append(key)
580 header.append(value)
581 header.append(value)
581 for key, value in advpar:
582 for key, value in advpar:
582 header.append(key)
583 header.append(key)
583 header.append(value)
584 header.append(value)
584 ## finalize header
585 ## finalize header
585 headerchunk = ''.join(header)
586 headerchunk = ''.join(header)
586 yield _pack(_fpartheadersize, len(headerchunk))
587 yield _pack(_fpartheadersize, len(headerchunk))
587 yield headerchunk
588 yield headerchunk
588 ## payload
589 ## payload
589 for chunk in self._payloadchunks():
590 for chunk in self._payloadchunks():
590 yield _pack(_fpayloadsize, len(chunk))
591 yield _pack(_fpayloadsize, len(chunk))
591 yield chunk
592 yield chunk
592 # end of payload
593 # end of payload
593 yield _pack(_fpayloadsize, 0)
594 yield _pack(_fpayloadsize, 0)
594
595
595 def _payloadchunks(self):
596 def _payloadchunks(self):
596 """yield chunks of the part payload
597 """yield chunks of the part payload
597
598
598 Exists to handle the different methods to provide data to a part."""
599 Exists to handle the different methods to provide data to a part."""
599 # we only support fixed size data now.
600 # we only support fixed size data now.
600 # This will be improved in the future.
601 # This will be improved in the future.
601 if util.safehasattr(self.data, 'next'):
602 if util.safehasattr(self.data, 'next'):
602 buff = util.chunkbuffer(self.data)
603 buff = util.chunkbuffer(self.data)
603 chunk = buff.read(preferedchunksize)
604 chunk = buff.read(preferedchunksize)
604 while chunk:
605 while chunk:
605 yield chunk
606 yield chunk
606 chunk = buff.read(preferedchunksize)
607 chunk = buff.read(preferedchunksize)
607 elif len(self.data):
608 elif len(self.data):
608 yield self.data
609 yield self.data
609
610
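The header layout that `getchunks` writes (type size, type, part id, parameter counts, parameter sizes, then the keys and values) can be illustrated with a round trip. This is a sketch only: the struct formats below are assumptions, since the real `_fparttypesize`, `_fpartid`, `_fpartparamcount` and `_makefpartparamsizes` are defined outside this excerpt, and `pack_header` / `unpack_header` are hypothetical names. Note that the source's `len(parsizes) / 2` relies on Python 2 integer division; the sketch counts parameters directly instead.

```python
import struct

# Assumed formats; the real definitions live elsewhere in bundle2.py.
F_TYPESIZE = '>B'
F_PARTID = '>I'
F_PARAMCOUNT = '>BB'


def make_param_sizes_format(nbparams):
    """One (key size, value size) byte pair per parameter (assumed layout)."""
    return '>' + 'BB' * nbparams


def pack_header(parttype, partid, manpar=(), advpar=()):
    """Build a header blob: mandatory parameters first, then advisory ones."""
    allpar = list(manpar) + list(advpar)
    header = [struct.pack(F_TYPESIZE, len(parttype)), parttype,
              struct.pack(F_PARTID, partid),
              struct.pack(F_PARAMCOUNT, len(manpar), len(advpar))]
    sizes = []
    for key, value in allpar:
        sizes.extend((len(key), len(value)))
    header.append(struct.pack(make_param_sizes_format(len(allpar)), *sizes))
    for key, value in allpar:
        header.extend((key, value))
    return b''.join(header)


def unpack_header(data):
    """Reverse pack_header, consuming the blob from left to right."""
    offset = 0

    def raw(size):
        nonlocal offset
        chunk = data[offset:offset + size]
        offset += size
        return chunk

    def take(fmt):
        return struct.unpack(fmt, raw(struct.calcsize(fmt)))

    parttype = raw(take(F_TYPESIZE)[0])
    partid = take(F_PARTID)[0]
    mancount, advcount = take(F_PARAMCOUNT)
    flat = take(make_param_sizes_format(mancount + advcount))
    # Regroup the flat size list into (key size, value size) pairs.
    sizes = list(zip(flat[::2], flat[1::2]))
    params = [(raw(ksize), raw(vsize)) for ksize, vsize in sizes]
    return parttype, partid, params[:mancount], params[mancount:]
```

Because mandatory and advisory parameters share one size table, the two counts are all a reader needs to split them apart again.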
610 class unbundlepart(unpackermixin):
611 class unbundlepart(unpackermixin):
611 """a bundle part read from a bundle"""
612 """a bundle part read from a bundle"""
612
613
613 def __init__(self, ui, header, fp):
614 def __init__(self, ui, header, fp):
614 super(unbundlepart, self).__init__(fp)
615 super(unbundlepart, self).__init__(fp)
615 self.ui = ui
616 self.ui = ui
616 # unbundle state attr
617 # unbundle state attr
617 self._headerdata = header
618 self._headerdata = header
618 self._headeroffset = 0
619 self._headeroffset = 0
619 self._initialized = False
620 self._initialized = False
620 self.consumed = False
621 self.consumed = False
621 # part data
622 # part data
622 self.id = None
623 self.id = None
623 self.type = None
624 self.type = None
624 self.mandatoryparams = None
625 self.mandatoryparams = None
625 self.advisoryparams = None
626 self.advisoryparams = None
626 self._payloadstream = None
627 self._payloadstream = None
627 self._readheader()
628 self._readheader()
628
629
629 def _fromheader(self, size):
630 def _fromheader(self, size):
630 """return the next <size> bytes from the header"""
631 """return the next <size> bytes from the header"""
631 offset = self._headeroffset
632 offset = self._headeroffset
632 data = self._headerdata[offset:(offset + size)]
633 data = self._headerdata[offset:(offset + size)]
633 self._headeroffset = offset + size
634 self._headeroffset = offset + size
634 return data
635 return data
635
636
636 def _unpackheader(self, format):
637 def _unpackheader(self, format):
637 """read given format from header
638 """read given format from header
638
639
639 This automatically computes the size of the format to read."""
640 This automatically computes the size of the format to read."""
640 data = self._fromheader(struct.calcsize(format))
641 data = self._fromheader(struct.calcsize(format))
641 return _unpack(format, data)
642 return _unpack(format, data)
642
643
643 def _readheader(self):
644 def _readheader(self):
644 """read the header and setup the object"""
645 """read the header and setup the object"""
645 typesize = self._unpackheader(_fparttypesize)[0]
646 typesize = self._unpackheader(_fparttypesize)[0]
646 self.type = self._fromheader(typesize)
647 self.type = self._fromheader(typesize)
647 self.ui.debug('part type: "%s"\n' % self.type)
648 self.ui.debug('part type: "%s"\n' % self.type)
648 self.id = self._unpackheader(_fpartid)[0]
649 self.id = self._unpackheader(_fpartid)[0]
649 self.ui.debug('part id: "%s"\n' % self.id)
650 self.ui.debug('part id: "%s"\n' % self.id)
650 ## reading parameters
651 ## reading parameters
651 # param count
652 # param count
652 mancount, advcount = self._unpackheader(_fpartparamcount)
653 mancount, advcount = self._unpackheader(_fpartparamcount)
653 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
654 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
654 # param size
655 # param size
655 fparamsizes = _makefpartparamsizes(mancount + advcount)
656 fparamsizes = _makefpartparamsizes(mancount + advcount)
656 paramsizes = self._unpackheader(fparamsizes)
657 paramsizes = self._unpackheader(fparamsizes)
657 # make it a list of pairs again
658 # make it a list of pairs again
658 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
659 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
659 # split mandatory from advisory
660 # split mandatory from advisory
660 mansizes = paramsizes[:mancount]
661 mansizes = paramsizes[:mancount]
661 advsizes = paramsizes[mancount:]
662 advsizes = paramsizes[mancount:]
662 # retrieve param value
663 # retrieve param value
663 manparams = []
664 manparams = []
664 for key, value in mansizes:
665 for key, value in mansizes:
665 manparams.append((self._fromheader(key), self._fromheader(value)))
666 manparams.append((self._fromheader(key), self._fromheader(value)))
666 advparams = []
667 advparams = []
667 for key, value in advsizes:
668 for key, value in advsizes:
668 advparams.append((self._fromheader(key), self._fromheader(value)))
669 advparams.append((self._fromheader(key), self._fromheader(value)))
669 self.mandatoryparams = manparams
670 self.mandatoryparams = manparams
670 self.advisoryparams = advparams
671 self.advisoryparams = advparams
671 ## part payload
672 ## part payload
672 def payloadchunks():
673 def payloadchunks():
673 payloadsize = self._unpack(_fpayloadsize)[0]
674 payloadsize = self._unpack(_fpayloadsize)[0]
674 self.ui.debug('payload chunk size: %i\n' % payloadsize)
675 self.ui.debug('payload chunk size: %i\n' % payloadsize)
675 while payloadsize:
676 while payloadsize:
676 yield self._readexact(payloadsize)
677 yield self._readexact(payloadsize)
677 payloadsize = self._unpack(_fpayloadsize)[0]
678 payloadsize = self._unpack(_fpayloadsize)[0]
678 self.ui.debug('payload chunk size: %i\n' % payloadsize)
679 self.ui.debug('payload chunk size: %i\n' % payloadsize)
679 self._payloadstream = util.chunkbuffer(payloadchunks())
680 self._payloadstream = util.chunkbuffer(payloadchunks())
680 # we read the data, tell it
681 # we read the data, tell it
681 self._initialized = True
682 self._initialized = True
682
683
683 def read(self, size=None):
684 def read(self, size=None):
684 """read payload data"""
685 """read payload data"""
685 if not self._initialized:
686 if not self._initialized:
686 self._readheader()
687 self._readheader()
687 if size is None:
688 if size is None:
688 data = self._payloadstream.read()
689 data = self._payloadstream.read()
689 else:
690 else:
690 data = self._payloadstream.read(size)
691 data = self._payloadstream.read(size)
691 if size is None or len(data) < size:
692 if size is None or len(data) < size:
692 self.consumed = True
693 self.consumed = True
693 return data
694 return data
694
695
695
696
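The payload framing used by `getchunks` and `payloadchunks` above (each chunk prefixed by its size, with a zero size marking the end of the payload) can be sketched as a pair of generators. The `'>I'` size format is an assumption, since `_fpayloadsize` is defined outside this excerpt, and `frame_payload`, `read_exact` and `read_payload` are hypothetical names.

```python
import io
import struct

F_PAYLOADSIZE = '>I'  # assumed format for the chunk size field


def frame_payload(chunks):
    """Yield size-prefixed chunks followed by a zero-size terminator."""
    for chunk in chunks:
        if chunk:  # an empty chunk would read as the end-of-payload marker
            yield struct.pack(F_PAYLOADSIZE, len(chunk))
            yield chunk
    yield struct.pack(F_PAYLOADSIZE, 0)


def read_exact(fp, size):
    """Read exactly ``size`` bytes or fail loudly on a truncated stream."""
    data = fp.read(size)
    if len(data) < size:
        raise EOFError('stream ended unexpectedly '
                       '(got %d bytes, expected %d)' % (len(data), size))
    return data


def read_payload(fp):
    """Yield payload chunks until the zero-size terminator is reached."""
    headsize = struct.calcsize(F_PAYLOADSIZE)
    size = struct.unpack(F_PAYLOADSIZE, read_exact(fp, headsize))[0]
    while size:
        yield read_exact(fp, size)
        size = struct.unpack(F_PAYLOADSIZE, read_exact(fp, headsize))[0]
```

A reader built this way never needs to know the total payload size in advance, which is what lets a part stream data from an iterator.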
696 @parthandler('b2x:changegroup')
697 @parthandler('b2x:changegroup')
697 def handlechangegroup(op, inpart):
698 def handlechangegroup(op, inpart):
698 """apply a changegroup part on the repo
699 """apply a changegroup part on the repo
699
700
700 This is a very early implementation that will need massive rework before
701 This is a very early implementation that will need massive rework before
701 being inflicted on any end-user.
702 being inflicted on any end-user.
702 """
703 """
703 # Make sure we trigger a transaction creation
704 # Make sure we trigger a transaction creation
704 #
705 #
705 # The addchangegroup function will get a transaction object by itself, but
706 # The addchangegroup function will get a transaction object by itself, but
706 # we need to make sure we trigger the creation of a transaction object used
707 # we need to make sure we trigger the creation of a transaction object used
707 # for the whole processing scope.
708 # for the whole processing scope.
708 op.gettransaction()
709 op.gettransaction()
709 cg = changegroup.unbundle10(inpart, 'UN')
710 cg = changegroup.unbundle10(inpart, 'UN')
710 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
711 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
711 op.records.add('changegroup', {'return': ret})
712 op.records.add('changegroup', {'return': ret})
712 if op.reply is not None:
713 if op.reply is not None:
713 # This is definitely not the final form of this
714 # This is definitely not the final form of this
714 # return. But one needs to start somewhere.
715 # return. But one needs to start somewhere.
715 part = op.reply.newpart('b2x:reply:changegroup', (),
716 op.reply.newpart('b2x:reply:changegroup', (),
716 [('in-reply-to', str(inpart.id)),
717 [('in-reply-to', str(inpart.id)),
717 ('return', '%i' % ret)])
718 ('return', '%i' % ret)])
718 op.reply.addpart(part)
719 assert not inpart.read()
719 assert not inpart.read()
720
720
721 @parthandler('b2x:reply:changegroup')
721 @parthandler('b2x:reply:changegroup')
722 def handlechangegroup(op, inpart):
722 def handlechangegroup(op, inpart):
723 p = dict(inpart.advisoryparams)
723 p = dict(inpart.advisoryparams)
724 ret = int(p['return'])
724 ret = int(p['return'])
725 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
725 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
726
726
727 @parthandler('b2x:check:heads')
727 @parthandler('b2x:check:heads')
728 def handlechangegroup(op, inpart):
728 def handlechangegroup(op, inpart):
729 """check that the heads of the repo did not change
729 """check that the heads of the repo did not change
730
730
731 This is used to detect a push race when using unbundle.
731 This is used to detect a push race when using unbundle.
732 This replaces the "heads" argument of unbundle."""
732 This replaces the "heads" argument of unbundle."""
733 h = inpart.read(20)
733 h = inpart.read(20)
734 heads = []
734 heads = []
735 while len(h) == 20:
735 while len(h) == 20:
736 heads.append(h)
736 heads.append(h)
737 h = inpart.read(20)
737 h = inpart.read(20)
738 assert not h
738 assert not h
739 if heads != op.repo.heads():
739 if heads != op.repo.heads():
740 raise error.PushRaced('repository changed while pushing - '
740 raise error.PushRaced('repository changed while pushing - '
741 'please try again')
741 'please try again')
742
742
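The `b2x:check:heads` payload is read as a plain concatenation of 20-byte node ids. A minimal sketch of that loop follows, with a local `PushRaced` class standing in for `error.PushRaced`, a `ValueError` replacing the source's `assert not h`, and `read_heads` / `check_heads` as hypothetical names.

```python
import io

NODE_SIZE = 20  # length in bytes of one binary node id, as read by the handler


class PushRaced(Exception):
    """Stand-in for error.PushRaced in this sketch."""


def read_heads(fp):
    """Split a stream of concatenated 20-byte node ids into a list."""
    heads = []
    h = fp.read(NODE_SIZE)
    while len(h) == NODE_SIZE:
        heads.append(h)
        h = fp.read(NODE_SIZE)
    if h:
        # The source asserts here; a partial trailing node is malformed input.
        raise ValueError('trailing partial node: %d bytes' % len(h))
    return heads


def check_heads(payload, current_heads):
    """Raise PushRaced when the heads sent by the client are out of date."""
    # Like the handler, this is an order-sensitive list comparison.
    if read_heads(io.BytesIO(payload)) != list(current_heads):
        raise PushRaced('repository changed while pushing - '
                        'please try again')
```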
743 @parthandler('b2x:output')
743 @parthandler('b2x:output')
744 def handleoutput(op, inpart):
744 def handleoutput(op, inpart):
745 """forward output captured on the server to the client"""
745 """forward output captured on the server to the client"""
746 for line in inpart.read().splitlines():
746 for line in inpart.read().splitlines():
747 op.ui.write(('remote: %s\n' % line))
747 op.ui.write(('remote: %s\n' % line))
748
748
749 @parthandler('b2x:replycaps')
749 @parthandler('b2x:replycaps')
750 def handlereplycaps(op, inpart):
750 def handlereplycaps(op, inpart):
751 """Notify that a reply bundle should be created
751 """Notify that a reply bundle should be created
752
752
753 The payload contains the capabilities information for the reply"""
753 The payload contains the capabilities information for the reply"""
754 caps = decodecaps(inpart.read())
754 caps = decodecaps(inpart.read())
755 if op.reply is None:
755 if op.reply is None:
756 op.reply = bundle20(op.ui, caps)
756 op.reply = bundle20(op.ui, caps)
757
757
758 @parthandler('b2x:error:abort')
758 @parthandler('b2x:error:abort')
759 def handlereplycaps(op, inpart):
759 def handlereplycaps(op, inpart):
760 """Used to transmit abort error over the wire"""
760 """Used to transmit abort error over the wire"""
761 manargs = dict(inpart.mandatoryparams)
761 manargs = dict(inpart.mandatoryparams)
762 advargs = dict(inpart.advisoryparams)
762 advargs = dict(inpart.advisoryparams)
763 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
763 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
764
764
765 @parthandler('b2x:error:unknownpart')
765 @parthandler('b2x:error:unknownpart')
766 def handlereplycaps(op, inpart):
766 def handlereplycaps(op, inpart):
767 """Used to transmit unknown part error over the wire"""
767 """Used to transmit unknown part error over the wire"""
768 manargs = dict(inpart.mandatoryparams)
768 manargs = dict(inpart.mandatoryparams)
769 raise UnknownPartError(manargs['parttype'])
769 raise UnknownPartError(manargs['parttype'])
770
770
771 @parthandler('b2x:error:pushraced')
771 @parthandler('b2x:error:pushraced')
772 def handlereplycaps(op, inpart):
772 def handlereplycaps(op, inpart):
773 """Used to transmit push race error over the wire"""
773 """Used to transmit push race error over the wire"""
774 manargs = dict(inpart.mandatoryparams)
774 manargs = dict(inpart.mandatoryparams)
775 raise error.ResponseError(_('push failed:'), manargs['message'])
775 raise error.ResponseError(_('push failed:'), manargs['message'])
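The only behavioral change in this diff is at the `b2x:reply:changegroup` call site: the result of `op.reply.newpart(...)` is no longer passed to `op.reply.addpart(...)`, because `newpart` now registers the part itself. The bundler side is not shown in this excerpt, so the following is a hypothetical sketch of that contract; `Part`, `Bundler`, and the id assignment in `addpart` are assumptions made for illustration.

```python
class Part(object):
    """Minimal stand-in for bundlepart: type, parameters and payload."""

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data=b''):
        self.id = None
        self.type = parttype
        self.mandatoryparams = mandatoryparams
        self.advisoryparams = advisoryparams
        self.data = data


class Bundler(object):
    """Hypothetical bundler illustrating the newpart/addpart contract."""

    def __init__(self):
        self.parts = []

    def addpart(self, part):
        """Register an existing part and give it an id (assumed behavior)."""
        assert part.id is None, 'part already belongs to a bundle'
        part.id = len(self.parts)
        self.parts.append(part)

    def newpart(self, parttype, *args, **kwargs):
        """Create a part and add it to the bundle in one step."""
        part = Part(parttype, *args, **kwargs)
        self.addpart(part)
        return part
```

With this shape a caller writes `reply.newpart('b2x:reply:changegroup', (), params)` and cannot forget the registration step, which matches the simplification visible at the call site.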