##// END OF EJS Templates
bundle2: track life cycle of parts...
Pierre-Yves David -
r21601:7ff01bef default
parent child Browse files
Show More
@@ -1,774 +1,784 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 :payload:
116 :payload:
117
117
118 payload is a series of `<chunksize><chunkdata>`.
118 payload is a series of `<chunksize><chunkdata>`.
119
119
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
120 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
121 `chunksize` says)` The payload part is concluded by a zero size chunk.
122
122
123 The current implementation always produces either zero or one chunk.
123 The current implementation always produces either zero or one chunk.
124 This is an implementation limitation that will ultimately be lifted.
124 This is an implementation limitation that will ultimately be lifted.
125
125
126 Bundle processing
126 Bundle processing
127 ============================
127 ============================
128
128
129 Each part is processed in order using a "part handler". Handler are registered
129 Each part is processed in order using a "part handler". Handler are registered
130 for a certain part type.
130 for a certain part type.
131
131
132 The matching of a part to its handler is case insensitive. The case of the
132 The matching of a part to its handler is case insensitive. The case of the
133 part type is used to know if a part is mandatory or advisory. If the Part type
133 part type is used to know if a part is mandatory or advisory. If the Part type
134 contains any uppercase char it is considered mandatory. When no handler is
134 contains any uppercase char it is considered mandatory. When no handler is
135 known for a Mandatory part, the process is aborted and an exception is raised.
135 known for a Mandatory part, the process is aborted and an exception is raised.
136 If the part is advisory and no handler is known, the part is ignored. When the
136 If the part is advisory and no handler is known, the part is ignored. When the
137 process is aborted, the full bundle is still read from the stream to keep the
137 process is aborted, the full bundle is still read from the stream to keep the
138 channel usable. But none of the part read from an abort are processed. In the
138 channel usable. But none of the part read from an abort are processed. In the
139 future, dropping the stream may become an option for channel we do not care to
139 future, dropping the stream may become an option for channel we do not care to
140 preserve.
140 preserve.
141 """
141 """
142
142
143 import util
143 import util
144 import struct
144 import struct
145 import urllib
145 import urllib
146 import string
146 import string
147
147
148 import changegroup, error
148 import changegroup, error
149 from i18n import _
149 from i18n import _
150
150
151 _pack = struct.pack
151 _pack = struct.pack
152 _unpack = struct.unpack
152 _unpack = struct.unpack
153
153
154 _magicstring = 'HG2X'
154 _magicstring = 'HG2X'
155
155
156 _fstreamparamsize = '>H'
156 _fstreamparamsize = '>H'
157 _fpartheadersize = '>H'
157 _fpartheadersize = '>H'
158 _fparttypesize = '>B'
158 _fparttypesize = '>B'
159 _fpartid = '>I'
159 _fpartid = '>I'
160 _fpayloadsize = '>I'
160 _fpayloadsize = '>I'
161 _fpartparamcount = '>BB'
161 _fpartparamcount = '>BB'
162
162
163 preferedchunksize = 4096
163 preferedchunksize = 4096
164
164
165 def _makefpartparamsizes(nbparams):
165 def _makefpartparamsizes(nbparams):
166 """return a struct format to read part parameter sizes
166 """return a struct format to read part parameter sizes
167
167
168 The number parameters is variable so we need to build that format
168 The number parameters is variable so we need to build that format
169 dynamically.
169 dynamically.
170 """
170 """
171 return '>'+('BB'*nbparams)
171 return '>'+('BB'*nbparams)
172
172
173 class UnknownPartError(KeyError):
173 class UnknownPartError(KeyError):
174 """error raised when no handler is found for a Mandatory part"""
174 """error raised when no handler is found for a Mandatory part"""
175 pass
175 pass
176
176
177 parthandlermapping = {}
177 parthandlermapping = {}
178
178
179 def parthandler(parttype):
179 def parthandler(parttype):
180 """decorator that register a function as a bundle2 part handler
180 """decorator that register a function as a bundle2 part handler
181
181
182 eg::
182 eg::
183
183
184 @parthandler('myparttype')
184 @parthandler('myparttype')
185 def myparttypehandler(...):
185 def myparttypehandler(...):
186 '''process a part of type "my part".'''
186 '''process a part of type "my part".'''
187 ...
187 ...
188 """
188 """
189 def _decorator(func):
189 def _decorator(func):
190 lparttype = parttype.lower() # enforce lower case matching.
190 lparttype = parttype.lower() # enforce lower case matching.
191 assert lparttype not in parthandlermapping
191 assert lparttype not in parthandlermapping
192 parthandlermapping[lparttype] = func
192 parthandlermapping[lparttype] = func
193 return func
193 return func
194 return _decorator
194 return _decorator
195
195
196 class unbundlerecords(object):
196 class unbundlerecords(object):
197 """keep record of what happens during and unbundle
197 """keep record of what happens during and unbundle
198
198
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 category of record and obj is an arbitrary object.
200 category of record and obj is an arbitrary object.
201
201
202 `records['cat']` will return all entries of this category 'cat'.
202 `records['cat']` will return all entries of this category 'cat'.
203
203
204 Iterating on the object itself will yield `('category', obj)` tuples
204 Iterating on the object itself will yield `('category', obj)` tuples
205 for all entries.
205 for all entries.
206
206
207 All iterations happens in chronological order.
207 All iterations happens in chronological order.
208 """
208 """
209
209
210 def __init__(self):
210 def __init__(self):
211 self._categories = {}
211 self._categories = {}
212 self._sequences = []
212 self._sequences = []
213 self._replies = {}
213 self._replies = {}
214
214
215 def add(self, category, entry, inreplyto=None):
215 def add(self, category, entry, inreplyto=None):
216 """add a new record of a given category.
216 """add a new record of a given category.
217
217
218 The entry can then be retrieved in the list returned by
218 The entry can then be retrieved in the list returned by
219 self['category']."""
219 self['category']."""
220 self._categories.setdefault(category, []).append(entry)
220 self._categories.setdefault(category, []).append(entry)
221 self._sequences.append((category, entry))
221 self._sequences.append((category, entry))
222 if inreplyto is not None:
222 if inreplyto is not None:
223 self.getreplies(inreplyto).add(category, entry)
223 self.getreplies(inreplyto).add(category, entry)
224
224
225 def getreplies(self, partid):
225 def getreplies(self, partid):
226 """get the subrecords that replies to a specific part"""
226 """get the subrecords that replies to a specific part"""
227 return self._replies.setdefault(partid, unbundlerecords())
227 return self._replies.setdefault(partid, unbundlerecords())
228
228
229 def __getitem__(self, cat):
229 def __getitem__(self, cat):
230 return tuple(self._categories.get(cat, ()))
230 return tuple(self._categories.get(cat, ()))
231
231
232 def __iter__(self):
232 def __iter__(self):
233 return iter(self._sequences)
233 return iter(self._sequences)
234
234
235 def __len__(self):
235 def __len__(self):
236 return len(self._sequences)
236 return len(self._sequences)
237
237
238 def __nonzero__(self):
238 def __nonzero__(self):
239 return bool(self._sequences)
239 return bool(self._sequences)
240
240
241 class bundleoperation(object):
241 class bundleoperation(object):
242 """an object that represents a single bundling process
242 """an object that represents a single bundling process
243
243
244 Its purpose is to carry unbundle-related objects and states.
244 Its purpose is to carry unbundle-related objects and states.
245
245
246 A new object should be created at the beginning of each bundle processing.
246 A new object should be created at the beginning of each bundle processing.
247 The object is to be returned by the processing function.
247 The object is to be returned by the processing function.
248
248
249 The object has very little content now it will ultimately contain:
249 The object has very little content now it will ultimately contain:
250 * an access to the repo the bundle is applied to,
250 * an access to the repo the bundle is applied to,
251 * a ui object,
251 * a ui object,
252 * a way to retrieve a transaction to add changes to the repo,
252 * a way to retrieve a transaction to add changes to the repo,
253 * a way to record the result of processing each part,
253 * a way to record the result of processing each part,
254 * a way to construct a bundle response when applicable.
254 * a way to construct a bundle response when applicable.
255 """
255 """
256
256
257 def __init__(self, repo, transactiongetter):
257 def __init__(self, repo, transactiongetter):
258 self.repo = repo
258 self.repo = repo
259 self.ui = repo.ui
259 self.ui = repo.ui
260 self.records = unbundlerecords()
260 self.records = unbundlerecords()
261 self.gettransaction = transactiongetter
261 self.gettransaction = transactiongetter
262 self.reply = None
262 self.reply = None
263
263
264 class TransactionUnavailable(RuntimeError):
264 class TransactionUnavailable(RuntimeError):
265 pass
265 pass
266
266
267 def _notransaction():
267 def _notransaction():
268 """default method to get a transaction while processing a bundle
268 """default method to get a transaction while processing a bundle
269
269
270 Raise an exception to highlight the fact that no transaction was expected
270 Raise an exception to highlight the fact that no transaction was expected
271 to be created"""
271 to be created"""
272 raise TransactionUnavailable()
272 raise TransactionUnavailable()
273
273
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 """This function process a bundle, apply effect to/from a repo
275 """This function process a bundle, apply effect to/from a repo
276
276
277 It iterates over each part then searches for and uses the proper handling
277 It iterates over each part then searches for and uses the proper handling
278 code to process the part. Parts are processed in order.
278 code to process the part. Parts are processed in order.
279
279
280 This is very early version of this function that will be strongly reworked
280 This is very early version of this function that will be strongly reworked
281 before final usage.
281 before final usage.
282
282
283 Unknown Mandatory part will abort the process.
283 Unknown Mandatory part will abort the process.
284 """
284 """
285 op = bundleoperation(repo, transactiongetter)
285 op = bundleoperation(repo, transactiongetter)
286 # todo:
286 # todo:
287 # - replace this is a init function soon.
287 # - replace this is a init function soon.
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = unbundler.iterparts()
290 iterparts = unbundler.iterparts()
291 part = None
291 part = None
292 try:
292 try:
293 for part in iterparts:
293 for part in iterparts:
294 parttype = part.type
294 parttype = part.type
295 # part key are matched lower case
295 # part key are matched lower case
296 key = parttype.lower()
296 key = parttype.lower()
297 try:
297 try:
298 handler = parthandlermapping[key]
298 handler = parthandlermapping[key]
299 op.ui.debug('found a handler for part %r\n' % parttype)
299 op.ui.debug('found a handler for part %r\n' % parttype)
300 except KeyError:
300 except KeyError:
301 if key != parttype: # mandatory parts
301 if key != parttype: # mandatory parts
302 # todo:
302 # todo:
303 # - use a more precise exception
303 # - use a more precise exception
304 raise UnknownPartError(key)
304 raise UnknownPartError(key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
306 # consuming the part
306 # consuming the part
307 part.read()
307 part.read()
308 continue
308 continue
309
309
310 # handler is called outside the above try block so that we don't
310 # handler is called outside the above try block so that we don't
311 # risk catching KeyErrors from anything other than the
311 # risk catching KeyErrors from anything other than the
312 # parthandlermapping lookup (any KeyError raised by handler()
312 # parthandlermapping lookup (any KeyError raised by handler()
313 # itself represents a defect of a different variety).
313 # itself represents a defect of a different variety).
314 output = None
314 output = None
315 if op.reply is not None:
315 if op.reply is not None:
316 op.ui.pushbuffer(error=True)
316 op.ui.pushbuffer(error=True)
317 output = ''
317 output = ''
318 try:
318 try:
319 handler(op, part)
319 handler(op, part)
320 finally:
320 finally:
321 if output is not None:
321 if output is not None:
322 output = op.ui.popbuffer()
322 output = op.ui.popbuffer()
323 if output:
323 if output:
324 op.reply.newpart('b2x:output',
324 op.reply.newpart('b2x:output',
325 advisoryparams=[('in-reply-to',
325 advisoryparams=[('in-reply-to',
326 str(part.id))],
326 str(part.id))],
327 data=output)
327 data=output)
328 part.read()
328 part.read()
329 except Exception, exc:
329 except Exception, exc:
330 if part is not None:
330 if part is not None:
331 # consume the bundle content
331 # consume the bundle content
332 part.read()
332 part.read()
333 for part in iterparts:
333 for part in iterparts:
334 # consume the bundle content
334 # consume the bundle content
335 part.read()
335 part.read()
336 # Small hack to let caller code distinguish exceptions from bundle2
336 # Small hack to let caller code distinguish exceptions from bundle2
337 # processing fron the ones from bundle1 processing. This is mostly
337 # processing fron the ones from bundle1 processing. This is mostly
338 # needed to handle different return codes to unbundle according to the
338 # needed to handle different return codes to unbundle according to the
339 # type of bundle. We should probably clean up or drop this return code
339 # type of bundle. We should probably clean up or drop this return code
340 # craziness in a future version.
340 # craziness in a future version.
341 exc.duringunbundle2 = True
341 exc.duringunbundle2 = True
342 raise
342 raise
343 return op
343 return op
344
344
345 def decodecaps(blob):
345 def decodecaps(blob):
346 """decode a bundle2 caps bytes blob into a dictionnary
346 """decode a bundle2 caps bytes blob into a dictionnary
347
347
348 The blob is a list of capabilities (one per line)
348 The blob is a list of capabilities (one per line)
349 Capabilities may have values using a line of the form::
349 Capabilities may have values using a line of the form::
350
350
351 capability=value1,value2,value3
351 capability=value1,value2,value3
352
352
353 The values are always a list."""
353 The values are always a list."""
354 caps = {}
354 caps = {}
355 for line in blob.splitlines():
355 for line in blob.splitlines():
356 if not line:
356 if not line:
357 continue
357 continue
358 if '=' not in line:
358 if '=' not in line:
359 key, vals = line, ()
359 key, vals = line, ()
360 else:
360 else:
361 key, vals = line.split('=', 1)
361 key, vals = line.split('=', 1)
362 vals = vals.split(',')
362 vals = vals.split(',')
363 key = urllib.unquote(key)
363 key = urllib.unquote(key)
364 vals = [urllib.unquote(v) for v in vals]
364 vals = [urllib.unquote(v) for v in vals]
365 caps[key] = vals
365 caps[key] = vals
366 return caps
366 return caps
367
367
368 def encodecaps(caps):
368 def encodecaps(caps):
369 """encode a bundle2 caps dictionary into a bytes blob"""
369 """encode a bundle2 caps dictionary into a bytes blob"""
370 chunks = []
370 chunks = []
371 for ca in sorted(caps):
371 for ca in sorted(caps):
372 vals = caps[ca]
372 vals = caps[ca]
373 ca = urllib.quote(ca)
373 ca = urllib.quote(ca)
374 vals = [urllib.quote(v) for v in vals]
374 vals = [urllib.quote(v) for v in vals]
375 if vals:
375 if vals:
376 ca = "%s=%s" % (ca, ','.join(vals))
376 ca = "%s=%s" % (ca, ','.join(vals))
377 chunks.append(ca)
377 chunks.append(ca)
378 return '\n'.join(chunks)
378 return '\n'.join(chunks)
379
379
380 class bundle20(object):
380 class bundle20(object):
381 """represent an outgoing bundle2 container
381 """represent an outgoing bundle2 container
382
382
383 Use the `addparam` method to add stream level parameter. and `newpart` to
383 Use the `addparam` method to add stream level parameter. and `newpart` to
384 populate it. Then call `getchunks` to retrieve all the binary chunks of
384 populate it. Then call `getchunks` to retrieve all the binary chunks of
385 data that compose the bundle2 container."""
385 data that compose the bundle2 container."""
386
386
387 def __init__(self, ui, capabilities=()):
387 def __init__(self, ui, capabilities=()):
388 self.ui = ui
388 self.ui = ui
389 self._params = []
389 self._params = []
390 self._parts = []
390 self._parts = []
391 self.capabilities = dict(capabilities)
391 self.capabilities = dict(capabilities)
392
392
393 # methods used to defines the bundle2 content
393 # methods used to defines the bundle2 content
394 def addparam(self, name, value=None):
394 def addparam(self, name, value=None):
395 """add a stream level parameter"""
395 """add a stream level parameter"""
396 if not name:
396 if not name:
397 raise ValueError('empty parameter name')
397 raise ValueError('empty parameter name')
398 if name[0] not in string.letters:
398 if name[0] not in string.letters:
399 raise ValueError('non letter first character: %r' % name)
399 raise ValueError('non letter first character: %r' % name)
400 self._params.append((name, value))
400 self._params.append((name, value))
401
401
402 def addpart(self, part):
402 def addpart(self, part):
403 """add a new part to the bundle2 container
403 """add a new part to the bundle2 container
404
404
405 Parts contains the actual applicative payload."""
405 Parts contains the actual applicative payload."""
406 assert part.id is None
406 assert part.id is None
407 part.id = len(self._parts) # very cheap counter
407 part.id = len(self._parts) # very cheap counter
408 self._parts.append(part)
408 self._parts.append(part)
409
409
410 def newpart(self, typeid, *args, **kwargs):
410 def newpart(self, typeid, *args, **kwargs):
411 """create a new part and add it to the containers"""
411 """create a new part and add it to the containers"""
412 part = bundlepart(typeid, *args, **kwargs)
412 part = bundlepart(typeid, *args, **kwargs)
413 self.addpart(part)
413 self.addpart(part)
414 return part
414 return part
415
415
416 # methods used to generate the bundle2 stream
416 # methods used to generate the bundle2 stream
417 def getchunks(self):
417 def getchunks(self):
418 self.ui.debug('start emission of %s stream\n' % _magicstring)
418 self.ui.debug('start emission of %s stream\n' % _magicstring)
419 yield _magicstring
419 yield _magicstring
420 param = self._paramchunk()
420 param = self._paramchunk()
421 self.ui.debug('bundle parameter: %s\n' % param)
421 self.ui.debug('bundle parameter: %s\n' % param)
422 yield _pack(_fstreamparamsize, len(param))
422 yield _pack(_fstreamparamsize, len(param))
423 if param:
423 if param:
424 yield param
424 yield param
425
425
426 self.ui.debug('start of parts\n')
426 self.ui.debug('start of parts\n')
427 for part in self._parts:
427 for part in self._parts:
428 self.ui.debug('bundle part: "%s"\n' % part.type)
428 self.ui.debug('bundle part: "%s"\n' % part.type)
429 for chunk in part.getchunks():
429 for chunk in part.getchunks():
430 yield chunk
430 yield chunk
431 self.ui.debug('end of bundle\n')
431 self.ui.debug('end of bundle\n')
432 yield '\0\0'
432 yield '\0\0'
433
433
434 def _paramchunk(self):
434 def _paramchunk(self):
435 """return a encoded version of all stream parameters"""
435 """return a encoded version of all stream parameters"""
436 blocks = []
436 blocks = []
437 for par, value in self._params:
437 for par, value in self._params:
438 par = urllib.quote(par)
438 par = urllib.quote(par)
439 if value is not None:
439 if value is not None:
440 value = urllib.quote(value)
440 value = urllib.quote(value)
441 par = '%s=%s' % (par, value)
441 par = '%s=%s' % (par, value)
442 blocks.append(par)
442 blocks.append(par)
443 return ' '.join(blocks)
443 return ' '.join(blocks)
444
444
445 class unpackermixin(object):
445 class unpackermixin(object):
446 """A mixin to extract bytes and struct data from a stream"""
446 """A mixin to extract bytes and struct data from a stream"""
447
447
448 def __init__(self, fp):
448 def __init__(self, fp):
449 self._fp = fp
449 self._fp = fp
450
450
451 def _unpack(self, format):
451 def _unpack(self, format):
452 """unpack this struct format from the stream"""
452 """unpack this struct format from the stream"""
453 data = self._readexact(struct.calcsize(format))
453 data = self._readexact(struct.calcsize(format))
454 return _unpack(format, data)
454 return _unpack(format, data)
455
455
456 def _readexact(self, size):
456 def _readexact(self, size):
457 """read exactly <size> bytes from the stream"""
457 """read exactly <size> bytes from the stream"""
458 return changegroup.readexactly(self._fp, size)
458 return changegroup.readexactly(self._fp, size)
459
459
460
460
461 class unbundle20(unpackermixin):
461 class unbundle20(unpackermixin):
462 """interpret a bundle2 stream
462 """interpret a bundle2 stream
463
463
464 This class is fed with a binary stream and yields parts through its
464 This class is fed with a binary stream and yields parts through its
465 `iterparts` methods."""
465 `iterparts` methods."""
466
466
467 def __init__(self, ui, fp, header=None):
467 def __init__(self, ui, fp, header=None):
468 """If header is specified, we do not read it out of the stream."""
468 """If header is specified, we do not read it out of the stream."""
469 self.ui = ui
469 self.ui = ui
470 super(unbundle20, self).__init__(fp)
470 super(unbundle20, self).__init__(fp)
471 if header is None:
471 if header is None:
472 header = self._readexact(4)
472 header = self._readexact(4)
473 magic, version = header[0:2], header[2:4]
473 magic, version = header[0:2], header[2:4]
474 if magic != 'HG':
474 if magic != 'HG':
475 raise util.Abort(_('not a Mercurial bundle'))
475 raise util.Abort(_('not a Mercurial bundle'))
476 if version != '2X':
476 if version != '2X':
477 raise util.Abort(_('unknown bundle version %s') % version)
477 raise util.Abort(_('unknown bundle version %s') % version)
478 self.ui.debug('start processing of %s stream\n' % header)
478 self.ui.debug('start processing of %s stream\n' % header)
479
479
480 @util.propertycache
480 @util.propertycache
481 def params(self):
481 def params(self):
482 """dictionary of stream level parameters"""
482 """dictionary of stream level parameters"""
483 self.ui.debug('reading bundle2 stream parameters\n')
483 self.ui.debug('reading bundle2 stream parameters\n')
484 params = {}
484 params = {}
485 paramssize = self._unpack(_fstreamparamsize)[0]
485 paramssize = self._unpack(_fstreamparamsize)[0]
486 if paramssize:
486 if paramssize:
487 for p in self._readexact(paramssize).split(' '):
487 for p in self._readexact(paramssize).split(' '):
488 p = p.split('=', 1)
488 p = p.split('=', 1)
489 p = [urllib.unquote(i) for i in p]
489 p = [urllib.unquote(i) for i in p]
490 if len(p) < 2:
490 if len(p) < 2:
491 p.append(None)
491 p.append(None)
492 self._processparam(*p)
492 self._processparam(*p)
493 params[p[0]] = p[1]
493 params[p[0]] = p[1]
494 return params
494 return params
495
495
496 def _processparam(self, name, value):
496 def _processparam(self, name, value):
497 """process a parameter, applying its effect if needed
497 """process a parameter, applying its effect if needed
498
498
499 Parameter starting with a lower case letter are advisory and will be
499 Parameter starting with a lower case letter are advisory and will be
500 ignored when unknown. Those starting with an upper case letter are
500 ignored when unknown. Those starting with an upper case letter are
501 mandatory and will this function will raise a KeyError when unknown.
501 mandatory and will this function will raise a KeyError when unknown.
502
502
503 Note: no option are currently supported. Any input will be either
503 Note: no option are currently supported. Any input will be either
504 ignored or failing.
504 ignored or failing.
505 """
505 """
506 if not name:
506 if not name:
507 raise ValueError('empty parameter name')
507 raise ValueError('empty parameter name')
508 if name[0] not in string.letters:
508 if name[0] not in string.letters:
509 raise ValueError('non letter first character: %r' % name)
509 raise ValueError('non letter first character: %r' % name)
510 # Some logic will be later added here to try to process the option for
510 # Some logic will be later added here to try to process the option for
511 # a dict of known parameter.
511 # a dict of known parameter.
512 if name[0].islower():
512 if name[0].islower():
513 self.ui.debug("ignoring unknown parameter %r\n" % name)
513 self.ui.debug("ignoring unknown parameter %r\n" % name)
514 else:
514 else:
515 raise KeyError(name)
515 raise KeyError(name)
516
516
517
517
518 def iterparts(self):
518 def iterparts(self):
519 """yield all parts contained in the stream"""
519 """yield all parts contained in the stream"""
520 # make sure param have been loaded
520 # make sure param have been loaded
521 self.params
521 self.params
522 self.ui.debug('start extraction of bundle2 parts\n')
522 self.ui.debug('start extraction of bundle2 parts\n')
523 headerblock = self._readpartheader()
523 headerblock = self._readpartheader()
524 while headerblock is not None:
524 while headerblock is not None:
525 part = unbundlepart(self.ui, headerblock, self._fp)
525 part = unbundlepart(self.ui, headerblock, self._fp)
526 yield part
526 yield part
527 headerblock = self._readpartheader()
527 headerblock = self._readpartheader()
528 self.ui.debug('end of bundle2 stream\n')
528 self.ui.debug('end of bundle2 stream\n')
529
529
530 def _readpartheader(self):
530 def _readpartheader(self):
531 """reads a part header size and return the bytes blob
531 """reads a part header size and return the bytes blob
532
532
533 returns None if empty"""
533 returns None if empty"""
534 headersize = self._unpack(_fpartheadersize)[0]
534 headersize = self._unpack(_fpartheadersize)[0]
535 self.ui.debug('part header size: %i\n' % headersize)
535 self.ui.debug('part header size: %i\n' % headersize)
536 if headersize:
536 if headersize:
537 return self._readexact(headersize)
537 return self._readexact(headersize)
538 return None
538 return None
539
539
540
540
541 class bundlepart(object):
541 class bundlepart(object):
542 """A bundle2 part contains application level payload
542 """A bundle2 part contains application level payload
543
543
544 The part `type` is used to route the part to the application level
544 The part `type` is used to route the part to the application level
545 handler.
545 handler.
546 """
546 """
547
547
548 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
548 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
549 data=''):
549 data=''):
550 self.id = None
550 self.id = None
551 self.type = parttype
551 self.type = parttype
552 self.data = data
552 self.data = data
553 self.mandatoryparams = mandatoryparams
553 self.mandatoryparams = mandatoryparams
554 self.advisoryparams = advisoryparams
554 self.advisoryparams = advisoryparams
555 # status of the part's generation:
556 # - None: not started,
557 # - False: currently generated,
558 # - True: generation done.
559 self._generated = None
555
560
561 # methods used to generates the bundle2 stream
556 def getchunks(self):
562 def getchunks(self):
563 if self._generated is not None:
564 raise RuntimeError('part can only be consumed once')
565 self._generated = False
557 #### header
566 #### header
558 ## parttype
567 ## parttype
559 header = [_pack(_fparttypesize, len(self.type)),
568 header = [_pack(_fparttypesize, len(self.type)),
560 self.type, _pack(_fpartid, self.id),
569 self.type, _pack(_fpartid, self.id),
561 ]
570 ]
562 ## parameters
571 ## parameters
563 # count
572 # count
564 manpar = self.mandatoryparams
573 manpar = self.mandatoryparams
565 advpar = self.advisoryparams
574 advpar = self.advisoryparams
566 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
575 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
567 # size
576 # size
568 parsizes = []
577 parsizes = []
569 for key, value in manpar:
578 for key, value in manpar:
570 parsizes.append(len(key))
579 parsizes.append(len(key))
571 parsizes.append(len(value))
580 parsizes.append(len(value))
572 for key, value in advpar:
581 for key, value in advpar:
573 parsizes.append(len(key))
582 parsizes.append(len(key))
574 parsizes.append(len(value))
583 parsizes.append(len(value))
575 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
584 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
576 header.append(paramsizes)
585 header.append(paramsizes)
577 # key, value
586 # key, value
578 for key, value in manpar:
587 for key, value in manpar:
579 header.append(key)
588 header.append(key)
580 header.append(value)
589 header.append(value)
581 for key, value in advpar:
590 for key, value in advpar:
582 header.append(key)
591 header.append(key)
583 header.append(value)
592 header.append(value)
584 ## finalize header
593 ## finalize header
585 headerchunk = ''.join(header)
594 headerchunk = ''.join(header)
586 yield _pack(_fpartheadersize, len(headerchunk))
595 yield _pack(_fpartheadersize, len(headerchunk))
587 yield headerchunk
596 yield headerchunk
588 ## payload
597 ## payload
589 for chunk in self._payloadchunks():
598 for chunk in self._payloadchunks():
590 yield _pack(_fpayloadsize, len(chunk))
599 yield _pack(_fpayloadsize, len(chunk))
591 yield chunk
600 yield chunk
592 # end of payload
601 # end of payload
593 yield _pack(_fpayloadsize, 0)
602 yield _pack(_fpayloadsize, 0)
603 self._generated = True
594
604
595 def _payloadchunks(self):
605 def _payloadchunks(self):
596 """yield chunks of a the part payload
606 """yield chunks of a the part payload
597
607
598 Exists to handle the different methods to provide data to a part."""
608 Exists to handle the different methods to provide data to a part."""
599 # we only support fixed size data now.
609 # we only support fixed size data now.
600 # This will be improved in the future.
610 # This will be improved in the future.
601 if util.safehasattr(self.data, 'next'):
611 if util.safehasattr(self.data, 'next'):
602 buff = util.chunkbuffer(self.data)
612 buff = util.chunkbuffer(self.data)
603 chunk = buff.read(preferedchunksize)
613 chunk = buff.read(preferedchunksize)
604 while chunk:
614 while chunk:
605 yield chunk
615 yield chunk
606 chunk = buff.read(preferedchunksize)
616 chunk = buff.read(preferedchunksize)
607 elif len(self.data):
617 elif len(self.data):
608 yield self.data
618 yield self.data
609
619
610 class unbundlepart(unpackermixin):
620 class unbundlepart(unpackermixin):
611 """a bundle part read from a bundle"""
621 """a bundle part read from a bundle"""
612
622
613 def __init__(self, ui, header, fp):
623 def __init__(self, ui, header, fp):
614 super(unbundlepart, self).__init__(fp)
624 super(unbundlepart, self).__init__(fp)
615 self.ui = ui
625 self.ui = ui
616 # unbundle state attr
626 # unbundle state attr
617 self._headerdata = header
627 self._headerdata = header
618 self._headeroffset = 0
628 self._headeroffset = 0
619 self._initialized = False
629 self._initialized = False
620 self.consumed = False
630 self.consumed = False
621 # part data
631 # part data
622 self.id = None
632 self.id = None
623 self.type = None
633 self.type = None
624 self.mandatoryparams = None
634 self.mandatoryparams = None
625 self.advisoryparams = None
635 self.advisoryparams = None
626 self._payloadstream = None
636 self._payloadstream = None
627 self._readheader()
637 self._readheader()
628
638
629 def _fromheader(self, size):
639 def _fromheader(self, size):
630 """return the next <size> byte from the header"""
640 """return the next <size> byte from the header"""
631 offset = self._headeroffset
641 offset = self._headeroffset
632 data = self._headerdata[offset:(offset + size)]
642 data = self._headerdata[offset:(offset + size)]
633 self._headeroffset = offset + size
643 self._headeroffset = offset + size
634 return data
644 return data
635
645
636 def _unpackheader(self, format):
646 def _unpackheader(self, format):
637 """read given format from header
647 """read given format from header
638
648
639 This automatically compute the size of the format to read."""
649 This automatically compute the size of the format to read."""
640 data = self._fromheader(struct.calcsize(format))
650 data = self._fromheader(struct.calcsize(format))
641 return _unpack(format, data)
651 return _unpack(format, data)
642
652
643 def _readheader(self):
653 def _readheader(self):
644 """read the header and setup the object"""
654 """read the header and setup the object"""
645 typesize = self._unpackheader(_fparttypesize)[0]
655 typesize = self._unpackheader(_fparttypesize)[0]
646 self.type = self._fromheader(typesize)
656 self.type = self._fromheader(typesize)
647 self.ui.debug('part type: "%s"\n' % self.type)
657 self.ui.debug('part type: "%s"\n' % self.type)
648 self.id = self._unpackheader(_fpartid)[0]
658 self.id = self._unpackheader(_fpartid)[0]
649 self.ui.debug('part id: "%s"\n' % self.id)
659 self.ui.debug('part id: "%s"\n' % self.id)
650 ## reading parameters
660 ## reading parameters
651 # param count
661 # param count
652 mancount, advcount = self._unpackheader(_fpartparamcount)
662 mancount, advcount = self._unpackheader(_fpartparamcount)
653 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
663 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
654 # param size
664 # param size
655 fparamsizes = _makefpartparamsizes(mancount + advcount)
665 fparamsizes = _makefpartparamsizes(mancount + advcount)
656 paramsizes = self._unpackheader(fparamsizes)
666 paramsizes = self._unpackheader(fparamsizes)
657 # make it a list of couple again
667 # make it a list of couple again
658 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
668 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
659 # split mandatory from advisory
669 # split mandatory from advisory
660 mansizes = paramsizes[:mancount]
670 mansizes = paramsizes[:mancount]
661 advsizes = paramsizes[mancount:]
671 advsizes = paramsizes[mancount:]
662 # retrive param value
672 # retrive param value
663 manparams = []
673 manparams = []
664 for key, value in mansizes:
674 for key, value in mansizes:
665 manparams.append((self._fromheader(key), self._fromheader(value)))
675 manparams.append((self._fromheader(key), self._fromheader(value)))
666 advparams = []
676 advparams = []
667 for key, value in advsizes:
677 for key, value in advsizes:
668 advparams.append((self._fromheader(key), self._fromheader(value)))
678 advparams.append((self._fromheader(key), self._fromheader(value)))
669 self.mandatoryparams = manparams
679 self.mandatoryparams = manparams
670 self.advisoryparams = advparams
680 self.advisoryparams = advparams
671 ## part payload
681 ## part payload
672 def payloadchunks():
682 def payloadchunks():
673 payloadsize = self._unpack(_fpayloadsize)[0]
683 payloadsize = self._unpack(_fpayloadsize)[0]
674 self.ui.debug('payload chunk size: %i\n' % payloadsize)
684 self.ui.debug('payload chunk size: %i\n' % payloadsize)
675 while payloadsize:
685 while payloadsize:
676 yield self._readexact(payloadsize)
686 yield self._readexact(payloadsize)
677 payloadsize = self._unpack(_fpayloadsize)[0]
687 payloadsize = self._unpack(_fpayloadsize)[0]
678 self.ui.debug('payload chunk size: %i\n' % payloadsize)
688 self.ui.debug('payload chunk size: %i\n' % payloadsize)
679 self._payloadstream = util.chunkbuffer(payloadchunks())
689 self._payloadstream = util.chunkbuffer(payloadchunks())
680 # we read the data, tell it
690 # we read the data, tell it
681 self._initialized = True
691 self._initialized = True
682
692
683 def read(self, size=None):
693 def read(self, size=None):
684 """read payload data"""
694 """read payload data"""
685 if not self._initialized:
695 if not self._initialized:
686 self._readheader()
696 self._readheader()
687 if size is None:
697 if size is None:
688 data = self._payloadstream.read()
698 data = self._payloadstream.read()
689 else:
699 else:
690 data = self._payloadstream.read(size)
700 data = self._payloadstream.read(size)
691 if size is None or len(data) < size:
701 if size is None or len(data) < size:
692 self.consumed = True
702 self.consumed = True
693 return data
703 return data
694
704
695
705
696 @parthandler('b2x:changegroup')
706 @parthandler('b2x:changegroup')
697 def handlechangegroup(op, inpart):
707 def handlechangegroup(op, inpart):
698 """apply a changegroup part on the repo
708 """apply a changegroup part on the repo
699
709
700 This is a very early implementation that will massive rework before being
710 This is a very early implementation that will massive rework before being
701 inflicted to any end-user.
711 inflicted to any end-user.
702 """
712 """
703 # Make sure we trigger a transaction creation
713 # Make sure we trigger a transaction creation
704 #
714 #
705 # The addchangegroup function will get a transaction object by itself, but
715 # The addchangegroup function will get a transaction object by itself, but
706 # we need to make sure we trigger the creation of a transaction object used
716 # we need to make sure we trigger the creation of a transaction object used
707 # for the whole processing scope.
717 # for the whole processing scope.
708 op.gettransaction()
718 op.gettransaction()
709 cg = changegroup.unbundle10(inpart, 'UN')
719 cg = changegroup.unbundle10(inpart, 'UN')
710 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
720 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
711 op.records.add('changegroup', {'return': ret})
721 op.records.add('changegroup', {'return': ret})
712 if op.reply is not None:
722 if op.reply is not None:
713 # This is definitly not the final form of this
723 # This is definitly not the final form of this
714 # return. But one need to start somewhere.
724 # return. But one need to start somewhere.
715 op.reply.newpart('b2x:reply:changegroup', (),
725 op.reply.newpart('b2x:reply:changegroup', (),
716 [('in-reply-to', str(inpart.id)),
726 [('in-reply-to', str(inpart.id)),
717 ('return', '%i' % ret)])
727 ('return', '%i' % ret)])
718 assert not inpart.read()
728 assert not inpart.read()
719
729
720 @parthandler('b2x:reply:changegroup')
730 @parthandler('b2x:reply:changegroup')
721 def handlechangegroup(op, inpart):
731 def handlechangegroup(op, inpart):
722 p = dict(inpart.advisoryparams)
732 p = dict(inpart.advisoryparams)
723 ret = int(p['return'])
733 ret = int(p['return'])
724 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
734 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
725
735
726 @parthandler('b2x:check:heads')
736 @parthandler('b2x:check:heads')
727 def handlechangegroup(op, inpart):
737 def handlechangegroup(op, inpart):
728 """check that head of the repo did not change
738 """check that head of the repo did not change
729
739
730 This is used to detect a push race when using unbundle.
740 This is used to detect a push race when using unbundle.
731 This replaces the "heads" argument of unbundle."""
741 This replaces the "heads" argument of unbundle."""
732 h = inpart.read(20)
742 h = inpart.read(20)
733 heads = []
743 heads = []
734 while len(h) == 20:
744 while len(h) == 20:
735 heads.append(h)
745 heads.append(h)
736 h = inpart.read(20)
746 h = inpart.read(20)
737 assert not h
747 assert not h
738 if heads != op.repo.heads():
748 if heads != op.repo.heads():
739 raise error.PushRaced('repository changed while pushing - '
749 raise error.PushRaced('repository changed while pushing - '
740 'please try again')
750 'please try again')
741
751
742 @parthandler('b2x:output')
752 @parthandler('b2x:output')
743 def handleoutput(op, inpart):
753 def handleoutput(op, inpart):
744 """forward output captured on the server to the client"""
754 """forward output captured on the server to the client"""
745 for line in inpart.read().splitlines():
755 for line in inpart.read().splitlines():
746 op.ui.write(('remote: %s\n' % line))
756 op.ui.write(('remote: %s\n' % line))
747
757
748 @parthandler('b2x:replycaps')
758 @parthandler('b2x:replycaps')
749 def handlereplycaps(op, inpart):
759 def handlereplycaps(op, inpart):
750 """Notify that a reply bundle should be created
760 """Notify that a reply bundle should be created
751
761
752 The payload contains the capabilities information for the reply"""
762 The payload contains the capabilities information for the reply"""
753 caps = decodecaps(inpart.read())
763 caps = decodecaps(inpart.read())
754 if op.reply is None:
764 if op.reply is None:
755 op.reply = bundle20(op.ui, caps)
765 op.reply = bundle20(op.ui, caps)
756
766
757 @parthandler('b2x:error:abort')
767 @parthandler('b2x:error:abort')
758 def handlereplycaps(op, inpart):
768 def handlereplycaps(op, inpart):
759 """Used to transmit abort error over the wire"""
769 """Used to transmit abort error over the wire"""
760 manargs = dict(inpart.mandatoryparams)
770 manargs = dict(inpart.mandatoryparams)
761 advargs = dict(inpart.advisoryparams)
771 advargs = dict(inpart.advisoryparams)
762 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
772 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
763
773
764 @parthandler('b2x:error:unknownpart')
774 @parthandler('b2x:error:unknownpart')
765 def handlereplycaps(op, inpart):
775 def handlereplycaps(op, inpart):
766 """Used to transmit unknown part error over the wire"""
776 """Used to transmit unknown part error over the wire"""
767 manargs = dict(inpart.mandatoryparams)
777 manargs = dict(inpart.mandatoryparams)
768 raise UnknownPartError(manargs['parttype'])
778 raise UnknownPartError(manargs['parttype'])
769
779
770 @parthandler('b2x:error:pushraced')
780 @parthandler('b2x:error:pushraced')
771 def handlereplycaps(op, inpart):
781 def handlereplycaps(op, inpart):
772 """Used to transmit push race error over the wire"""
782 """Used to transmit push race error over the wire"""
773 manargs = dict(inpart.mandatoryparams)
783 manargs = dict(inpart.mandatoryparams)
774 raise error.ResponseError(_('push failed:'), manargs['message'])
784 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now