##// END OF EJS Templates
bundle2: detect and disallow a negative chunk size...
Pierre-Yves David -
r23011:006a81d0 default
parent child Browse files
Show More
@@ -1,956 +1,965 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import util
148 import util
149 import struct
149 import struct
150 import urllib
150 import urllib
151 import string
151 import string
152 import obsolete
152 import obsolete
153 import pushkey
153 import pushkey
154
154
155 import changegroup, error
155 import changegroup, error
156 from i18n import _
156 from i18n import _
157
157
158 _pack = struct.pack
158 _pack = struct.pack
159 _unpack = struct.unpack
159 _unpack = struct.unpack
160
160
161 _magicstring = 'HG2Y'
161 _magicstring = 'HG2Y'
162
162
163 _fstreamparamsize = '>i'
163 _fstreamparamsize = '>i'
164 _fpartheadersize = '>i'
164 _fpartheadersize = '>i'
165 _fparttypesize = '>B'
165 _fparttypesize = '>B'
166 _fpartid = '>I'
166 _fpartid = '>I'
167 _fpayloadsize = '>i'
167 _fpayloadsize = '>i'
168 _fpartparamcount = '>BB'
168 _fpartparamcount = '>BB'
169
169
170 preferedchunksize = 4096
170 preferedchunksize = 4096
171
171
172 def _makefpartparamsizes(nbparams):
172 def _makefpartparamsizes(nbparams):
173 """return a struct format to read part parameter sizes
173 """return a struct format to read part parameter sizes
174
174
175 The number parameters is variable so we need to build that format
175 The number parameters is variable so we need to build that format
176 dynamically.
176 dynamically.
177 """
177 """
178 return '>'+('BB'*nbparams)
178 return '>'+('BB'*nbparams)
179
179
180 parthandlermapping = {}
180 parthandlermapping = {}
181
181
182 def parthandler(parttype, params=()):
182 def parthandler(parttype, params=()):
183 """decorator that register a function as a bundle2 part handler
183 """decorator that register a function as a bundle2 part handler
184
184
185 eg::
185 eg::
186
186
187 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
187 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
188 def myparttypehandler(...):
188 def myparttypehandler(...):
189 '''process a part of type "my part".'''
189 '''process a part of type "my part".'''
190 ...
190 ...
191 """
191 """
192 def _decorator(func):
192 def _decorator(func):
193 lparttype = parttype.lower() # enforce lower case matching.
193 lparttype = parttype.lower() # enforce lower case matching.
194 assert lparttype not in parthandlermapping
194 assert lparttype not in parthandlermapping
195 parthandlermapping[lparttype] = func
195 parthandlermapping[lparttype] = func
196 func.params = frozenset(params)
196 func.params = frozenset(params)
197 return func
197 return func
198 return _decorator
198 return _decorator
199
199
200 class unbundlerecords(object):
200 class unbundlerecords(object):
201 """keep record of what happens during and unbundle
201 """keep record of what happens during and unbundle
202
202
203 New records are added using `records.add('cat', obj)`. Where 'cat' is a
203 New records are added using `records.add('cat', obj)`. Where 'cat' is a
204 category of record and obj is an arbitrary object.
204 category of record and obj is an arbitrary object.
205
205
206 `records['cat']` will return all entries of this category 'cat'.
206 `records['cat']` will return all entries of this category 'cat'.
207
207
208 Iterating on the object itself will yield `('category', obj)` tuples
208 Iterating on the object itself will yield `('category', obj)` tuples
209 for all entries.
209 for all entries.
210
210
211 All iterations happens in chronological order.
211 All iterations happens in chronological order.
212 """
212 """
213
213
214 def __init__(self):
214 def __init__(self):
215 self._categories = {}
215 self._categories = {}
216 self._sequences = []
216 self._sequences = []
217 self._replies = {}
217 self._replies = {}
218
218
219 def add(self, category, entry, inreplyto=None):
219 def add(self, category, entry, inreplyto=None):
220 """add a new record of a given category.
220 """add a new record of a given category.
221
221
222 The entry can then be retrieved in the list returned by
222 The entry can then be retrieved in the list returned by
223 self['category']."""
223 self['category']."""
224 self._categories.setdefault(category, []).append(entry)
224 self._categories.setdefault(category, []).append(entry)
225 self._sequences.append((category, entry))
225 self._sequences.append((category, entry))
226 if inreplyto is not None:
226 if inreplyto is not None:
227 self.getreplies(inreplyto).add(category, entry)
227 self.getreplies(inreplyto).add(category, entry)
228
228
229 def getreplies(self, partid):
229 def getreplies(self, partid):
230 """get the subrecords that replies to a specific part"""
230 """get the subrecords that replies to a specific part"""
231 return self._replies.setdefault(partid, unbundlerecords())
231 return self._replies.setdefault(partid, unbundlerecords())
232
232
233 def __getitem__(self, cat):
233 def __getitem__(self, cat):
234 return tuple(self._categories.get(cat, ()))
234 return tuple(self._categories.get(cat, ()))
235
235
236 def __iter__(self):
236 def __iter__(self):
237 return iter(self._sequences)
237 return iter(self._sequences)
238
238
239 def __len__(self):
239 def __len__(self):
240 return len(self._sequences)
240 return len(self._sequences)
241
241
242 def __nonzero__(self):
242 def __nonzero__(self):
243 return bool(self._sequences)
243 return bool(self._sequences)
244
244
245 class bundleoperation(object):
245 class bundleoperation(object):
246 """an object that represents a single bundling process
246 """an object that represents a single bundling process
247
247
248 Its purpose is to carry unbundle-related objects and states.
248 Its purpose is to carry unbundle-related objects and states.
249
249
250 A new object should be created at the beginning of each bundle processing.
250 A new object should be created at the beginning of each bundle processing.
251 The object is to be returned by the processing function.
251 The object is to be returned by the processing function.
252
252
253 The object has very little content now it will ultimately contain:
253 The object has very little content now it will ultimately contain:
254 * an access to the repo the bundle is applied to,
254 * an access to the repo the bundle is applied to,
255 * a ui object,
255 * a ui object,
256 * a way to retrieve a transaction to add changes to the repo,
256 * a way to retrieve a transaction to add changes to the repo,
257 * a way to record the result of processing each part,
257 * a way to record the result of processing each part,
258 * a way to construct a bundle response when applicable.
258 * a way to construct a bundle response when applicable.
259 """
259 """
260
260
261 def __init__(self, repo, transactiongetter):
261 def __init__(self, repo, transactiongetter):
262 self.repo = repo
262 self.repo = repo
263 self.ui = repo.ui
263 self.ui = repo.ui
264 self.records = unbundlerecords()
264 self.records = unbundlerecords()
265 self.gettransaction = transactiongetter
265 self.gettransaction = transactiongetter
266 self.reply = None
266 self.reply = None
267
267
268 class TransactionUnavailable(RuntimeError):
268 class TransactionUnavailable(RuntimeError):
269 pass
269 pass
270
270
271 def _notransaction():
271 def _notransaction():
272 """default method to get a transaction while processing a bundle
272 """default method to get a transaction while processing a bundle
273
273
274 Raise an exception to highlight the fact that no transaction was expected
274 Raise an exception to highlight the fact that no transaction was expected
275 to be created"""
275 to be created"""
276 raise TransactionUnavailable()
276 raise TransactionUnavailable()
277
277
278 def processbundle(repo, unbundler, transactiongetter=_notransaction):
278 def processbundle(repo, unbundler, transactiongetter=_notransaction):
279 """This function process a bundle, apply effect to/from a repo
279 """This function process a bundle, apply effect to/from a repo
280
280
281 It iterates over each part then searches for and uses the proper handling
281 It iterates over each part then searches for and uses the proper handling
282 code to process the part. Parts are processed in order.
282 code to process the part. Parts are processed in order.
283
283
284 This is very early version of this function that will be strongly reworked
284 This is very early version of this function that will be strongly reworked
285 before final usage.
285 before final usage.
286
286
287 Unknown Mandatory part will abort the process.
287 Unknown Mandatory part will abort the process.
288 """
288 """
289 op = bundleoperation(repo, transactiongetter)
289 op = bundleoperation(repo, transactiongetter)
290 # todo:
290 # todo:
291 # - replace this is a init function soon.
291 # - replace this is a init function soon.
292 # - exception catching
292 # - exception catching
293 unbundler.params
293 unbundler.params
294 iterparts = unbundler.iterparts()
294 iterparts = unbundler.iterparts()
295 part = None
295 part = None
296 try:
296 try:
297 for part in iterparts:
297 for part in iterparts:
298 _processpart(op, part)
298 _processpart(op, part)
299 except Exception, exc:
299 except Exception, exc:
300 for part in iterparts:
300 for part in iterparts:
301 # consume the bundle content
301 # consume the bundle content
302 part.read()
302 part.read()
303 # Small hack to let caller code distinguish exceptions from bundle2
303 # Small hack to let caller code distinguish exceptions from bundle2
304 # processing fron the ones from bundle1 processing. This is mostly
304 # processing fron the ones from bundle1 processing. This is mostly
305 # needed to handle different return codes to unbundle according to the
305 # needed to handle different return codes to unbundle according to the
306 # type of bundle. We should probably clean up or drop this return code
306 # type of bundle. We should probably clean up or drop this return code
307 # craziness in a future version.
307 # craziness in a future version.
308 exc.duringunbundle2 = True
308 exc.duringunbundle2 = True
309 raise
309 raise
310 return op
310 return op
311
311
312 def _processpart(op, part):
312 def _processpart(op, part):
313 """process a single part from a bundle
313 """process a single part from a bundle
314
314
315 The part is guaranteed to have been fully consumed when the function exits
315 The part is guaranteed to have been fully consumed when the function exits
316 (even if an exception is raised)."""
316 (even if an exception is raised)."""
317 try:
317 try:
318 parttype = part.type
318 parttype = part.type
319 # part key are matched lower case
319 # part key are matched lower case
320 key = parttype.lower()
320 key = parttype.lower()
321 try:
321 try:
322 handler = parthandlermapping.get(key)
322 handler = parthandlermapping.get(key)
323 if handler is None:
323 if handler is None:
324 raise error.UnsupportedPartError(parttype=key)
324 raise error.UnsupportedPartError(parttype=key)
325 op.ui.debug('found a handler for part %r\n' % parttype)
325 op.ui.debug('found a handler for part %r\n' % parttype)
326 unknownparams = part.mandatorykeys - handler.params
326 unknownparams = part.mandatorykeys - handler.params
327 if unknownparams:
327 if unknownparams:
328 unknownparams = list(unknownparams)
328 unknownparams = list(unknownparams)
329 unknownparams.sort()
329 unknownparams.sort()
330 raise error.UnsupportedPartError(parttype=key,
330 raise error.UnsupportedPartError(parttype=key,
331 params=unknownparams)
331 params=unknownparams)
332 except error.UnsupportedPartError, exc:
332 except error.UnsupportedPartError, exc:
333 if key != parttype: # mandatory parts
333 if key != parttype: # mandatory parts
334 raise
334 raise
335 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
335 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
336 return # skip to part processing
336 return # skip to part processing
337
337
338 # handler is called outside the above try block so that we don't
338 # handler is called outside the above try block so that we don't
339 # risk catching KeyErrors from anything other than the
339 # risk catching KeyErrors from anything other than the
340 # parthandlermapping lookup (any KeyError raised by handler()
340 # parthandlermapping lookup (any KeyError raised by handler()
341 # itself represents a defect of a different variety).
341 # itself represents a defect of a different variety).
342 output = None
342 output = None
343 if op.reply is not None:
343 if op.reply is not None:
344 op.ui.pushbuffer(error=True)
344 op.ui.pushbuffer(error=True)
345 output = ''
345 output = ''
346 try:
346 try:
347 handler(op, part)
347 handler(op, part)
348 finally:
348 finally:
349 if output is not None:
349 if output is not None:
350 output = op.ui.popbuffer()
350 output = op.ui.popbuffer()
351 if output:
351 if output:
352 outpart = op.reply.newpart('b2x:output', data=output)
352 outpart = op.reply.newpart('b2x:output', data=output)
353 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
353 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
354 finally:
354 finally:
355 # consume the part content to not corrupt the stream.
355 # consume the part content to not corrupt the stream.
356 part.read()
356 part.read()
357
357
358
358
359 def decodecaps(blob):
359 def decodecaps(blob):
360 """decode a bundle2 caps bytes blob into a dictionnary
360 """decode a bundle2 caps bytes blob into a dictionnary
361
361
362 The blob is a list of capabilities (one per line)
362 The blob is a list of capabilities (one per line)
363 Capabilities may have values using a line of the form::
363 Capabilities may have values using a line of the form::
364
364
365 capability=value1,value2,value3
365 capability=value1,value2,value3
366
366
367 The values are always a list."""
367 The values are always a list."""
368 caps = {}
368 caps = {}
369 for line in blob.splitlines():
369 for line in blob.splitlines():
370 if not line:
370 if not line:
371 continue
371 continue
372 if '=' not in line:
372 if '=' not in line:
373 key, vals = line, ()
373 key, vals = line, ()
374 else:
374 else:
375 key, vals = line.split('=', 1)
375 key, vals = line.split('=', 1)
376 vals = vals.split(',')
376 vals = vals.split(',')
377 key = urllib.unquote(key)
377 key = urllib.unquote(key)
378 vals = [urllib.unquote(v) for v in vals]
378 vals = [urllib.unquote(v) for v in vals]
379 caps[key] = vals
379 caps[key] = vals
380 return caps
380 return caps
381
381
382 def encodecaps(caps):
382 def encodecaps(caps):
383 """encode a bundle2 caps dictionary into a bytes blob"""
383 """encode a bundle2 caps dictionary into a bytes blob"""
384 chunks = []
384 chunks = []
385 for ca in sorted(caps):
385 for ca in sorted(caps):
386 vals = caps[ca]
386 vals = caps[ca]
387 ca = urllib.quote(ca)
387 ca = urllib.quote(ca)
388 vals = [urllib.quote(v) for v in vals]
388 vals = [urllib.quote(v) for v in vals]
389 if vals:
389 if vals:
390 ca = "%s=%s" % (ca, ','.join(vals))
390 ca = "%s=%s" % (ca, ','.join(vals))
391 chunks.append(ca)
391 chunks.append(ca)
392 return '\n'.join(chunks)
392 return '\n'.join(chunks)
393
393
394 class bundle20(object):
394 class bundle20(object):
395 """represent an outgoing bundle2 container
395 """represent an outgoing bundle2 container
396
396
397 Use the `addparam` method to add stream level parameter. and `newpart` to
397 Use the `addparam` method to add stream level parameter. and `newpart` to
398 populate it. Then call `getchunks` to retrieve all the binary chunks of
398 populate it. Then call `getchunks` to retrieve all the binary chunks of
399 data that compose the bundle2 container."""
399 data that compose the bundle2 container."""
400
400
401 def __init__(self, ui, capabilities=()):
401 def __init__(self, ui, capabilities=()):
402 self.ui = ui
402 self.ui = ui
403 self._params = []
403 self._params = []
404 self._parts = []
404 self._parts = []
405 self.capabilities = dict(capabilities)
405 self.capabilities = dict(capabilities)
406
406
407 @property
407 @property
408 def nbparts(self):
408 def nbparts(self):
409 """total number of parts added to the bundler"""
409 """total number of parts added to the bundler"""
410 return len(self._parts)
410 return len(self._parts)
411
411
412 # methods used to defines the bundle2 content
412 # methods used to defines the bundle2 content
413 def addparam(self, name, value=None):
413 def addparam(self, name, value=None):
414 """add a stream level parameter"""
414 """add a stream level parameter"""
415 if not name:
415 if not name:
416 raise ValueError('empty parameter name')
416 raise ValueError('empty parameter name')
417 if name[0] not in string.letters:
417 if name[0] not in string.letters:
418 raise ValueError('non letter first character: %r' % name)
418 raise ValueError('non letter first character: %r' % name)
419 self._params.append((name, value))
419 self._params.append((name, value))
420
420
421 def addpart(self, part):
421 def addpart(self, part):
422 """add a new part to the bundle2 container
422 """add a new part to the bundle2 container
423
423
424 Parts contains the actual applicative payload."""
424 Parts contains the actual applicative payload."""
425 assert part.id is None
425 assert part.id is None
426 part.id = len(self._parts) # very cheap counter
426 part.id = len(self._parts) # very cheap counter
427 self._parts.append(part)
427 self._parts.append(part)
428
428
429 def newpart(self, typeid, *args, **kwargs):
429 def newpart(self, typeid, *args, **kwargs):
430 """create a new part and add it to the containers
430 """create a new part and add it to the containers
431
431
432 As the part is directly added to the containers. For now, this means
432 As the part is directly added to the containers. For now, this means
433 that any failure to properly initialize the part after calling
433 that any failure to properly initialize the part after calling
434 ``newpart`` should result in a failure of the whole bundling process.
434 ``newpart`` should result in a failure of the whole bundling process.
435
435
436 You can still fall back to manually create and add if you need better
436 You can still fall back to manually create and add if you need better
437 control."""
437 control."""
438 part = bundlepart(typeid, *args, **kwargs)
438 part = bundlepart(typeid, *args, **kwargs)
439 self.addpart(part)
439 self.addpart(part)
440 return part
440 return part
441
441
442 # methods used to generate the bundle2 stream
442 # methods used to generate the bundle2 stream
443 def getchunks(self):
443 def getchunks(self):
444 self.ui.debug('start emission of %s stream\n' % _magicstring)
444 self.ui.debug('start emission of %s stream\n' % _magicstring)
445 yield _magicstring
445 yield _magicstring
446 param = self._paramchunk()
446 param = self._paramchunk()
447 self.ui.debug('bundle parameter: %s\n' % param)
447 self.ui.debug('bundle parameter: %s\n' % param)
448 yield _pack(_fstreamparamsize, len(param))
448 yield _pack(_fstreamparamsize, len(param))
449 if param:
449 if param:
450 yield param
450 yield param
451
451
452 self.ui.debug('start of parts\n')
452 self.ui.debug('start of parts\n')
453 for part in self._parts:
453 for part in self._parts:
454 self.ui.debug('bundle part: "%s"\n' % part.type)
454 self.ui.debug('bundle part: "%s"\n' % part.type)
455 for chunk in part.getchunks():
455 for chunk in part.getchunks():
456 yield chunk
456 yield chunk
457 self.ui.debug('end of bundle\n')
457 self.ui.debug('end of bundle\n')
458 yield _pack(_fpartheadersize, 0)
458 yield _pack(_fpartheadersize, 0)
459
459
460 def _paramchunk(self):
460 def _paramchunk(self):
461 """return a encoded version of all stream parameters"""
461 """return a encoded version of all stream parameters"""
462 blocks = []
462 blocks = []
463 for par, value in self._params:
463 for par, value in self._params:
464 par = urllib.quote(par)
464 par = urllib.quote(par)
465 if value is not None:
465 if value is not None:
466 value = urllib.quote(value)
466 value = urllib.quote(value)
467 par = '%s=%s' % (par, value)
467 par = '%s=%s' % (par, value)
468 blocks.append(par)
468 blocks.append(par)
469 return ' '.join(blocks)
469 return ' '.join(blocks)
470
470
471 class unpackermixin(object):
471 class unpackermixin(object):
472 """A mixin to extract bytes and struct data from a stream"""
472 """A mixin to extract bytes and struct data from a stream"""
473
473
474 def __init__(self, fp):
474 def __init__(self, fp):
475 self._fp = fp
475 self._fp = fp
476
476
477 def _unpack(self, format):
477 def _unpack(self, format):
478 """unpack this struct format from the stream"""
478 """unpack this struct format from the stream"""
479 data = self._readexact(struct.calcsize(format))
479 data = self._readexact(struct.calcsize(format))
480 return _unpack(format, data)
480 return _unpack(format, data)
481
481
482 def _readexact(self, size):
482 def _readexact(self, size):
483 """read exactly <size> bytes from the stream"""
483 """read exactly <size> bytes from the stream"""
484 return changegroup.readexactly(self._fp, size)
484 return changegroup.readexactly(self._fp, size)
485
485
486
486
487 class unbundle20(unpackermixin):
487 class unbundle20(unpackermixin):
488 """interpret a bundle2 stream
488 """interpret a bundle2 stream
489
489
490 This class is fed with a binary stream and yields parts through its
490 This class is fed with a binary stream and yields parts through its
491 `iterparts` methods."""
491 `iterparts` methods."""
492
492
493 def __init__(self, ui, fp, header=None):
493 def __init__(self, ui, fp, header=None):
494 """If header is specified, we do not read it out of the stream."""
494 """If header is specified, we do not read it out of the stream."""
495 self.ui = ui
495 self.ui = ui
496 super(unbundle20, self).__init__(fp)
496 super(unbundle20, self).__init__(fp)
497 if header is None:
497 if header is None:
498 header = self._readexact(4)
498 header = self._readexact(4)
499 magic, version = header[0:2], header[2:4]
499 magic, version = header[0:2], header[2:4]
500 if magic != 'HG':
500 if magic != 'HG':
501 raise util.Abort(_('not a Mercurial bundle'))
501 raise util.Abort(_('not a Mercurial bundle'))
502 if version != '2Y':
502 if version != '2Y':
503 raise util.Abort(_('unknown bundle version %s') % version)
503 raise util.Abort(_('unknown bundle version %s') % version)
504 self.ui.debug('start processing of %s stream\n' % header)
504 self.ui.debug('start processing of %s stream\n' % header)
505
505
506 @util.propertycache
506 @util.propertycache
507 def params(self):
507 def params(self):
508 """dictionary of stream level parameters"""
508 """dictionary of stream level parameters"""
509 self.ui.debug('reading bundle2 stream parameters\n')
509 self.ui.debug('reading bundle2 stream parameters\n')
510 params = {}
510 params = {}
511 paramssize = self._unpack(_fstreamparamsize)[0]
511 paramssize = self._unpack(_fstreamparamsize)[0]
512 if paramssize < 0:
513 raise error.BundleValueError('negative bundle param size: %i'
514 % paramssize)
512 if paramssize:
515 if paramssize:
513 for p in self._readexact(paramssize).split(' '):
516 for p in self._readexact(paramssize).split(' '):
514 p = p.split('=', 1)
517 p = p.split('=', 1)
515 p = [urllib.unquote(i) for i in p]
518 p = [urllib.unquote(i) for i in p]
516 if len(p) < 2:
519 if len(p) < 2:
517 p.append(None)
520 p.append(None)
518 self._processparam(*p)
521 self._processparam(*p)
519 params[p[0]] = p[1]
522 params[p[0]] = p[1]
520 return params
523 return params
521
524
522 def _processparam(self, name, value):
525 def _processparam(self, name, value):
523 """process a parameter, applying its effect if needed
526 """process a parameter, applying its effect if needed
524
527
525 Parameter starting with a lower case letter are advisory and will be
528 Parameter starting with a lower case letter are advisory and will be
526 ignored when unknown. Those starting with an upper case letter are
529 ignored when unknown. Those starting with an upper case letter are
527 mandatory and will this function will raise a KeyError when unknown.
530 mandatory and will this function will raise a KeyError when unknown.
528
531
529 Note: no option are currently supported. Any input will be either
532 Note: no option are currently supported. Any input will be either
530 ignored or failing.
533 ignored or failing.
531 """
534 """
532 if not name:
535 if not name:
533 raise ValueError('empty parameter name')
536 raise ValueError('empty parameter name')
534 if name[0] not in string.letters:
537 if name[0] not in string.letters:
535 raise ValueError('non letter first character: %r' % name)
538 raise ValueError('non letter first character: %r' % name)
536 # Some logic will be later added here to try to process the option for
539 # Some logic will be later added here to try to process the option for
537 # a dict of known parameter.
540 # a dict of known parameter.
538 if name[0].islower():
541 if name[0].islower():
539 self.ui.debug("ignoring unknown parameter %r\n" % name)
542 self.ui.debug("ignoring unknown parameter %r\n" % name)
540 else:
543 else:
541 raise error.UnsupportedPartError(params=(name,))
544 raise error.UnsupportedPartError(params=(name,))
542
545
543
546
544 def iterparts(self):
547 def iterparts(self):
545 """yield all parts contained in the stream"""
548 """yield all parts contained in the stream"""
546 # make sure param have been loaded
549 # make sure param have been loaded
547 self.params
550 self.params
548 self.ui.debug('start extraction of bundle2 parts\n')
551 self.ui.debug('start extraction of bundle2 parts\n')
549 headerblock = self._readpartheader()
552 headerblock = self._readpartheader()
550 while headerblock is not None:
553 while headerblock is not None:
551 part = unbundlepart(self.ui, headerblock, self._fp)
554 part = unbundlepart(self.ui, headerblock, self._fp)
552 yield part
555 yield part
553 headerblock = self._readpartheader()
556 headerblock = self._readpartheader()
554 self.ui.debug('end of bundle2 stream\n')
557 self.ui.debug('end of bundle2 stream\n')
555
558
556 def _readpartheader(self):
559 def _readpartheader(self):
557 """reads a part header size and return the bytes blob
560 """reads a part header size and return the bytes blob
558
561
559 returns None if empty"""
562 returns None if empty"""
560 headersize = self._unpack(_fpartheadersize)[0]
563 headersize = self._unpack(_fpartheadersize)[0]
564 if headersize < 0:
565 raise error.BundleValueError('negative part header size: %i'
566 % headersize)
561 self.ui.debug('part header size: %i\n' % headersize)
567 self.ui.debug('part header size: %i\n' % headersize)
562 if headersize:
568 if headersize:
563 return self._readexact(headersize)
569 return self._readexact(headersize)
564 return None
570 return None
565
571
566
572
567 class bundlepart(object):
573 class bundlepart(object):
568 """A bundle2 part contains application level payload
574 """A bundle2 part contains application level payload
569
575
570 The part `type` is used to route the part to the application level
576 The part `type` is used to route the part to the application level
571 handler.
577 handler.
572
578
573 The part payload is contained in ``part.data``. It could be raw bytes or a
579 The part payload is contained in ``part.data``. It could be raw bytes or a
574 generator of byte chunks.
580 generator of byte chunks.
575
581
576 You can add parameters to the part using the ``addparam`` method.
582 You can add parameters to the part using the ``addparam`` method.
577 Parameters can be either mandatory (default) or advisory. Remote side
583 Parameters can be either mandatory (default) or advisory. Remote side
578 should be able to safely ignore the advisory ones.
584 should be able to safely ignore the advisory ones.
579
585
580 Both data and parameters cannot be modified after the generation has begun.
586 Both data and parameters cannot be modified after the generation has begun.
581 """
587 """
582
588
583 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
589 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
584 data=''):
590 data=''):
585 self.id = None
591 self.id = None
586 self.type = parttype
592 self.type = parttype
587 self._data = data
593 self._data = data
588 self._mandatoryparams = list(mandatoryparams)
594 self._mandatoryparams = list(mandatoryparams)
589 self._advisoryparams = list(advisoryparams)
595 self._advisoryparams = list(advisoryparams)
590 # checking for duplicated entries
596 # checking for duplicated entries
591 self._seenparams = set()
597 self._seenparams = set()
592 for pname, __ in self._mandatoryparams + self._advisoryparams:
598 for pname, __ in self._mandatoryparams + self._advisoryparams:
593 if pname in self._seenparams:
599 if pname in self._seenparams:
594 raise RuntimeError('duplicated params: %s' % pname)
600 raise RuntimeError('duplicated params: %s' % pname)
595 self._seenparams.add(pname)
601 self._seenparams.add(pname)
596 # status of the part's generation:
602 # status of the part's generation:
597 # - None: not started,
603 # - None: not started,
598 # - False: currently generated,
604 # - False: currently generated,
599 # - True: generation done.
605 # - True: generation done.
600 self._generated = None
606 self._generated = None
601
607
602 # methods used to defines the part content
608 # methods used to defines the part content
603 def __setdata(self, data):
609 def __setdata(self, data):
604 if self._generated is not None:
610 if self._generated is not None:
605 raise error.ReadOnlyPartError('part is being generated')
611 raise error.ReadOnlyPartError('part is being generated')
606 self._data = data
612 self._data = data
607 def __getdata(self):
613 def __getdata(self):
608 return self._data
614 return self._data
609 data = property(__getdata, __setdata)
615 data = property(__getdata, __setdata)
610
616
611 @property
617 @property
612 def mandatoryparams(self):
618 def mandatoryparams(self):
613 # make it an immutable tuple to force people through ``addparam``
619 # make it an immutable tuple to force people through ``addparam``
614 return tuple(self._mandatoryparams)
620 return tuple(self._mandatoryparams)
615
621
616 @property
622 @property
617 def advisoryparams(self):
623 def advisoryparams(self):
618 # make it an immutable tuple to force people through ``addparam``
624 # make it an immutable tuple to force people through ``addparam``
619 return tuple(self._advisoryparams)
625 return tuple(self._advisoryparams)
620
626
621 def addparam(self, name, value='', mandatory=True):
627 def addparam(self, name, value='', mandatory=True):
622 if self._generated is not None:
628 if self._generated is not None:
623 raise error.ReadOnlyPartError('part is being generated')
629 raise error.ReadOnlyPartError('part is being generated')
624 if name in self._seenparams:
630 if name in self._seenparams:
625 raise ValueError('duplicated params: %s' % name)
631 raise ValueError('duplicated params: %s' % name)
626 self._seenparams.add(name)
632 self._seenparams.add(name)
627 params = self._advisoryparams
633 params = self._advisoryparams
628 if mandatory:
634 if mandatory:
629 params = self._mandatoryparams
635 params = self._mandatoryparams
630 params.append((name, value))
636 params.append((name, value))
631
637
632 # methods used to generates the bundle2 stream
638 # methods used to generates the bundle2 stream
633 def getchunks(self):
639 def getchunks(self):
634 if self._generated is not None:
640 if self._generated is not None:
635 raise RuntimeError('part can only be consumed once')
641 raise RuntimeError('part can only be consumed once')
636 self._generated = False
642 self._generated = False
637 #### header
643 #### header
638 ## parttype
644 ## parttype
639 header = [_pack(_fparttypesize, len(self.type)),
645 header = [_pack(_fparttypesize, len(self.type)),
640 self.type, _pack(_fpartid, self.id),
646 self.type, _pack(_fpartid, self.id),
641 ]
647 ]
642 ## parameters
648 ## parameters
643 # count
649 # count
644 manpar = self.mandatoryparams
650 manpar = self.mandatoryparams
645 advpar = self.advisoryparams
651 advpar = self.advisoryparams
646 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
652 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
647 # size
653 # size
648 parsizes = []
654 parsizes = []
649 for key, value in manpar:
655 for key, value in manpar:
650 parsizes.append(len(key))
656 parsizes.append(len(key))
651 parsizes.append(len(value))
657 parsizes.append(len(value))
652 for key, value in advpar:
658 for key, value in advpar:
653 parsizes.append(len(key))
659 parsizes.append(len(key))
654 parsizes.append(len(value))
660 parsizes.append(len(value))
655 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
661 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
656 header.append(paramsizes)
662 header.append(paramsizes)
657 # key, value
663 # key, value
658 for key, value in manpar:
664 for key, value in manpar:
659 header.append(key)
665 header.append(key)
660 header.append(value)
666 header.append(value)
661 for key, value in advpar:
667 for key, value in advpar:
662 header.append(key)
668 header.append(key)
663 header.append(value)
669 header.append(value)
664 ## finalize header
670 ## finalize header
665 headerchunk = ''.join(header)
671 headerchunk = ''.join(header)
666 yield _pack(_fpartheadersize, len(headerchunk))
672 yield _pack(_fpartheadersize, len(headerchunk))
667 yield headerchunk
673 yield headerchunk
668 ## payload
674 ## payload
669 for chunk in self._payloadchunks():
675 for chunk in self._payloadchunks():
670 yield _pack(_fpayloadsize, len(chunk))
676 yield _pack(_fpayloadsize, len(chunk))
671 yield chunk
677 yield chunk
672 # end of payload
678 # end of payload
673 yield _pack(_fpayloadsize, 0)
679 yield _pack(_fpayloadsize, 0)
674 self._generated = True
680 self._generated = True
675
681
676 def _payloadchunks(self):
682 def _payloadchunks(self):
677 """yield chunks of a the part payload
683 """yield chunks of a the part payload
678
684
679 Exists to handle the different methods to provide data to a part."""
685 Exists to handle the different methods to provide data to a part."""
680 # we only support fixed size data now.
686 # we only support fixed size data now.
681 # This will be improved in the future.
687 # This will be improved in the future.
682 if util.safehasattr(self.data, 'next'):
688 if util.safehasattr(self.data, 'next'):
683 buff = util.chunkbuffer(self.data)
689 buff = util.chunkbuffer(self.data)
684 chunk = buff.read(preferedchunksize)
690 chunk = buff.read(preferedchunksize)
685 while chunk:
691 while chunk:
686 yield chunk
692 yield chunk
687 chunk = buff.read(preferedchunksize)
693 chunk = buff.read(preferedchunksize)
688 elif len(self.data):
694 elif len(self.data):
689 yield self.data
695 yield self.data
690
696
691 class unbundlepart(unpackermixin):
697 class unbundlepart(unpackermixin):
692 """a bundle part read from a bundle"""
698 """a bundle part read from a bundle"""
693
699
694 def __init__(self, ui, header, fp):
700 def __init__(self, ui, header, fp):
695 super(unbundlepart, self).__init__(fp)
701 super(unbundlepart, self).__init__(fp)
696 self.ui = ui
702 self.ui = ui
697 # unbundle state attr
703 # unbundle state attr
698 self._headerdata = header
704 self._headerdata = header
699 self._headeroffset = 0
705 self._headeroffset = 0
700 self._initialized = False
706 self._initialized = False
701 self.consumed = False
707 self.consumed = False
702 # part data
708 # part data
703 self.id = None
709 self.id = None
704 self.type = None
710 self.type = None
705 self.mandatoryparams = None
711 self.mandatoryparams = None
706 self.advisoryparams = None
712 self.advisoryparams = None
707 self.params = None
713 self.params = None
708 self.mandatorykeys = ()
714 self.mandatorykeys = ()
709 self._payloadstream = None
715 self._payloadstream = None
710 self._readheader()
716 self._readheader()
711
717
712 def _fromheader(self, size):
718 def _fromheader(self, size):
713 """return the next <size> byte from the header"""
719 """return the next <size> byte from the header"""
714 offset = self._headeroffset
720 offset = self._headeroffset
715 data = self._headerdata[offset:(offset + size)]
721 data = self._headerdata[offset:(offset + size)]
716 self._headeroffset = offset + size
722 self._headeroffset = offset + size
717 return data
723 return data
718
724
719 def _unpackheader(self, format):
725 def _unpackheader(self, format):
720 """read given format from header
726 """read given format from header
721
727
722 This automatically compute the size of the format to read."""
728 This automatically compute the size of the format to read."""
723 data = self._fromheader(struct.calcsize(format))
729 data = self._fromheader(struct.calcsize(format))
724 return _unpack(format, data)
730 return _unpack(format, data)
725
731
726 def _initparams(self, mandatoryparams, advisoryparams):
732 def _initparams(self, mandatoryparams, advisoryparams):
727 """internal function to setup all logic related parameters"""
733 """internal function to setup all logic related parameters"""
728 # make it read only to prevent people touching it by mistake.
734 # make it read only to prevent people touching it by mistake.
729 self.mandatoryparams = tuple(mandatoryparams)
735 self.mandatoryparams = tuple(mandatoryparams)
730 self.advisoryparams = tuple(advisoryparams)
736 self.advisoryparams = tuple(advisoryparams)
731 # user friendly UI
737 # user friendly UI
732 self.params = dict(self.mandatoryparams)
738 self.params = dict(self.mandatoryparams)
733 self.params.update(dict(self.advisoryparams))
739 self.params.update(dict(self.advisoryparams))
734 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
740 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
735
741
736 def _readheader(self):
742 def _readheader(self):
737 """read the header and setup the object"""
743 """read the header and setup the object"""
738 typesize = self._unpackheader(_fparttypesize)[0]
744 typesize = self._unpackheader(_fparttypesize)[0]
739 self.type = self._fromheader(typesize)
745 self.type = self._fromheader(typesize)
740 self.ui.debug('part type: "%s"\n' % self.type)
746 self.ui.debug('part type: "%s"\n' % self.type)
741 self.id = self._unpackheader(_fpartid)[0]
747 self.id = self._unpackheader(_fpartid)[0]
742 self.ui.debug('part id: "%s"\n' % self.id)
748 self.ui.debug('part id: "%s"\n' % self.id)
743 ## reading parameters
749 ## reading parameters
744 # param count
750 # param count
745 mancount, advcount = self._unpackheader(_fpartparamcount)
751 mancount, advcount = self._unpackheader(_fpartparamcount)
746 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
752 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
747 # param size
753 # param size
748 fparamsizes = _makefpartparamsizes(mancount + advcount)
754 fparamsizes = _makefpartparamsizes(mancount + advcount)
749 paramsizes = self._unpackheader(fparamsizes)
755 paramsizes = self._unpackheader(fparamsizes)
750 # make it a list of couple again
756 # make it a list of couple again
751 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
757 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
752 # split mandatory from advisory
758 # split mandatory from advisory
753 mansizes = paramsizes[:mancount]
759 mansizes = paramsizes[:mancount]
754 advsizes = paramsizes[mancount:]
760 advsizes = paramsizes[mancount:]
755 # retrive param value
761 # retrive param value
756 manparams = []
762 manparams = []
757 for key, value in mansizes:
763 for key, value in mansizes:
758 manparams.append((self._fromheader(key), self._fromheader(value)))
764 manparams.append((self._fromheader(key), self._fromheader(value)))
759 advparams = []
765 advparams = []
760 for key, value in advsizes:
766 for key, value in advsizes:
761 advparams.append((self._fromheader(key), self._fromheader(value)))
767 advparams.append((self._fromheader(key), self._fromheader(value)))
762 self._initparams(manparams, advparams)
768 self._initparams(manparams, advparams)
763 ## part payload
769 ## part payload
764 def payloadchunks():
770 def payloadchunks():
765 payloadsize = self._unpack(_fpayloadsize)[0]
771 payloadsize = self._unpack(_fpayloadsize)[0]
766 self.ui.debug('payload chunk size: %i\n' % payloadsize)
772 self.ui.debug('payload chunk size: %i\n' % payloadsize)
767 while payloadsize:
773 while payloadsize:
774 if payloadsize < 0:
775 msg = 'negative payload chunk size: %i' % payloadsize
776 raise error.BundleValueError(msg)
768 yield self._readexact(payloadsize)
777 yield self._readexact(payloadsize)
769 payloadsize = self._unpack(_fpayloadsize)[0]
778 payloadsize = self._unpack(_fpayloadsize)[0]
770 self.ui.debug('payload chunk size: %i\n' % payloadsize)
779 self.ui.debug('payload chunk size: %i\n' % payloadsize)
771 self._payloadstream = util.chunkbuffer(payloadchunks())
780 self._payloadstream = util.chunkbuffer(payloadchunks())
772 # we read the data, tell it
781 # we read the data, tell it
773 self._initialized = True
782 self._initialized = True
774
783
775 def read(self, size=None):
784 def read(self, size=None):
776 """read payload data"""
785 """read payload data"""
777 if not self._initialized:
786 if not self._initialized:
778 self._readheader()
787 self._readheader()
779 if size is None:
788 if size is None:
780 data = self._payloadstream.read()
789 data = self._payloadstream.read()
781 else:
790 else:
782 data = self._payloadstream.read(size)
791 data = self._payloadstream.read(size)
783 if size is None or len(data) < size:
792 if size is None or len(data) < size:
784 self.consumed = True
793 self.consumed = True
785 return data
794 return data
786
795
787 capabilities = {'HG2Y': (),
796 capabilities = {'HG2Y': (),
788 'b2x:listkeys': (),
797 'b2x:listkeys': (),
789 'b2x:pushkey': (),
798 'b2x:pushkey': (),
790 'b2x:changegroup': (),
799 'b2x:changegroup': (),
791 }
800 }
792
801
793 def getrepocaps(repo):
802 def getrepocaps(repo):
794 """return the bundle2 capabilities for a given repo
803 """return the bundle2 capabilities for a given repo
795
804
796 Exists to allow extensions (like evolution) to mutate the capabilities.
805 Exists to allow extensions (like evolution) to mutate the capabilities.
797 """
806 """
798 caps = capabilities.copy()
807 caps = capabilities.copy()
799 if obsolete.isenabled(repo, obsolete.exchangeopt):
808 if obsolete.isenabled(repo, obsolete.exchangeopt):
800 supportedformat = tuple('V%i' % v for v in obsolete.formats)
809 supportedformat = tuple('V%i' % v for v in obsolete.formats)
801 caps['b2x:obsmarkers'] = supportedformat
810 caps['b2x:obsmarkers'] = supportedformat
802 return caps
811 return caps
803
812
804 def bundle2caps(remote):
813 def bundle2caps(remote):
805 """return the bundlecapabilities of a peer as dict"""
814 """return the bundlecapabilities of a peer as dict"""
806 raw = remote.capable('bundle2-exp')
815 raw = remote.capable('bundle2-exp')
807 if not raw and raw != '':
816 if not raw and raw != '':
808 return {}
817 return {}
809 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
818 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
810 return decodecaps(capsblob)
819 return decodecaps(capsblob)
811
820
812 def obsmarkersversion(caps):
821 def obsmarkersversion(caps):
813 """extract the list of supported obsmarkers versions from a bundle2caps dict
822 """extract the list of supported obsmarkers versions from a bundle2caps dict
814 """
823 """
815 obscaps = caps.get('b2x:obsmarkers', ())
824 obscaps = caps.get('b2x:obsmarkers', ())
816 return [int(c[1:]) for c in obscaps if c.startswith('V')]
825 return [int(c[1:]) for c in obscaps if c.startswith('V')]
817
826
818 @parthandler('b2x:changegroup')
827 @parthandler('b2x:changegroup')
819 def handlechangegroup(op, inpart):
828 def handlechangegroup(op, inpart):
820 """apply a changegroup part on the repo
829 """apply a changegroup part on the repo
821
830
822 This is a very early implementation that will massive rework before being
831 This is a very early implementation that will massive rework before being
823 inflicted to any end-user.
832 inflicted to any end-user.
824 """
833 """
825 # Make sure we trigger a transaction creation
834 # Make sure we trigger a transaction creation
826 #
835 #
827 # The addchangegroup function will get a transaction object by itself, but
836 # The addchangegroup function will get a transaction object by itself, but
828 # we need to make sure we trigger the creation of a transaction object used
837 # we need to make sure we trigger the creation of a transaction object used
829 # for the whole processing scope.
838 # for the whole processing scope.
830 op.gettransaction()
839 op.gettransaction()
831 cg = changegroup.cg1unpacker(inpart, 'UN')
840 cg = changegroup.cg1unpacker(inpart, 'UN')
832 # the source and url passed here are overwritten by the one contained in
841 # the source and url passed here are overwritten by the one contained in
833 # the transaction.hookargs argument. So 'bundle2' is a placeholder
842 # the transaction.hookargs argument. So 'bundle2' is a placeholder
834 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
843 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
835 op.records.add('changegroup', {'return': ret})
844 op.records.add('changegroup', {'return': ret})
836 if op.reply is not None:
845 if op.reply is not None:
837 # This is definitly not the final form of this
846 # This is definitly not the final form of this
838 # return. But one need to start somewhere.
847 # return. But one need to start somewhere.
839 part = op.reply.newpart('b2x:reply:changegroup')
848 part = op.reply.newpart('b2x:reply:changegroup')
840 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
849 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
841 part.addparam('return', '%i' % ret, mandatory=False)
850 part.addparam('return', '%i' % ret, mandatory=False)
842 assert not inpart.read()
851 assert not inpart.read()
843
852
844 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
853 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
845 def handlereplychangegroup(op, inpart):
854 def handlereplychangegroup(op, inpart):
846 ret = int(inpart.params['return'])
855 ret = int(inpart.params['return'])
847 replyto = int(inpart.params['in-reply-to'])
856 replyto = int(inpart.params['in-reply-to'])
848 op.records.add('changegroup', {'return': ret}, replyto)
857 op.records.add('changegroup', {'return': ret}, replyto)
849
858
850 @parthandler('b2x:check:heads')
859 @parthandler('b2x:check:heads')
851 def handlecheckheads(op, inpart):
860 def handlecheckheads(op, inpart):
852 """check that head of the repo did not change
861 """check that head of the repo did not change
853
862
854 This is used to detect a push race when using unbundle.
863 This is used to detect a push race when using unbundle.
855 This replaces the "heads" argument of unbundle."""
864 This replaces the "heads" argument of unbundle."""
856 h = inpart.read(20)
865 h = inpart.read(20)
857 heads = []
866 heads = []
858 while len(h) == 20:
867 while len(h) == 20:
859 heads.append(h)
868 heads.append(h)
860 h = inpart.read(20)
869 h = inpart.read(20)
861 assert not h
870 assert not h
862 if heads != op.repo.heads():
871 if heads != op.repo.heads():
863 raise error.PushRaced('repository changed while pushing - '
872 raise error.PushRaced('repository changed while pushing - '
864 'please try again')
873 'please try again')
865
874
866 @parthandler('b2x:output')
875 @parthandler('b2x:output')
867 def handleoutput(op, inpart):
876 def handleoutput(op, inpart):
868 """forward output captured on the server to the client"""
877 """forward output captured on the server to the client"""
869 for line in inpart.read().splitlines():
878 for line in inpart.read().splitlines():
870 op.ui.write(('remote: %s\n' % line))
879 op.ui.write(('remote: %s\n' % line))
871
880
872 @parthandler('b2x:replycaps')
881 @parthandler('b2x:replycaps')
873 def handlereplycaps(op, inpart):
882 def handlereplycaps(op, inpart):
874 """Notify that a reply bundle should be created
883 """Notify that a reply bundle should be created
875
884
876 The payload contains the capabilities information for the reply"""
885 The payload contains the capabilities information for the reply"""
877 caps = decodecaps(inpart.read())
886 caps = decodecaps(inpart.read())
878 if op.reply is None:
887 if op.reply is None:
879 op.reply = bundle20(op.ui, caps)
888 op.reply = bundle20(op.ui, caps)
880
889
881 @parthandler('b2x:error:abort', ('message', 'hint'))
890 @parthandler('b2x:error:abort', ('message', 'hint'))
882 def handlereplycaps(op, inpart):
891 def handlereplycaps(op, inpart):
883 """Used to transmit abort error over the wire"""
892 """Used to transmit abort error over the wire"""
884 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
893 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
885
894
886 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
895 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
887 def handlereplycaps(op, inpart):
896 def handlereplycaps(op, inpart):
888 """Used to transmit unknown content error over the wire"""
897 """Used to transmit unknown content error over the wire"""
889 kwargs = {}
898 kwargs = {}
890 parttype = inpart.params.get('parttype')
899 parttype = inpart.params.get('parttype')
891 if parttype is not None:
900 if parttype is not None:
892 kwargs['parttype'] = parttype
901 kwargs['parttype'] = parttype
893 params = inpart.params.get('params')
902 params = inpart.params.get('params')
894 if params is not None:
903 if params is not None:
895 kwargs['params'] = params.split('\0')
904 kwargs['params'] = params.split('\0')
896
905
897 raise error.UnsupportedPartError(**kwargs)
906 raise error.UnsupportedPartError(**kwargs)
898
907
899 @parthandler('b2x:error:pushraced', ('message',))
908 @parthandler('b2x:error:pushraced', ('message',))
900 def handlereplycaps(op, inpart):
909 def handlereplycaps(op, inpart):
901 """Used to transmit push race error over the wire"""
910 """Used to transmit push race error over the wire"""
902 raise error.ResponseError(_('push failed:'), inpart.params['message'])
911 raise error.ResponseError(_('push failed:'), inpart.params['message'])
903
912
904 @parthandler('b2x:listkeys', ('namespace',))
913 @parthandler('b2x:listkeys', ('namespace',))
905 def handlelistkeys(op, inpart):
914 def handlelistkeys(op, inpart):
906 """retrieve pushkey namespace content stored in a bundle2"""
915 """retrieve pushkey namespace content stored in a bundle2"""
907 namespace = inpart.params['namespace']
916 namespace = inpart.params['namespace']
908 r = pushkey.decodekeys(inpart.read())
917 r = pushkey.decodekeys(inpart.read())
909 op.records.add('listkeys', (namespace, r))
918 op.records.add('listkeys', (namespace, r))
910
919
911 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
920 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
912 def handlepushkey(op, inpart):
921 def handlepushkey(op, inpart):
913 """process a pushkey request"""
922 """process a pushkey request"""
914 dec = pushkey.decode
923 dec = pushkey.decode
915 namespace = dec(inpart.params['namespace'])
924 namespace = dec(inpart.params['namespace'])
916 key = dec(inpart.params['key'])
925 key = dec(inpart.params['key'])
917 old = dec(inpart.params['old'])
926 old = dec(inpart.params['old'])
918 new = dec(inpart.params['new'])
927 new = dec(inpart.params['new'])
919 ret = op.repo.pushkey(namespace, key, old, new)
928 ret = op.repo.pushkey(namespace, key, old, new)
920 record = {'namespace': namespace,
929 record = {'namespace': namespace,
921 'key': key,
930 'key': key,
922 'old': old,
931 'old': old,
923 'new': new}
932 'new': new}
924 op.records.add('pushkey', record)
933 op.records.add('pushkey', record)
925 if op.reply is not None:
934 if op.reply is not None:
926 rpart = op.reply.newpart('b2x:reply:pushkey')
935 rpart = op.reply.newpart('b2x:reply:pushkey')
927 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
936 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
928 rpart.addparam('return', '%i' % ret, mandatory=False)
937 rpart.addparam('return', '%i' % ret, mandatory=False)
929
938
930 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
939 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
931 def handlepushkeyreply(op, inpart):
940 def handlepushkeyreply(op, inpart):
932 """retrieve the result of a pushkey request"""
941 """retrieve the result of a pushkey request"""
933 ret = int(inpart.params['return'])
942 ret = int(inpart.params['return'])
934 partid = int(inpart.params['in-reply-to'])
943 partid = int(inpart.params['in-reply-to'])
935 op.records.add('pushkey', {'return': ret}, partid)
944 op.records.add('pushkey', {'return': ret}, partid)
936
945
937 @parthandler('b2x:obsmarkers')
946 @parthandler('b2x:obsmarkers')
938 def handleobsmarker(op, inpart):
947 def handleobsmarker(op, inpart):
939 """add a stream of obsmarkers to the repo"""
948 """add a stream of obsmarkers to the repo"""
940 tr = op.gettransaction()
949 tr = op.gettransaction()
941 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
950 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
942 if new:
951 if new:
943 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
952 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
944 op.records.add('obsmarkers', {'new': new})
953 op.records.add('obsmarkers', {'new': new})
945 if op.reply is not None:
954 if op.reply is not None:
946 rpart = op.reply.newpart('b2x:reply:obsmarkers')
955 rpart = op.reply.newpart('b2x:reply:obsmarkers')
947 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
956 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
948 rpart.addparam('new', '%i' % new, mandatory=False)
957 rpart.addparam('new', '%i' % new, mandatory=False)
949
958
950
959
951 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
960 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
952 def handlepushkeyreply(op, inpart):
961 def handlepushkeyreply(op, inpart):
953 """retrieve the result of a pushkey request"""
962 """retrieve the result of a pushkey request"""
954 ret = int(inpart.params['new'])
963 ret = int(inpart.params['new'])
955 partid = int(inpart.params['in-reply-to'])
964 partid = int(inpart.params['in-reply-to'])
956 op.records.add('obsmarkers', {'new': ret}, partid)
965 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now