##// END OF EJS Templates
bundle2: extract processing of part into its own function...
Pierre-Yves David -
r23008:d3137827 default
parent child Browse files
Show More
@@ -1,948 +1,953 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 Bundle processing
128 Bundle processing
129 ============================
129 ============================
130
130
131 Each part is processed in order using a "part handler". Handler are registered
131 Each part is processed in order using a "part handler". Handler are registered
132 for a certain part type.
132 for a certain part type.
133
133
134 The matching of a part to its handler is case insensitive. The case of the
134 The matching of a part to its handler is case insensitive. The case of the
135 part type is used to know if a part is mandatory or advisory. If the Part type
135 part type is used to know if a part is mandatory or advisory. If the Part type
136 contains any uppercase char it is considered mandatory. When no handler is
136 contains any uppercase char it is considered mandatory. When no handler is
137 known for a Mandatory part, the process is aborted and an exception is raised.
137 known for a Mandatory part, the process is aborted and an exception is raised.
138 If the part is advisory and no handler is known, the part is ignored. When the
138 If the part is advisory and no handler is known, the part is ignored. When the
139 process is aborted, the full bundle is still read from the stream to keep the
139 process is aborted, the full bundle is still read from the stream to keep the
140 channel usable. But none of the part read from an abort are processed. In the
140 channel usable. But none of the part read from an abort are processed. In the
141 future, dropping the stream may become an option for channel we do not care to
141 future, dropping the stream may become an option for channel we do not care to
142 preserve.
142 preserve.
143 """
143 """
144
144
145 import util
145 import util
146 import struct
146 import struct
147 import urllib
147 import urllib
148 import string
148 import string
149 import obsolete
149 import obsolete
150 import pushkey
150 import pushkey
151
151
152 import changegroup, error
152 import changegroup, error
153 from i18n import _
153 from i18n import _
154
154
155 _pack = struct.pack
155 _pack = struct.pack
156 _unpack = struct.unpack
156 _unpack = struct.unpack
157
157
158 _magicstring = 'HG2X'
158 _magicstring = 'HG2X'
159
159
160 _fstreamparamsize = '>H'
160 _fstreamparamsize = '>H'
161 _fpartheadersize = '>H'
161 _fpartheadersize = '>H'
162 _fparttypesize = '>B'
162 _fparttypesize = '>B'
163 _fpartid = '>I'
163 _fpartid = '>I'
164 _fpayloadsize = '>I'
164 _fpayloadsize = '>I'
165 _fpartparamcount = '>BB'
165 _fpartparamcount = '>BB'
166
166
167 preferedchunksize = 4096
167 preferedchunksize = 4096
168
168
169 def _makefpartparamsizes(nbparams):
169 def _makefpartparamsizes(nbparams):
170 """return a struct format to read part parameter sizes
170 """return a struct format to read part parameter sizes
171
171
172 The number parameters is variable so we need to build that format
172 The number parameters is variable so we need to build that format
173 dynamically.
173 dynamically.
174 """
174 """
175 return '>'+('BB'*nbparams)
175 return '>'+('BB'*nbparams)
176
176
177 parthandlermapping = {}
177 parthandlermapping = {}
178
178
179 def parthandler(parttype, params=()):
179 def parthandler(parttype, params=()):
180 """decorator that register a function as a bundle2 part handler
180 """decorator that register a function as a bundle2 part handler
181
181
182 eg::
182 eg::
183
183
184 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
184 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
185 def myparttypehandler(...):
185 def myparttypehandler(...):
186 '''process a part of type "my part".'''
186 '''process a part of type "my part".'''
187 ...
187 ...
188 """
188 """
189 def _decorator(func):
189 def _decorator(func):
190 lparttype = parttype.lower() # enforce lower case matching.
190 lparttype = parttype.lower() # enforce lower case matching.
191 assert lparttype not in parthandlermapping
191 assert lparttype not in parthandlermapping
192 parthandlermapping[lparttype] = func
192 parthandlermapping[lparttype] = func
193 func.params = frozenset(params)
193 func.params = frozenset(params)
194 return func
194 return func
195 return _decorator
195 return _decorator
196
196
197 class unbundlerecords(object):
197 class unbundlerecords(object):
198 """keep record of what happens during and unbundle
198 """keep record of what happens during and unbundle
199
199
200 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 New records are added using `records.add('cat', obj)`. Where 'cat' is a
201 category of record and obj is an arbitrary object.
201 category of record and obj is an arbitrary object.
202
202
203 `records['cat']` will return all entries of this category 'cat'.
203 `records['cat']` will return all entries of this category 'cat'.
204
204
205 Iterating on the object itself will yield `('category', obj)` tuples
205 Iterating on the object itself will yield `('category', obj)` tuples
206 for all entries.
206 for all entries.
207
207
208 All iterations happens in chronological order.
208 All iterations happens in chronological order.
209 """
209 """
210
210
211 def __init__(self):
211 def __init__(self):
212 self._categories = {}
212 self._categories = {}
213 self._sequences = []
213 self._sequences = []
214 self._replies = {}
214 self._replies = {}
215
215
216 def add(self, category, entry, inreplyto=None):
216 def add(self, category, entry, inreplyto=None):
217 """add a new record of a given category.
217 """add a new record of a given category.
218
218
219 The entry can then be retrieved in the list returned by
219 The entry can then be retrieved in the list returned by
220 self['category']."""
220 self['category']."""
221 self._categories.setdefault(category, []).append(entry)
221 self._categories.setdefault(category, []).append(entry)
222 self._sequences.append((category, entry))
222 self._sequences.append((category, entry))
223 if inreplyto is not None:
223 if inreplyto is not None:
224 self.getreplies(inreplyto).add(category, entry)
224 self.getreplies(inreplyto).add(category, entry)
225
225
226 def getreplies(self, partid):
226 def getreplies(self, partid):
227 """get the subrecords that replies to a specific part"""
227 """get the subrecords that replies to a specific part"""
228 return self._replies.setdefault(partid, unbundlerecords())
228 return self._replies.setdefault(partid, unbundlerecords())
229
229
230 def __getitem__(self, cat):
230 def __getitem__(self, cat):
231 return tuple(self._categories.get(cat, ()))
231 return tuple(self._categories.get(cat, ()))
232
232
233 def __iter__(self):
233 def __iter__(self):
234 return iter(self._sequences)
234 return iter(self._sequences)
235
235
236 def __len__(self):
236 def __len__(self):
237 return len(self._sequences)
237 return len(self._sequences)
238
238
239 def __nonzero__(self):
239 def __nonzero__(self):
240 return bool(self._sequences)
240 return bool(self._sequences)
241
241
242 class bundleoperation(object):
242 class bundleoperation(object):
243 """an object that represents a single bundling process
243 """an object that represents a single bundling process
244
244
245 Its purpose is to carry unbundle-related objects and states.
245 Its purpose is to carry unbundle-related objects and states.
246
246
247 A new object should be created at the beginning of each bundle processing.
247 A new object should be created at the beginning of each bundle processing.
248 The object is to be returned by the processing function.
248 The object is to be returned by the processing function.
249
249
250 The object has very little content now it will ultimately contain:
250 The object has very little content now it will ultimately contain:
251 * an access to the repo the bundle is applied to,
251 * an access to the repo the bundle is applied to,
252 * a ui object,
252 * a ui object,
253 * a way to retrieve a transaction to add changes to the repo,
253 * a way to retrieve a transaction to add changes to the repo,
254 * a way to record the result of processing each part,
254 * a way to record the result of processing each part,
255 * a way to construct a bundle response when applicable.
255 * a way to construct a bundle response when applicable.
256 """
256 """
257
257
258 def __init__(self, repo, transactiongetter):
258 def __init__(self, repo, transactiongetter):
259 self.repo = repo
259 self.repo = repo
260 self.ui = repo.ui
260 self.ui = repo.ui
261 self.records = unbundlerecords()
261 self.records = unbundlerecords()
262 self.gettransaction = transactiongetter
262 self.gettransaction = transactiongetter
263 self.reply = None
263 self.reply = None
264
264
265 class TransactionUnavailable(RuntimeError):
265 class TransactionUnavailable(RuntimeError):
266 pass
266 pass
267
267
268 def _notransaction():
268 def _notransaction():
269 """default method to get a transaction while processing a bundle
269 """default method to get a transaction while processing a bundle
270
270
271 Raise an exception to highlight the fact that no transaction was expected
271 Raise an exception to highlight the fact that no transaction was expected
272 to be created"""
272 to be created"""
273 raise TransactionUnavailable()
273 raise TransactionUnavailable()
274
274
275 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 def processbundle(repo, unbundler, transactiongetter=_notransaction):
276 """This function process a bundle, apply effect to/from a repo
276 """This function process a bundle, apply effect to/from a repo
277
277
278 It iterates over each part then searches for and uses the proper handling
278 It iterates over each part then searches for and uses the proper handling
279 code to process the part. Parts are processed in order.
279 code to process the part. Parts are processed in order.
280
280
281 This is very early version of this function that will be strongly reworked
281 This is very early version of this function that will be strongly reworked
282 before final usage.
282 before final usage.
283
283
284 Unknown Mandatory part will abort the process.
284 Unknown Mandatory part will abort the process.
285 """
285 """
286 op = bundleoperation(repo, transactiongetter)
286 op = bundleoperation(repo, transactiongetter)
287 # todo:
287 # todo:
288 # - replace this is a init function soon.
288 # - replace this is a init function soon.
289 # - exception catching
289 # - exception catching
290 unbundler.params
290 unbundler.params
291 iterparts = unbundler.iterparts()
291 iterparts = unbundler.iterparts()
292 part = None
292 part = None
293 try:
293 try:
294 for part in iterparts:
294 for part in iterparts:
295 parttype = part.type
295 _processpart(op, part)
296 # part key are matched lower case
297 key = parttype.lower()
298 try:
299 handler = parthandlermapping.get(key)
300 if handler is None:
301 raise error.BundleValueError(parttype=key)
302 op.ui.debug('found a handler for part %r\n' % parttype)
303 unknownparams = part.mandatorykeys - handler.params
304 if unknownparams:
305 unknownparams = list(unknownparams)
306 unknownparams.sort()
307 raise error.BundleValueError(parttype=key,
308 params=unknownparams)
309 except error.BundleValueError, exc:
310 if key != parttype: # mandatory parts
311 raise
312 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
313 # consuming the part
314 part.read()
315 continue
316
317
318 # handler is called outside the above try block so that we don't
319 # risk catching KeyErrors from anything other than the
320 # parthandlermapping lookup (any KeyError raised by handler()
321 # itself represents a defect of a different variety).
322 output = None
323 if op.reply is not None:
324 op.ui.pushbuffer(error=True)
325 output = ''
326 try:
327 handler(op, part)
328 finally:
329 if output is not None:
330 output = op.ui.popbuffer()
331 if output:
332 outpart = op.reply.newpart('b2x:output', data=output)
333 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
334 part.read()
335 except Exception, exc:
296 except Exception, exc:
336 if part is not None:
337 # consume the bundle content
338 part.read()
339 for part in iterparts:
297 for part in iterparts:
340 # consume the bundle content
298 # consume the bundle content
341 part.read()
299 part.read()
342 # Small hack to let caller code distinguish exceptions from bundle2
300 # Small hack to let caller code distinguish exceptions from bundle2
343 # processing fron the ones from bundle1 processing. This is mostly
301 # processing fron the ones from bundle1 processing. This is mostly
344 # needed to handle different return codes to unbundle according to the
302 # needed to handle different return codes to unbundle according to the
345 # type of bundle. We should probably clean up or drop this return code
303 # type of bundle. We should probably clean up or drop this return code
346 # craziness in a future version.
304 # craziness in a future version.
347 exc.duringunbundle2 = True
305 exc.duringunbundle2 = True
348 raise
306 raise
349 return op
307 return op
350
308
309 def _processpart(op, part):
310 """process a single part from a bundle
311
312 The part is guaranteed to have been fully consumed when the function exits
313 (even if an exception is raised)."""
314 try:
315 parttype = part.type
316 # part key are matched lower case
317 key = parttype.lower()
318 try:
319 handler = parthandlermapping.get(key)
320 if handler is None:
321 raise error.BundleValueError(parttype=key)
322 op.ui.debug('found a handler for part %r\n' % parttype)
323 unknownparams = part.mandatorykeys - handler.params
324 if unknownparams:
325 unknownparams = list(unknownparams)
326 unknownparams.sort()
327 raise error.BundleValueError(parttype=key,
328 params=unknownparams)
329 except error.BundleValueError, exc:
330 if key != parttype: # mandatory parts
331 raise
332 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
333 return # skip to part processing
334
335 # handler is called outside the above try block so that we don't
336 # risk catching KeyErrors from anything other than the
337 # parthandlermapping lookup (any KeyError raised by handler()
338 # itself represents a defect of a different variety).
339 output = None
340 if op.reply is not None:
341 op.ui.pushbuffer(error=True)
342 output = ''
343 try:
344 handler(op, part)
345 finally:
346 if output is not None:
347 output = op.ui.popbuffer()
348 if output:
349 outpart = op.reply.newpart('b2x:output', data=output)
350 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
351 finally:
352 # consume the part content to not corrupt the stream.
353 part.read()
354
355
351 def decodecaps(blob):
356 def decodecaps(blob):
352 """decode a bundle2 caps bytes blob into a dictionnary
357 """decode a bundle2 caps bytes blob into a dictionnary
353
358
354 The blob is a list of capabilities (one per line)
359 The blob is a list of capabilities (one per line)
355 Capabilities may have values using a line of the form::
360 Capabilities may have values using a line of the form::
356
361
357 capability=value1,value2,value3
362 capability=value1,value2,value3
358
363
359 The values are always a list."""
364 The values are always a list."""
360 caps = {}
365 caps = {}
361 for line in blob.splitlines():
366 for line in blob.splitlines():
362 if not line:
367 if not line:
363 continue
368 continue
364 if '=' not in line:
369 if '=' not in line:
365 key, vals = line, ()
370 key, vals = line, ()
366 else:
371 else:
367 key, vals = line.split('=', 1)
372 key, vals = line.split('=', 1)
368 vals = vals.split(',')
373 vals = vals.split(',')
369 key = urllib.unquote(key)
374 key = urllib.unquote(key)
370 vals = [urllib.unquote(v) for v in vals]
375 vals = [urllib.unquote(v) for v in vals]
371 caps[key] = vals
376 caps[key] = vals
372 return caps
377 return caps
373
378
374 def encodecaps(caps):
379 def encodecaps(caps):
375 """encode a bundle2 caps dictionary into a bytes blob"""
380 """encode a bundle2 caps dictionary into a bytes blob"""
376 chunks = []
381 chunks = []
377 for ca in sorted(caps):
382 for ca in sorted(caps):
378 vals = caps[ca]
383 vals = caps[ca]
379 ca = urllib.quote(ca)
384 ca = urllib.quote(ca)
380 vals = [urllib.quote(v) for v in vals]
385 vals = [urllib.quote(v) for v in vals]
381 if vals:
386 if vals:
382 ca = "%s=%s" % (ca, ','.join(vals))
387 ca = "%s=%s" % (ca, ','.join(vals))
383 chunks.append(ca)
388 chunks.append(ca)
384 return '\n'.join(chunks)
389 return '\n'.join(chunks)
385
390
386 class bundle20(object):
391 class bundle20(object):
387 """represent an outgoing bundle2 container
392 """represent an outgoing bundle2 container
388
393
389 Use the `addparam` method to add stream level parameter. and `newpart` to
394 Use the `addparam` method to add stream level parameter. and `newpart` to
390 populate it. Then call `getchunks` to retrieve all the binary chunks of
395 populate it. Then call `getchunks` to retrieve all the binary chunks of
391 data that compose the bundle2 container."""
396 data that compose the bundle2 container."""
392
397
393 def __init__(self, ui, capabilities=()):
398 def __init__(self, ui, capabilities=()):
394 self.ui = ui
399 self.ui = ui
395 self._params = []
400 self._params = []
396 self._parts = []
401 self._parts = []
397 self.capabilities = dict(capabilities)
402 self.capabilities = dict(capabilities)
398
403
399 @property
404 @property
400 def nbparts(self):
405 def nbparts(self):
401 """total number of parts added to the bundler"""
406 """total number of parts added to the bundler"""
402 return len(self._parts)
407 return len(self._parts)
403
408
404 # methods used to defines the bundle2 content
409 # methods used to defines the bundle2 content
405 def addparam(self, name, value=None):
410 def addparam(self, name, value=None):
406 """add a stream level parameter"""
411 """add a stream level parameter"""
407 if not name:
412 if not name:
408 raise ValueError('empty parameter name')
413 raise ValueError('empty parameter name')
409 if name[0] not in string.letters:
414 if name[0] not in string.letters:
410 raise ValueError('non letter first character: %r' % name)
415 raise ValueError('non letter first character: %r' % name)
411 self._params.append((name, value))
416 self._params.append((name, value))
412
417
413 def addpart(self, part):
418 def addpart(self, part):
414 """add a new part to the bundle2 container
419 """add a new part to the bundle2 container
415
420
416 Parts contains the actual applicative payload."""
421 Parts contains the actual applicative payload."""
417 assert part.id is None
422 assert part.id is None
418 part.id = len(self._parts) # very cheap counter
423 part.id = len(self._parts) # very cheap counter
419 self._parts.append(part)
424 self._parts.append(part)
420
425
421 def newpart(self, typeid, *args, **kwargs):
426 def newpart(self, typeid, *args, **kwargs):
422 """create a new part and add it to the containers
427 """create a new part and add it to the containers
423
428
424 As the part is directly added to the containers. For now, this means
429 As the part is directly added to the containers. For now, this means
425 that any failure to properly initialize the part after calling
430 that any failure to properly initialize the part after calling
426 ``newpart`` should result in a failure of the whole bundling process.
431 ``newpart`` should result in a failure of the whole bundling process.
427
432
428 You can still fall back to manually create and add if you need better
433 You can still fall back to manually create and add if you need better
429 control."""
434 control."""
430 part = bundlepart(typeid, *args, **kwargs)
435 part = bundlepart(typeid, *args, **kwargs)
431 self.addpart(part)
436 self.addpart(part)
432 return part
437 return part
433
438
434 # methods used to generate the bundle2 stream
439 # methods used to generate the bundle2 stream
435 def getchunks(self):
440 def getchunks(self):
436 self.ui.debug('start emission of %s stream\n' % _magicstring)
441 self.ui.debug('start emission of %s stream\n' % _magicstring)
437 yield _magicstring
442 yield _magicstring
438 param = self._paramchunk()
443 param = self._paramchunk()
439 self.ui.debug('bundle parameter: %s\n' % param)
444 self.ui.debug('bundle parameter: %s\n' % param)
440 yield _pack(_fstreamparamsize, len(param))
445 yield _pack(_fstreamparamsize, len(param))
441 if param:
446 if param:
442 yield param
447 yield param
443
448
444 self.ui.debug('start of parts\n')
449 self.ui.debug('start of parts\n')
445 for part in self._parts:
450 for part in self._parts:
446 self.ui.debug('bundle part: "%s"\n' % part.type)
451 self.ui.debug('bundle part: "%s"\n' % part.type)
447 for chunk in part.getchunks():
452 for chunk in part.getchunks():
448 yield chunk
453 yield chunk
449 self.ui.debug('end of bundle\n')
454 self.ui.debug('end of bundle\n')
450 yield _pack(_fpartheadersize, 0)
455 yield _pack(_fpartheadersize, 0)
451
456
452 def _paramchunk(self):
457 def _paramchunk(self):
453 """return a encoded version of all stream parameters"""
458 """return a encoded version of all stream parameters"""
454 blocks = []
459 blocks = []
455 for par, value in self._params:
460 for par, value in self._params:
456 par = urllib.quote(par)
461 par = urllib.quote(par)
457 if value is not None:
462 if value is not None:
458 value = urllib.quote(value)
463 value = urllib.quote(value)
459 par = '%s=%s' % (par, value)
464 par = '%s=%s' % (par, value)
460 blocks.append(par)
465 blocks.append(par)
461 return ' '.join(blocks)
466 return ' '.join(blocks)
462
467
463 class unpackermixin(object):
468 class unpackermixin(object):
464 """A mixin to extract bytes and struct data from a stream"""
469 """A mixin to extract bytes and struct data from a stream"""
465
470
466 def __init__(self, fp):
471 def __init__(self, fp):
467 self._fp = fp
472 self._fp = fp
468
473
469 def _unpack(self, format):
474 def _unpack(self, format):
470 """unpack this struct format from the stream"""
475 """unpack this struct format from the stream"""
471 data = self._readexact(struct.calcsize(format))
476 data = self._readexact(struct.calcsize(format))
472 return _unpack(format, data)
477 return _unpack(format, data)
473
478
474 def _readexact(self, size):
479 def _readexact(self, size):
475 """read exactly <size> bytes from the stream"""
480 """read exactly <size> bytes from the stream"""
476 return changegroup.readexactly(self._fp, size)
481 return changegroup.readexactly(self._fp, size)
477
482
478
483
479 class unbundle20(unpackermixin):
484 class unbundle20(unpackermixin):
480 """interpret a bundle2 stream
485 """interpret a bundle2 stream
481
486
482 This class is fed with a binary stream and yields parts through its
487 This class is fed with a binary stream and yields parts through its
483 `iterparts` methods."""
488 `iterparts` methods."""
484
489
485 def __init__(self, ui, fp, header=None):
490 def __init__(self, ui, fp, header=None):
486 """If header is specified, we do not read it out of the stream."""
491 """If header is specified, we do not read it out of the stream."""
487 self.ui = ui
492 self.ui = ui
488 super(unbundle20, self).__init__(fp)
493 super(unbundle20, self).__init__(fp)
489 if header is None:
494 if header is None:
490 header = self._readexact(4)
495 header = self._readexact(4)
491 magic, version = header[0:2], header[2:4]
496 magic, version = header[0:2], header[2:4]
492 if magic != 'HG':
497 if magic != 'HG':
493 raise util.Abort(_('not a Mercurial bundle'))
498 raise util.Abort(_('not a Mercurial bundle'))
494 if version != '2X':
499 if version != '2X':
495 raise util.Abort(_('unknown bundle version %s') % version)
500 raise util.Abort(_('unknown bundle version %s') % version)
496 self.ui.debug('start processing of %s stream\n' % header)
501 self.ui.debug('start processing of %s stream\n' % header)
497
502
498 @util.propertycache
503 @util.propertycache
499 def params(self):
504 def params(self):
500 """dictionary of stream level parameters"""
505 """dictionary of stream level parameters"""
501 self.ui.debug('reading bundle2 stream parameters\n')
506 self.ui.debug('reading bundle2 stream parameters\n')
502 params = {}
507 params = {}
503 paramssize = self._unpack(_fstreamparamsize)[0]
508 paramssize = self._unpack(_fstreamparamsize)[0]
504 if paramssize:
509 if paramssize:
505 for p in self._readexact(paramssize).split(' '):
510 for p in self._readexact(paramssize).split(' '):
506 p = p.split('=', 1)
511 p = p.split('=', 1)
507 p = [urllib.unquote(i) for i in p]
512 p = [urllib.unquote(i) for i in p]
508 if len(p) < 2:
513 if len(p) < 2:
509 p.append(None)
514 p.append(None)
510 self._processparam(*p)
515 self._processparam(*p)
511 params[p[0]] = p[1]
516 params[p[0]] = p[1]
512 return params
517 return params
513
518
514 def _processparam(self, name, value):
519 def _processparam(self, name, value):
515 """process a parameter, applying its effect if needed
520 """process a parameter, applying its effect if needed
516
521
517 Parameter starting with a lower case letter are advisory and will be
522 Parameter starting with a lower case letter are advisory and will be
518 ignored when unknown. Those starting with an upper case letter are
523 ignored when unknown. Those starting with an upper case letter are
519 mandatory and will this function will raise a KeyError when unknown.
524 mandatory and will this function will raise a KeyError when unknown.
520
525
521 Note: no option are currently supported. Any input will be either
526 Note: no option are currently supported. Any input will be either
522 ignored or failing.
527 ignored or failing.
523 """
528 """
524 if not name:
529 if not name:
525 raise ValueError('empty parameter name')
530 raise ValueError('empty parameter name')
526 if name[0] not in string.letters:
531 if name[0] not in string.letters:
527 raise ValueError('non letter first character: %r' % name)
532 raise ValueError('non letter first character: %r' % name)
528 # Some logic will be later added here to try to process the option for
533 # Some logic will be later added here to try to process the option for
529 # a dict of known parameter.
534 # a dict of known parameter.
530 if name[0].islower():
535 if name[0].islower():
531 self.ui.debug("ignoring unknown parameter %r\n" % name)
536 self.ui.debug("ignoring unknown parameter %r\n" % name)
532 else:
537 else:
533 raise error.BundleValueError(params=(name,))
538 raise error.BundleValueError(params=(name,))
534
539
535
540
536 def iterparts(self):
541 def iterparts(self):
537 """yield all parts contained in the stream"""
542 """yield all parts contained in the stream"""
538 # make sure param have been loaded
543 # make sure param have been loaded
539 self.params
544 self.params
540 self.ui.debug('start extraction of bundle2 parts\n')
545 self.ui.debug('start extraction of bundle2 parts\n')
541 headerblock = self._readpartheader()
546 headerblock = self._readpartheader()
542 while headerblock is not None:
547 while headerblock is not None:
543 part = unbundlepart(self.ui, headerblock, self._fp)
548 part = unbundlepart(self.ui, headerblock, self._fp)
544 yield part
549 yield part
545 headerblock = self._readpartheader()
550 headerblock = self._readpartheader()
546 self.ui.debug('end of bundle2 stream\n')
551 self.ui.debug('end of bundle2 stream\n')
547
552
548 def _readpartheader(self):
553 def _readpartheader(self):
549 """reads a part header size and return the bytes blob
554 """reads a part header size and return the bytes blob
550
555
551 returns None if empty"""
556 returns None if empty"""
552 headersize = self._unpack(_fpartheadersize)[0]
557 headersize = self._unpack(_fpartheadersize)[0]
553 self.ui.debug('part header size: %i\n' % headersize)
558 self.ui.debug('part header size: %i\n' % headersize)
554 if headersize:
559 if headersize:
555 return self._readexact(headersize)
560 return self._readexact(headersize)
556 return None
561 return None
557
562
558
563
559 class bundlepart(object):
564 class bundlepart(object):
560 """A bundle2 part contains application level payload
565 """A bundle2 part contains application level payload
561
566
562 The part `type` is used to route the part to the application level
567 The part `type` is used to route the part to the application level
563 handler.
568 handler.
564
569
565 The part payload is contained in ``part.data``. It could be raw bytes or a
570 The part payload is contained in ``part.data``. It could be raw bytes or a
566 generator of byte chunks.
571 generator of byte chunks.
567
572
568 You can add parameters to the part using the ``addparam`` method.
573 You can add parameters to the part using the ``addparam`` method.
569 Parameters can be either mandatory (default) or advisory. Remote side
574 Parameters can be either mandatory (default) or advisory. Remote side
570 should be able to safely ignore the advisory ones.
575 should be able to safely ignore the advisory ones.
571
576
572 Both data and parameters cannot be modified after the generation has begun.
577 Both data and parameters cannot be modified after the generation has begun.
573 """
578 """
574
579
575 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
580 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
576 data=''):
581 data=''):
577 self.id = None
582 self.id = None
578 self.type = parttype
583 self.type = parttype
579 self._data = data
584 self._data = data
580 self._mandatoryparams = list(mandatoryparams)
585 self._mandatoryparams = list(mandatoryparams)
581 self._advisoryparams = list(advisoryparams)
586 self._advisoryparams = list(advisoryparams)
582 # checking for duplicated entries
587 # checking for duplicated entries
583 self._seenparams = set()
588 self._seenparams = set()
584 for pname, __ in self._mandatoryparams + self._advisoryparams:
589 for pname, __ in self._mandatoryparams + self._advisoryparams:
585 if pname in self._seenparams:
590 if pname in self._seenparams:
586 raise RuntimeError('duplicated params: %s' % pname)
591 raise RuntimeError('duplicated params: %s' % pname)
587 self._seenparams.add(pname)
592 self._seenparams.add(pname)
588 # status of the part's generation:
593 # status of the part's generation:
589 # - None: not started,
594 # - None: not started,
590 # - False: currently generated,
595 # - False: currently generated,
591 # - True: generation done.
596 # - True: generation done.
592 self._generated = None
597 self._generated = None
593
598
594 # methods used to defines the part content
599 # methods used to defines the part content
595 def __setdata(self, data):
600 def __setdata(self, data):
596 if self._generated is not None:
601 if self._generated is not None:
597 raise error.ReadOnlyPartError('part is being generated')
602 raise error.ReadOnlyPartError('part is being generated')
598 self._data = data
603 self._data = data
599 def __getdata(self):
604 def __getdata(self):
600 return self._data
605 return self._data
601 data = property(__getdata, __setdata)
606 data = property(__getdata, __setdata)
602
607
603 @property
608 @property
604 def mandatoryparams(self):
609 def mandatoryparams(self):
605 # make it an immutable tuple to force people through ``addparam``
610 # make it an immutable tuple to force people through ``addparam``
606 return tuple(self._mandatoryparams)
611 return tuple(self._mandatoryparams)
607
612
608 @property
613 @property
609 def advisoryparams(self):
614 def advisoryparams(self):
610 # make it an immutable tuple to force people through ``addparam``
615 # make it an immutable tuple to force people through ``addparam``
611 return tuple(self._advisoryparams)
616 return tuple(self._advisoryparams)
612
617
613 def addparam(self, name, value='', mandatory=True):
618 def addparam(self, name, value='', mandatory=True):
614 if self._generated is not None:
619 if self._generated is not None:
615 raise error.ReadOnlyPartError('part is being generated')
620 raise error.ReadOnlyPartError('part is being generated')
616 if name in self._seenparams:
621 if name in self._seenparams:
617 raise ValueError('duplicated params: %s' % name)
622 raise ValueError('duplicated params: %s' % name)
618 self._seenparams.add(name)
623 self._seenparams.add(name)
619 params = self._advisoryparams
624 params = self._advisoryparams
620 if mandatory:
625 if mandatory:
621 params = self._mandatoryparams
626 params = self._mandatoryparams
622 params.append((name, value))
627 params.append((name, value))
623
628
624 # methods used to generates the bundle2 stream
629 # methods used to generates the bundle2 stream
625 def getchunks(self):
630 def getchunks(self):
626 if self._generated is not None:
631 if self._generated is not None:
627 raise RuntimeError('part can only be consumed once')
632 raise RuntimeError('part can only be consumed once')
628 self._generated = False
633 self._generated = False
629 #### header
634 #### header
630 ## parttype
635 ## parttype
631 header = [_pack(_fparttypesize, len(self.type)),
636 header = [_pack(_fparttypesize, len(self.type)),
632 self.type, _pack(_fpartid, self.id),
637 self.type, _pack(_fpartid, self.id),
633 ]
638 ]
634 ## parameters
639 ## parameters
635 # count
640 # count
636 manpar = self.mandatoryparams
641 manpar = self.mandatoryparams
637 advpar = self.advisoryparams
642 advpar = self.advisoryparams
638 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
643 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
639 # size
644 # size
640 parsizes = []
645 parsizes = []
641 for key, value in manpar:
646 for key, value in manpar:
642 parsizes.append(len(key))
647 parsizes.append(len(key))
643 parsizes.append(len(value))
648 parsizes.append(len(value))
644 for key, value in advpar:
649 for key, value in advpar:
645 parsizes.append(len(key))
650 parsizes.append(len(key))
646 parsizes.append(len(value))
651 parsizes.append(len(value))
647 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
652 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
648 header.append(paramsizes)
653 header.append(paramsizes)
649 # key, value
654 # key, value
650 for key, value in manpar:
655 for key, value in manpar:
651 header.append(key)
656 header.append(key)
652 header.append(value)
657 header.append(value)
653 for key, value in advpar:
658 for key, value in advpar:
654 header.append(key)
659 header.append(key)
655 header.append(value)
660 header.append(value)
656 ## finalize header
661 ## finalize header
657 headerchunk = ''.join(header)
662 headerchunk = ''.join(header)
658 yield _pack(_fpartheadersize, len(headerchunk))
663 yield _pack(_fpartheadersize, len(headerchunk))
659 yield headerchunk
664 yield headerchunk
660 ## payload
665 ## payload
661 for chunk in self._payloadchunks():
666 for chunk in self._payloadchunks():
662 yield _pack(_fpayloadsize, len(chunk))
667 yield _pack(_fpayloadsize, len(chunk))
663 yield chunk
668 yield chunk
664 # end of payload
669 # end of payload
665 yield _pack(_fpayloadsize, 0)
670 yield _pack(_fpayloadsize, 0)
666 self._generated = True
671 self._generated = True
667
672
668 def _payloadchunks(self):
673 def _payloadchunks(self):
669 """yield chunks of a the part payload
674 """yield chunks of a the part payload
670
675
671 Exists to handle the different methods to provide data to a part."""
676 Exists to handle the different methods to provide data to a part."""
672 # we only support fixed size data now.
677 # we only support fixed size data now.
673 # This will be improved in the future.
678 # This will be improved in the future.
674 if util.safehasattr(self.data, 'next'):
679 if util.safehasattr(self.data, 'next'):
675 buff = util.chunkbuffer(self.data)
680 buff = util.chunkbuffer(self.data)
676 chunk = buff.read(preferedchunksize)
681 chunk = buff.read(preferedchunksize)
677 while chunk:
682 while chunk:
678 yield chunk
683 yield chunk
679 chunk = buff.read(preferedchunksize)
684 chunk = buff.read(preferedchunksize)
680 elif len(self.data):
685 elif len(self.data):
681 yield self.data
686 yield self.data
682
687
683 class unbundlepart(unpackermixin):
688 class unbundlepart(unpackermixin):
684 """a bundle part read from a bundle"""
689 """a bundle part read from a bundle"""
685
690
686 def __init__(self, ui, header, fp):
691 def __init__(self, ui, header, fp):
687 super(unbundlepart, self).__init__(fp)
692 super(unbundlepart, self).__init__(fp)
688 self.ui = ui
693 self.ui = ui
689 # unbundle state attr
694 # unbundle state attr
690 self._headerdata = header
695 self._headerdata = header
691 self._headeroffset = 0
696 self._headeroffset = 0
692 self._initialized = False
697 self._initialized = False
693 self.consumed = False
698 self.consumed = False
694 # part data
699 # part data
695 self.id = None
700 self.id = None
696 self.type = None
701 self.type = None
697 self.mandatoryparams = None
702 self.mandatoryparams = None
698 self.advisoryparams = None
703 self.advisoryparams = None
699 self.params = None
704 self.params = None
700 self.mandatorykeys = ()
705 self.mandatorykeys = ()
701 self._payloadstream = None
706 self._payloadstream = None
702 self._readheader()
707 self._readheader()
703
708
704 def _fromheader(self, size):
709 def _fromheader(self, size):
705 """return the next <size> byte from the header"""
710 """return the next <size> byte from the header"""
706 offset = self._headeroffset
711 offset = self._headeroffset
707 data = self._headerdata[offset:(offset + size)]
712 data = self._headerdata[offset:(offset + size)]
708 self._headeroffset = offset + size
713 self._headeroffset = offset + size
709 return data
714 return data
710
715
711 def _unpackheader(self, format):
716 def _unpackheader(self, format):
712 """read given format from header
717 """read given format from header
713
718
714 This automatically compute the size of the format to read."""
719 This automatically compute the size of the format to read."""
715 data = self._fromheader(struct.calcsize(format))
720 data = self._fromheader(struct.calcsize(format))
716 return _unpack(format, data)
721 return _unpack(format, data)
717
722
718 def _initparams(self, mandatoryparams, advisoryparams):
723 def _initparams(self, mandatoryparams, advisoryparams):
719 """internal function to setup all logic related parameters"""
724 """internal function to setup all logic related parameters"""
720 # make it read only to prevent people touching it by mistake.
725 # make it read only to prevent people touching it by mistake.
721 self.mandatoryparams = tuple(mandatoryparams)
726 self.mandatoryparams = tuple(mandatoryparams)
722 self.advisoryparams = tuple(advisoryparams)
727 self.advisoryparams = tuple(advisoryparams)
723 # user friendly UI
728 # user friendly UI
724 self.params = dict(self.mandatoryparams)
729 self.params = dict(self.mandatoryparams)
725 self.params.update(dict(self.advisoryparams))
730 self.params.update(dict(self.advisoryparams))
726 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
731 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
727
732
728 def _readheader(self):
733 def _readheader(self):
729 """read the header and setup the object"""
734 """read the header and setup the object"""
730 typesize = self._unpackheader(_fparttypesize)[0]
735 typesize = self._unpackheader(_fparttypesize)[0]
731 self.type = self._fromheader(typesize)
736 self.type = self._fromheader(typesize)
732 self.ui.debug('part type: "%s"\n' % self.type)
737 self.ui.debug('part type: "%s"\n' % self.type)
733 self.id = self._unpackheader(_fpartid)[0]
738 self.id = self._unpackheader(_fpartid)[0]
734 self.ui.debug('part id: "%s"\n' % self.id)
739 self.ui.debug('part id: "%s"\n' % self.id)
735 ## reading parameters
740 ## reading parameters
736 # param count
741 # param count
737 mancount, advcount = self._unpackheader(_fpartparamcount)
742 mancount, advcount = self._unpackheader(_fpartparamcount)
738 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
743 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
739 # param size
744 # param size
740 fparamsizes = _makefpartparamsizes(mancount + advcount)
745 fparamsizes = _makefpartparamsizes(mancount + advcount)
741 paramsizes = self._unpackheader(fparamsizes)
746 paramsizes = self._unpackheader(fparamsizes)
742 # make it a list of couple again
747 # make it a list of couple again
743 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
748 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
744 # split mandatory from advisory
749 # split mandatory from advisory
745 mansizes = paramsizes[:mancount]
750 mansizes = paramsizes[:mancount]
746 advsizes = paramsizes[mancount:]
751 advsizes = paramsizes[mancount:]
747 # retrive param value
752 # retrive param value
748 manparams = []
753 manparams = []
749 for key, value in mansizes:
754 for key, value in mansizes:
750 manparams.append((self._fromheader(key), self._fromheader(value)))
755 manparams.append((self._fromheader(key), self._fromheader(value)))
751 advparams = []
756 advparams = []
752 for key, value in advsizes:
757 for key, value in advsizes:
753 advparams.append((self._fromheader(key), self._fromheader(value)))
758 advparams.append((self._fromheader(key), self._fromheader(value)))
754 self._initparams(manparams, advparams)
759 self._initparams(manparams, advparams)
755 ## part payload
760 ## part payload
756 def payloadchunks():
761 def payloadchunks():
757 payloadsize = self._unpack(_fpayloadsize)[0]
762 payloadsize = self._unpack(_fpayloadsize)[0]
758 self.ui.debug('payload chunk size: %i\n' % payloadsize)
763 self.ui.debug('payload chunk size: %i\n' % payloadsize)
759 while payloadsize:
764 while payloadsize:
760 yield self._readexact(payloadsize)
765 yield self._readexact(payloadsize)
761 payloadsize = self._unpack(_fpayloadsize)[0]
766 payloadsize = self._unpack(_fpayloadsize)[0]
762 self.ui.debug('payload chunk size: %i\n' % payloadsize)
767 self.ui.debug('payload chunk size: %i\n' % payloadsize)
763 self._payloadstream = util.chunkbuffer(payloadchunks())
768 self._payloadstream = util.chunkbuffer(payloadchunks())
764 # we read the data, tell it
769 # we read the data, tell it
765 self._initialized = True
770 self._initialized = True
766
771
767 def read(self, size=None):
772 def read(self, size=None):
768 """read payload data"""
773 """read payload data"""
769 if not self._initialized:
774 if not self._initialized:
770 self._readheader()
775 self._readheader()
771 if size is None:
776 if size is None:
772 data = self._payloadstream.read()
777 data = self._payloadstream.read()
773 else:
778 else:
774 data = self._payloadstream.read(size)
779 data = self._payloadstream.read(size)
775 if size is None or len(data) < size:
780 if size is None or len(data) < size:
776 self.consumed = True
781 self.consumed = True
777 return data
782 return data
778
783
779 capabilities = {'HG2X': (),
784 capabilities = {'HG2X': (),
780 'b2x:listkeys': (),
785 'b2x:listkeys': (),
781 'b2x:pushkey': (),
786 'b2x:pushkey': (),
782 'b2x:changegroup': (),
787 'b2x:changegroup': (),
783 }
788 }
784
789
785 def getrepocaps(repo):
790 def getrepocaps(repo):
786 """return the bundle2 capabilities for a given repo
791 """return the bundle2 capabilities for a given repo
787
792
788 Exists to allow extensions (like evolution) to mutate the capabilities.
793 Exists to allow extensions (like evolution) to mutate the capabilities.
789 """
794 """
790 caps = capabilities.copy()
795 caps = capabilities.copy()
791 if obsolete.isenabled(repo, obsolete.exchangeopt):
796 if obsolete.isenabled(repo, obsolete.exchangeopt):
792 supportedformat = tuple('V%i' % v for v in obsolete.formats)
797 supportedformat = tuple('V%i' % v for v in obsolete.formats)
793 caps['b2x:obsmarkers'] = supportedformat
798 caps['b2x:obsmarkers'] = supportedformat
794 return caps
799 return caps
795
800
796 def bundle2caps(remote):
801 def bundle2caps(remote):
797 """return the bundlecapabilities of a peer as dict"""
802 """return the bundlecapabilities of a peer as dict"""
798 raw = remote.capable('bundle2-exp')
803 raw = remote.capable('bundle2-exp')
799 if not raw and raw != '':
804 if not raw and raw != '':
800 return {}
805 return {}
801 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
806 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
802 return decodecaps(capsblob)
807 return decodecaps(capsblob)
803
808
804 def obsmarkersversion(caps):
809 def obsmarkersversion(caps):
805 """extract the list of supported obsmarkers versions from a bundle2caps dict
810 """extract the list of supported obsmarkers versions from a bundle2caps dict
806 """
811 """
807 obscaps = caps.get('b2x:obsmarkers', ())
812 obscaps = caps.get('b2x:obsmarkers', ())
808 return [int(c[1:]) for c in obscaps if c.startswith('V')]
813 return [int(c[1:]) for c in obscaps if c.startswith('V')]
809
814
810 @parthandler('b2x:changegroup')
815 @parthandler('b2x:changegroup')
811 def handlechangegroup(op, inpart):
816 def handlechangegroup(op, inpart):
812 """apply a changegroup part on the repo
817 """apply a changegroup part on the repo
813
818
814 This is a very early implementation that will massive rework before being
819 This is a very early implementation that will massive rework before being
815 inflicted to any end-user.
820 inflicted to any end-user.
816 """
821 """
817 # Make sure we trigger a transaction creation
822 # Make sure we trigger a transaction creation
818 #
823 #
819 # The addchangegroup function will get a transaction object by itself, but
824 # The addchangegroup function will get a transaction object by itself, but
820 # we need to make sure we trigger the creation of a transaction object used
825 # we need to make sure we trigger the creation of a transaction object used
821 # for the whole processing scope.
826 # for the whole processing scope.
822 op.gettransaction()
827 op.gettransaction()
823 cg = changegroup.cg1unpacker(inpart, 'UN')
828 cg = changegroup.cg1unpacker(inpart, 'UN')
824 # the source and url passed here are overwritten by the one contained in
829 # the source and url passed here are overwritten by the one contained in
825 # the transaction.hookargs argument. So 'bundle2' is a placeholder
830 # the transaction.hookargs argument. So 'bundle2' is a placeholder
826 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
831 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
827 op.records.add('changegroup', {'return': ret})
832 op.records.add('changegroup', {'return': ret})
828 if op.reply is not None:
833 if op.reply is not None:
829 # This is definitly not the final form of this
834 # This is definitly not the final form of this
830 # return. But one need to start somewhere.
835 # return. But one need to start somewhere.
831 part = op.reply.newpart('b2x:reply:changegroup')
836 part = op.reply.newpart('b2x:reply:changegroup')
832 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
837 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
833 part.addparam('return', '%i' % ret, mandatory=False)
838 part.addparam('return', '%i' % ret, mandatory=False)
834 assert not inpart.read()
839 assert not inpart.read()
835
840
836 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
841 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
837 def handlereplychangegroup(op, inpart):
842 def handlereplychangegroup(op, inpart):
838 ret = int(inpart.params['return'])
843 ret = int(inpart.params['return'])
839 replyto = int(inpart.params['in-reply-to'])
844 replyto = int(inpart.params['in-reply-to'])
840 op.records.add('changegroup', {'return': ret}, replyto)
845 op.records.add('changegroup', {'return': ret}, replyto)
841
846
842 @parthandler('b2x:check:heads')
847 @parthandler('b2x:check:heads')
843 def handlecheckheads(op, inpart):
848 def handlecheckheads(op, inpart):
844 """check that head of the repo did not change
849 """check that head of the repo did not change
845
850
846 This is used to detect a push race when using unbundle.
851 This is used to detect a push race when using unbundle.
847 This replaces the "heads" argument of unbundle."""
852 This replaces the "heads" argument of unbundle."""
848 h = inpart.read(20)
853 h = inpart.read(20)
849 heads = []
854 heads = []
850 while len(h) == 20:
855 while len(h) == 20:
851 heads.append(h)
856 heads.append(h)
852 h = inpart.read(20)
857 h = inpart.read(20)
853 assert not h
858 assert not h
854 if heads != op.repo.heads():
859 if heads != op.repo.heads():
855 raise error.PushRaced('repository changed while pushing - '
860 raise error.PushRaced('repository changed while pushing - '
856 'please try again')
861 'please try again')
857
862
858 @parthandler('b2x:output')
863 @parthandler('b2x:output')
859 def handleoutput(op, inpart):
864 def handleoutput(op, inpart):
860 """forward output captured on the server to the client"""
865 """forward output captured on the server to the client"""
861 for line in inpart.read().splitlines():
866 for line in inpart.read().splitlines():
862 op.ui.write(('remote: %s\n' % line))
867 op.ui.write(('remote: %s\n' % line))
863
868
864 @parthandler('b2x:replycaps')
869 @parthandler('b2x:replycaps')
865 def handlereplycaps(op, inpart):
870 def handlereplycaps(op, inpart):
866 """Notify that a reply bundle should be created
871 """Notify that a reply bundle should be created
867
872
868 The payload contains the capabilities information for the reply"""
873 The payload contains the capabilities information for the reply"""
869 caps = decodecaps(inpart.read())
874 caps = decodecaps(inpart.read())
870 if op.reply is None:
875 if op.reply is None:
871 op.reply = bundle20(op.ui, caps)
876 op.reply = bundle20(op.ui, caps)
872
877
873 @parthandler('b2x:error:abort', ('message', 'hint'))
878 @parthandler('b2x:error:abort', ('message', 'hint'))
874 def handlereplycaps(op, inpart):
879 def handlereplycaps(op, inpart):
875 """Used to transmit abort error over the wire"""
880 """Used to transmit abort error over the wire"""
876 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
881 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
877
882
878 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
883 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
879 def handlereplycaps(op, inpart):
884 def handlereplycaps(op, inpart):
880 """Used to transmit unknown content error over the wire"""
885 """Used to transmit unknown content error over the wire"""
881 kwargs = {}
886 kwargs = {}
882 parttype = inpart.params.get('parttype')
887 parttype = inpart.params.get('parttype')
883 if parttype is not None:
888 if parttype is not None:
884 kwargs['parttype'] = parttype
889 kwargs['parttype'] = parttype
885 params = inpart.params.get('params')
890 params = inpart.params.get('params')
886 if params is not None:
891 if params is not None:
887 kwargs['params'] = params.split('\0')
892 kwargs['params'] = params.split('\0')
888
893
889 raise error.BundleValueError(**kwargs)
894 raise error.BundleValueError(**kwargs)
890
895
891 @parthandler('b2x:error:pushraced', ('message',))
896 @parthandler('b2x:error:pushraced', ('message',))
892 def handlereplycaps(op, inpart):
897 def handlereplycaps(op, inpart):
893 """Used to transmit push race error over the wire"""
898 """Used to transmit push race error over the wire"""
894 raise error.ResponseError(_('push failed:'), inpart.params['message'])
899 raise error.ResponseError(_('push failed:'), inpart.params['message'])
895
900
896 @parthandler('b2x:listkeys', ('namespace',))
901 @parthandler('b2x:listkeys', ('namespace',))
897 def handlelistkeys(op, inpart):
902 def handlelistkeys(op, inpart):
898 """retrieve pushkey namespace content stored in a bundle2"""
903 """retrieve pushkey namespace content stored in a bundle2"""
899 namespace = inpart.params['namespace']
904 namespace = inpart.params['namespace']
900 r = pushkey.decodekeys(inpart.read())
905 r = pushkey.decodekeys(inpart.read())
901 op.records.add('listkeys', (namespace, r))
906 op.records.add('listkeys', (namespace, r))
902
907
903 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
908 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
904 def handlepushkey(op, inpart):
909 def handlepushkey(op, inpart):
905 """process a pushkey request"""
910 """process a pushkey request"""
906 dec = pushkey.decode
911 dec = pushkey.decode
907 namespace = dec(inpart.params['namespace'])
912 namespace = dec(inpart.params['namespace'])
908 key = dec(inpart.params['key'])
913 key = dec(inpart.params['key'])
909 old = dec(inpart.params['old'])
914 old = dec(inpart.params['old'])
910 new = dec(inpart.params['new'])
915 new = dec(inpart.params['new'])
911 ret = op.repo.pushkey(namespace, key, old, new)
916 ret = op.repo.pushkey(namespace, key, old, new)
912 record = {'namespace': namespace,
917 record = {'namespace': namespace,
913 'key': key,
918 'key': key,
914 'old': old,
919 'old': old,
915 'new': new}
920 'new': new}
916 op.records.add('pushkey', record)
921 op.records.add('pushkey', record)
917 if op.reply is not None:
922 if op.reply is not None:
918 rpart = op.reply.newpart('b2x:reply:pushkey')
923 rpart = op.reply.newpart('b2x:reply:pushkey')
919 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
924 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
920 rpart.addparam('return', '%i' % ret, mandatory=False)
925 rpart.addparam('return', '%i' % ret, mandatory=False)
921
926
922 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
927 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
923 def handlepushkeyreply(op, inpart):
928 def handlepushkeyreply(op, inpart):
924 """retrieve the result of a pushkey request"""
929 """retrieve the result of a pushkey request"""
925 ret = int(inpart.params['return'])
930 ret = int(inpart.params['return'])
926 partid = int(inpart.params['in-reply-to'])
931 partid = int(inpart.params['in-reply-to'])
927 op.records.add('pushkey', {'return': ret}, partid)
932 op.records.add('pushkey', {'return': ret}, partid)
928
933
929 @parthandler('b2x:obsmarkers')
934 @parthandler('b2x:obsmarkers')
930 def handleobsmarker(op, inpart):
935 def handleobsmarker(op, inpart):
931 """add a stream of obsmarkers to the repo"""
936 """add a stream of obsmarkers to the repo"""
932 tr = op.gettransaction()
937 tr = op.gettransaction()
933 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
938 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
934 if new:
939 if new:
935 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
940 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
936 op.records.add('obsmarkers', {'new': new})
941 op.records.add('obsmarkers', {'new': new})
937 if op.reply is not None:
942 if op.reply is not None:
938 rpart = op.reply.newpart('b2x:reply:obsmarkers')
943 rpart = op.reply.newpart('b2x:reply:obsmarkers')
939 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
944 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
940 rpart.addparam('new', '%i' % new, mandatory=False)
945 rpart.addparam('new', '%i' % new, mandatory=False)
941
946
942
947
943 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
948 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
944 def handlepushkeyreply(op, inpart):
949 def handlepushkeyreply(op, inpart):
945 """retrieve the result of a pushkey request"""
950 """retrieve the result of a pushkey request"""
946 ret = int(inpart.params['new'])
951 ret = int(inpart.params['new'])
947 partid = int(inpart.params['in-reply-to'])
952 partid = int(inpart.params['in-reply-to'])
948 op.records.add('obsmarkers', {'new': ret}, partid)
953 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now