##// END OF EJS Templates
bundle2: add a ``bundle20.nbparts`` property...
Pierre-Yves David -
r21900:b8bd9708 default
parent child Browse files
Show More
@@ -1,896 +1,901 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 Bundle processing
128 Bundle processing
129 ============================
129 ============================
130
130
131 Each part is processed in order using a "part handler". Handler are registered
131 Each part is processed in order using a "part handler". Handler are registered
132 for a certain part type.
132 for a certain part type.
133
133
134 The matching of a part to its handler is case insensitive. The case of the
134 The matching of a part to its handler is case insensitive. The case of the
135 part type is used to know if a part is mandatory or advisory. If the Part type
135 part type is used to know if a part is mandatory or advisory. If the Part type
136 contains any uppercase char it is considered mandatory. When no handler is
136 contains any uppercase char it is considered mandatory. When no handler is
137 known for a Mandatory part, the process is aborted and an exception is raised.
137 known for a Mandatory part, the process is aborted and an exception is raised.
138 If the part is advisory and no handler is known, the part is ignored. When the
138 If the part is advisory and no handler is known, the part is ignored. When the
139 process is aborted, the full bundle is still read from the stream to keep the
139 process is aborted, the full bundle is still read from the stream to keep the
140 channel usable. But none of the part read from an abort are processed. In the
140 channel usable. But none of the part read from an abort are processed. In the
141 future, dropping the stream may become an option for channel we do not care to
141 future, dropping the stream may become an option for channel we do not care to
142 preserve.
142 preserve.
143 """
143 """
144
144
145 import util
145 import util
146 import struct
146 import struct
147 import urllib
147 import urllib
148 import string
148 import string
149 import pushkey
149 import pushkey
150
150
151 import changegroup, error
151 import changegroup, error
152 from i18n import _
152 from i18n import _
153
153
154 _pack = struct.pack
154 _pack = struct.pack
155 _unpack = struct.unpack
155 _unpack = struct.unpack
156
156
157 _magicstring = 'HG2X'
157 _magicstring = 'HG2X'
158
158
159 _fstreamparamsize = '>H'
159 _fstreamparamsize = '>H'
160 _fpartheadersize = '>H'
160 _fpartheadersize = '>H'
161 _fparttypesize = '>B'
161 _fparttypesize = '>B'
162 _fpartid = '>I'
162 _fpartid = '>I'
163 _fpayloadsize = '>I'
163 _fpayloadsize = '>I'
164 _fpartparamcount = '>BB'
164 _fpartparamcount = '>BB'
165
165
166 preferedchunksize = 4096
166 preferedchunksize = 4096
167
167
168 def _makefpartparamsizes(nbparams):
168 def _makefpartparamsizes(nbparams):
169 """return a struct format to read part parameter sizes
169 """return a struct format to read part parameter sizes
170
170
171 The number parameters is variable so we need to build that format
171 The number parameters is variable so we need to build that format
172 dynamically.
172 dynamically.
173 """
173 """
174 return '>'+('BB'*nbparams)
174 return '>'+('BB'*nbparams)
175
175
176 parthandlermapping = {}
176 parthandlermapping = {}
177
177
178 def parthandler(parttype, params=()):
178 def parthandler(parttype, params=()):
179 """decorator that register a function as a bundle2 part handler
179 """decorator that register a function as a bundle2 part handler
180
180
181 eg::
181 eg::
182
182
183 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
183 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
184 def myparttypehandler(...):
184 def myparttypehandler(...):
185 '''process a part of type "my part".'''
185 '''process a part of type "my part".'''
186 ...
186 ...
187 """
187 """
188 def _decorator(func):
188 def _decorator(func):
189 lparttype = parttype.lower() # enforce lower case matching.
189 lparttype = parttype.lower() # enforce lower case matching.
190 assert lparttype not in parthandlermapping
190 assert lparttype not in parthandlermapping
191 parthandlermapping[lparttype] = func
191 parthandlermapping[lparttype] = func
192 func.params = frozenset(params)
192 func.params = frozenset(params)
193 return func
193 return func
194 return _decorator
194 return _decorator
195
195
196 class unbundlerecords(object):
196 class unbundlerecords(object):
197 """keep record of what happens during and unbundle
197 """keep record of what happens during and unbundle
198
198
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
200 category of record and obj is an arbitrary object.
200 category of record and obj is an arbitrary object.
201
201
202 `records['cat']` will return all entries of this category 'cat'.
202 `records['cat']` will return all entries of this category 'cat'.
203
203
204 Iterating on the object itself will yield `('category', obj)` tuples
204 Iterating on the object itself will yield `('category', obj)` tuples
205 for all entries.
205 for all entries.
206
206
207 All iterations happens in chronological order.
207 All iterations happens in chronological order.
208 """
208 """
209
209
210 def __init__(self):
210 def __init__(self):
211 self._categories = {}
211 self._categories = {}
212 self._sequences = []
212 self._sequences = []
213 self._replies = {}
213 self._replies = {}
214
214
215 def add(self, category, entry, inreplyto=None):
215 def add(self, category, entry, inreplyto=None):
216 """add a new record of a given category.
216 """add a new record of a given category.
217
217
218 The entry can then be retrieved in the list returned by
218 The entry can then be retrieved in the list returned by
219 self['category']."""
219 self['category']."""
220 self._categories.setdefault(category, []).append(entry)
220 self._categories.setdefault(category, []).append(entry)
221 self._sequences.append((category, entry))
221 self._sequences.append((category, entry))
222 if inreplyto is not None:
222 if inreplyto is not None:
223 self.getreplies(inreplyto).add(category, entry)
223 self.getreplies(inreplyto).add(category, entry)
224
224
225 def getreplies(self, partid):
225 def getreplies(self, partid):
226 """get the subrecords that replies to a specific part"""
226 """get the subrecords that replies to a specific part"""
227 return self._replies.setdefault(partid, unbundlerecords())
227 return self._replies.setdefault(partid, unbundlerecords())
228
228
229 def __getitem__(self, cat):
229 def __getitem__(self, cat):
230 return tuple(self._categories.get(cat, ()))
230 return tuple(self._categories.get(cat, ()))
231
231
232 def __iter__(self):
232 def __iter__(self):
233 return iter(self._sequences)
233 return iter(self._sequences)
234
234
235 def __len__(self):
235 def __len__(self):
236 return len(self._sequences)
236 return len(self._sequences)
237
237
238 def __nonzero__(self):
238 def __nonzero__(self):
239 return bool(self._sequences)
239 return bool(self._sequences)
240
240
241 class bundleoperation(object):
241 class bundleoperation(object):
242 """an object that represents a single bundling process
242 """an object that represents a single bundling process
243
243
244 Its purpose is to carry unbundle-related objects and states.
244 Its purpose is to carry unbundle-related objects and states.
245
245
246 A new object should be created at the beginning of each bundle processing.
246 A new object should be created at the beginning of each bundle processing.
247 The object is to be returned by the processing function.
247 The object is to be returned by the processing function.
248
248
249 The object has very little content now it will ultimately contain:
249 The object has very little content now it will ultimately contain:
250 * an access to the repo the bundle is applied to,
250 * an access to the repo the bundle is applied to,
251 * a ui object,
251 * a ui object,
252 * a way to retrieve a transaction to add changes to the repo,
252 * a way to retrieve a transaction to add changes to the repo,
253 * a way to record the result of processing each part,
253 * a way to record the result of processing each part,
254 * a way to construct a bundle response when applicable.
254 * a way to construct a bundle response when applicable.
255 """
255 """
256
256
257 def __init__(self, repo, transactiongetter):
257 def __init__(self, repo, transactiongetter):
258 self.repo = repo
258 self.repo = repo
259 self.ui = repo.ui
259 self.ui = repo.ui
260 self.records = unbundlerecords()
260 self.records = unbundlerecords()
261 self.gettransaction = transactiongetter
261 self.gettransaction = transactiongetter
262 self.reply = None
262 self.reply = None
263
263
264 class TransactionUnavailable(RuntimeError):
264 class TransactionUnavailable(RuntimeError):
265 pass
265 pass
266
266
267 def _notransaction():
267 def _notransaction():
268 """default method to get a transaction while processing a bundle
268 """default method to get a transaction while processing a bundle
269
269
270 Raise an exception to highlight the fact that no transaction was expected
270 Raise an exception to highlight the fact that no transaction was expected
271 to be created"""
271 to be created"""
272 raise TransactionUnavailable()
272 raise TransactionUnavailable()
273
273
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
275 """This function process a bundle, apply effect to/from a repo
275 """This function process a bundle, apply effect to/from a repo
276
276
277 It iterates over each part then searches for and uses the proper handling
277 It iterates over each part then searches for and uses the proper handling
278 code to process the part. Parts are processed in order.
278 code to process the part. Parts are processed in order.
279
279
280 This is very early version of this function that will be strongly reworked
280 This is very early version of this function that will be strongly reworked
281 before final usage.
281 before final usage.
282
282
283 Unknown Mandatory part will abort the process.
283 Unknown Mandatory part will abort the process.
284 """
284 """
285 op = bundleoperation(repo, transactiongetter)
285 op = bundleoperation(repo, transactiongetter)
286 # todo:
286 # todo:
287 # - replace this is a init function soon.
287 # - replace this is a init function soon.
288 # - exception catching
288 # - exception catching
289 unbundler.params
289 unbundler.params
290 iterparts = unbundler.iterparts()
290 iterparts = unbundler.iterparts()
291 part = None
291 part = None
292 try:
292 try:
293 for part in iterparts:
293 for part in iterparts:
294 parttype = part.type
294 parttype = part.type
295 # part key are matched lower case
295 # part key are matched lower case
296 key = parttype.lower()
296 key = parttype.lower()
297 try:
297 try:
298 handler = parthandlermapping.get(key)
298 handler = parthandlermapping.get(key)
299 if handler is None:
299 if handler is None:
300 raise error.BundleValueError(parttype=key)
300 raise error.BundleValueError(parttype=key)
301 op.ui.debug('found a handler for part %r\n' % parttype)
301 op.ui.debug('found a handler for part %r\n' % parttype)
302 unknownparams = part.mandatorykeys - handler.params
302 unknownparams = part.mandatorykeys - handler.params
303 if unknownparams:
303 if unknownparams:
304 unknownparams = list(unknownparams)
304 unknownparams = list(unknownparams)
305 unknownparams.sort()
305 unknownparams.sort()
306 raise error.BundleValueError(parttype=key,
306 raise error.BundleValueError(parttype=key,
307 params=unknownparams)
307 params=unknownparams)
308 except error.BundleValueError, exc:
308 except error.BundleValueError, exc:
309 if key != parttype: # mandatory parts
309 if key != parttype: # mandatory parts
310 raise
310 raise
311 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
311 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
312 # consuming the part
312 # consuming the part
313 part.read()
313 part.read()
314 continue
314 continue
315
315
316
316
317 # handler is called outside the above try block so that we don't
317 # handler is called outside the above try block so that we don't
318 # risk catching KeyErrors from anything other than the
318 # risk catching KeyErrors from anything other than the
319 # parthandlermapping lookup (any KeyError raised by handler()
319 # parthandlermapping lookup (any KeyError raised by handler()
320 # itself represents a defect of a different variety).
320 # itself represents a defect of a different variety).
321 output = None
321 output = None
322 if op.reply is not None:
322 if op.reply is not None:
323 op.ui.pushbuffer(error=True)
323 op.ui.pushbuffer(error=True)
324 output = ''
324 output = ''
325 try:
325 try:
326 handler(op, part)
326 handler(op, part)
327 finally:
327 finally:
328 if output is not None:
328 if output is not None:
329 output = op.ui.popbuffer()
329 output = op.ui.popbuffer()
330 if output:
330 if output:
331 outpart = op.reply.newpart('b2x:output', data=output)
331 outpart = op.reply.newpart('b2x:output', data=output)
332 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
333 part.read()
333 part.read()
334 except Exception, exc:
334 except Exception, exc:
335 if part is not None:
335 if part is not None:
336 # consume the bundle content
336 # consume the bundle content
337 part.read()
337 part.read()
338 for part in iterparts:
338 for part in iterparts:
339 # consume the bundle content
339 # consume the bundle content
340 part.read()
340 part.read()
341 # Small hack to let caller code distinguish exceptions from bundle2
341 # Small hack to let caller code distinguish exceptions from bundle2
342 # processing fron the ones from bundle1 processing. This is mostly
342 # processing fron the ones from bundle1 processing. This is mostly
343 # needed to handle different return codes to unbundle according to the
343 # needed to handle different return codes to unbundle according to the
344 # type of bundle. We should probably clean up or drop this return code
344 # type of bundle. We should probably clean up or drop this return code
345 # craziness in a future version.
345 # craziness in a future version.
346 exc.duringunbundle2 = True
346 exc.duringunbundle2 = True
347 raise
347 raise
348 return op
348 return op
349
349
350 def decodecaps(blob):
350 def decodecaps(blob):
351 """decode a bundle2 caps bytes blob into a dictionnary
351 """decode a bundle2 caps bytes blob into a dictionnary
352
352
353 The blob is a list of capabilities (one per line)
353 The blob is a list of capabilities (one per line)
354 Capabilities may have values using a line of the form::
354 Capabilities may have values using a line of the form::
355
355
356 capability=value1,value2,value3
356 capability=value1,value2,value3
357
357
358 The values are always a list."""
358 The values are always a list."""
359 caps = {}
359 caps = {}
360 for line in blob.splitlines():
360 for line in blob.splitlines():
361 if not line:
361 if not line:
362 continue
362 continue
363 if '=' not in line:
363 if '=' not in line:
364 key, vals = line, ()
364 key, vals = line, ()
365 else:
365 else:
366 key, vals = line.split('=', 1)
366 key, vals = line.split('=', 1)
367 vals = vals.split(',')
367 vals = vals.split(',')
368 key = urllib.unquote(key)
368 key = urllib.unquote(key)
369 vals = [urllib.unquote(v) for v in vals]
369 vals = [urllib.unquote(v) for v in vals]
370 caps[key] = vals
370 caps[key] = vals
371 return caps
371 return caps
372
372
373 def encodecaps(caps):
373 def encodecaps(caps):
374 """encode a bundle2 caps dictionary into a bytes blob"""
374 """encode a bundle2 caps dictionary into a bytes blob"""
375 chunks = []
375 chunks = []
376 for ca in sorted(caps):
376 for ca in sorted(caps):
377 vals = caps[ca]
377 vals = caps[ca]
378 ca = urllib.quote(ca)
378 ca = urllib.quote(ca)
379 vals = [urllib.quote(v) for v in vals]
379 vals = [urllib.quote(v) for v in vals]
380 if vals:
380 if vals:
381 ca = "%s=%s" % (ca, ','.join(vals))
381 ca = "%s=%s" % (ca, ','.join(vals))
382 chunks.append(ca)
382 chunks.append(ca)
383 return '\n'.join(chunks)
383 return '\n'.join(chunks)
384
384
385 class bundle20(object):
385 class bundle20(object):
386 """represent an outgoing bundle2 container
386 """represent an outgoing bundle2 container
387
387
388 Use the `addparam` method to add stream level parameter. and `newpart` to
388 Use the `addparam` method to add stream level parameter. and `newpart` to
389 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 populate it. Then call `getchunks` to retrieve all the binary chunks of
390 data that compose the bundle2 container."""
390 data that compose the bundle2 container."""
391
391
392 def __init__(self, ui, capabilities=()):
392 def __init__(self, ui, capabilities=()):
393 self.ui = ui
393 self.ui = ui
394 self._params = []
394 self._params = []
395 self._parts = []
395 self._parts = []
396 self.capabilities = dict(capabilities)
396 self.capabilities = dict(capabilities)
397
397
398 @property
399 def nbparts(self):
400 """total number of parts added to the bundler"""
401 return len(self._parts)
402
398 # methods used to defines the bundle2 content
403 # methods used to defines the bundle2 content
399 def addparam(self, name, value=None):
404 def addparam(self, name, value=None):
400 """add a stream level parameter"""
405 """add a stream level parameter"""
401 if not name:
406 if not name:
402 raise ValueError('empty parameter name')
407 raise ValueError('empty parameter name')
403 if name[0] not in string.letters:
408 if name[0] not in string.letters:
404 raise ValueError('non letter first character: %r' % name)
409 raise ValueError('non letter first character: %r' % name)
405 self._params.append((name, value))
410 self._params.append((name, value))
406
411
407 def addpart(self, part):
412 def addpart(self, part):
408 """add a new part to the bundle2 container
413 """add a new part to the bundle2 container
409
414
410 Parts contains the actual applicative payload."""
415 Parts contains the actual applicative payload."""
411 assert part.id is None
416 assert part.id is None
412 part.id = len(self._parts) # very cheap counter
417 part.id = len(self._parts) # very cheap counter
413 self._parts.append(part)
418 self._parts.append(part)
414
419
415 def newpart(self, typeid, *args, **kwargs):
420 def newpart(self, typeid, *args, **kwargs):
416 """create a new part and add it to the containers
421 """create a new part and add it to the containers
417
422
418 As the part is directly added to the containers. For now, this means
423 As the part is directly added to the containers. For now, this means
419 that any failure to properly initialize the part after calling
424 that any failure to properly initialize the part after calling
420 ``newpart`` should result in a failure of the whole bundling process.
425 ``newpart`` should result in a failure of the whole bundling process.
421
426
422 You can still fall back to manually create and add if you need better
427 You can still fall back to manually create and add if you need better
423 control."""
428 control."""
424 part = bundlepart(typeid, *args, **kwargs)
429 part = bundlepart(typeid, *args, **kwargs)
425 self.addpart(part)
430 self.addpart(part)
426 return part
431 return part
427
432
428 # methods used to generate the bundle2 stream
433 # methods used to generate the bundle2 stream
429 def getchunks(self):
434 def getchunks(self):
430 self.ui.debug('start emission of %s stream\n' % _magicstring)
435 self.ui.debug('start emission of %s stream\n' % _magicstring)
431 yield _magicstring
436 yield _magicstring
432 param = self._paramchunk()
437 param = self._paramchunk()
433 self.ui.debug('bundle parameter: %s\n' % param)
438 self.ui.debug('bundle parameter: %s\n' % param)
434 yield _pack(_fstreamparamsize, len(param))
439 yield _pack(_fstreamparamsize, len(param))
435 if param:
440 if param:
436 yield param
441 yield param
437
442
438 self.ui.debug('start of parts\n')
443 self.ui.debug('start of parts\n')
439 for part in self._parts:
444 for part in self._parts:
440 self.ui.debug('bundle part: "%s"\n' % part.type)
445 self.ui.debug('bundle part: "%s"\n' % part.type)
441 for chunk in part.getchunks():
446 for chunk in part.getchunks():
442 yield chunk
447 yield chunk
443 self.ui.debug('end of bundle\n')
448 self.ui.debug('end of bundle\n')
444 yield '\0\0'
449 yield '\0\0'
445
450
446 def _paramchunk(self):
451 def _paramchunk(self):
447 """return a encoded version of all stream parameters"""
452 """return a encoded version of all stream parameters"""
448 blocks = []
453 blocks = []
449 for par, value in self._params:
454 for par, value in self._params:
450 par = urllib.quote(par)
455 par = urllib.quote(par)
451 if value is not None:
456 if value is not None:
452 value = urllib.quote(value)
457 value = urllib.quote(value)
453 par = '%s=%s' % (par, value)
458 par = '%s=%s' % (par, value)
454 blocks.append(par)
459 blocks.append(par)
455 return ' '.join(blocks)
460 return ' '.join(blocks)
456
461
457 class unpackermixin(object):
462 class unpackermixin(object):
458 """A mixin to extract bytes and struct data from a stream"""
463 """A mixin to extract bytes and struct data from a stream"""
459
464
460 def __init__(self, fp):
465 def __init__(self, fp):
461 self._fp = fp
466 self._fp = fp
462
467
463 def _unpack(self, format):
468 def _unpack(self, format):
464 """unpack this struct format from the stream"""
469 """unpack this struct format from the stream"""
465 data = self._readexact(struct.calcsize(format))
470 data = self._readexact(struct.calcsize(format))
466 return _unpack(format, data)
471 return _unpack(format, data)
467
472
468 def _readexact(self, size):
473 def _readexact(self, size):
469 """read exactly <size> bytes from the stream"""
474 """read exactly <size> bytes from the stream"""
470 return changegroup.readexactly(self._fp, size)
475 return changegroup.readexactly(self._fp, size)
471
476
472
477
473 class unbundle20(unpackermixin):
478 class unbundle20(unpackermixin):
474 """interpret a bundle2 stream
479 """interpret a bundle2 stream
475
480
476 This class is fed with a binary stream and yields parts through its
481 This class is fed with a binary stream and yields parts through its
477 `iterparts` methods."""
482 `iterparts` methods."""
478
483
479 def __init__(self, ui, fp, header=None):
484 def __init__(self, ui, fp, header=None):
480 """If header is specified, we do not read it out of the stream."""
485 """If header is specified, we do not read it out of the stream."""
481 self.ui = ui
486 self.ui = ui
482 super(unbundle20, self).__init__(fp)
487 super(unbundle20, self).__init__(fp)
483 if header is None:
488 if header is None:
484 header = self._readexact(4)
489 header = self._readexact(4)
485 magic, version = header[0:2], header[2:4]
490 magic, version = header[0:2], header[2:4]
486 if magic != 'HG':
491 if magic != 'HG':
487 raise util.Abort(_('not a Mercurial bundle'))
492 raise util.Abort(_('not a Mercurial bundle'))
488 if version != '2X':
493 if version != '2X':
489 raise util.Abort(_('unknown bundle version %s') % version)
494 raise util.Abort(_('unknown bundle version %s') % version)
490 self.ui.debug('start processing of %s stream\n' % header)
495 self.ui.debug('start processing of %s stream\n' % header)
491
496
492 @util.propertycache
497 @util.propertycache
493 def params(self):
498 def params(self):
494 """dictionary of stream level parameters"""
499 """dictionary of stream level parameters"""
495 self.ui.debug('reading bundle2 stream parameters\n')
500 self.ui.debug('reading bundle2 stream parameters\n')
496 params = {}
501 params = {}
497 paramssize = self._unpack(_fstreamparamsize)[0]
502 paramssize = self._unpack(_fstreamparamsize)[0]
498 if paramssize:
503 if paramssize:
499 for p in self._readexact(paramssize).split(' '):
504 for p in self._readexact(paramssize).split(' '):
500 p = p.split('=', 1)
505 p = p.split('=', 1)
501 p = [urllib.unquote(i) for i in p]
506 p = [urllib.unquote(i) for i in p]
502 if len(p) < 2:
507 if len(p) < 2:
503 p.append(None)
508 p.append(None)
504 self._processparam(*p)
509 self._processparam(*p)
505 params[p[0]] = p[1]
510 params[p[0]] = p[1]
506 return params
511 return params
507
512
508 def _processparam(self, name, value):
513 def _processparam(self, name, value):
509 """process a parameter, applying its effect if needed
514 """process a parameter, applying its effect if needed
510
515
511 Parameter starting with a lower case letter are advisory and will be
516 Parameter starting with a lower case letter are advisory and will be
512 ignored when unknown. Those starting with an upper case letter are
517 ignored when unknown. Those starting with an upper case letter are
513 mandatory and will this function will raise a KeyError when unknown.
518 mandatory and will this function will raise a KeyError when unknown.
514
519
515 Note: no option are currently supported. Any input will be either
520 Note: no option are currently supported. Any input will be either
516 ignored or failing.
521 ignored or failing.
517 """
522 """
518 if not name:
523 if not name:
519 raise ValueError('empty parameter name')
524 raise ValueError('empty parameter name')
520 if name[0] not in string.letters:
525 if name[0] not in string.letters:
521 raise ValueError('non letter first character: %r' % name)
526 raise ValueError('non letter first character: %r' % name)
522 # Some logic will be later added here to try to process the option for
527 # Some logic will be later added here to try to process the option for
523 # a dict of known parameter.
528 # a dict of known parameter.
524 if name[0].islower():
529 if name[0].islower():
525 self.ui.debug("ignoring unknown parameter %r\n" % name)
530 self.ui.debug("ignoring unknown parameter %r\n" % name)
526 else:
531 else:
527 raise error.BundleValueError(params=(name,))
532 raise error.BundleValueError(params=(name,))
528
533
529
534
530 def iterparts(self):
535 def iterparts(self):
531 """yield all parts contained in the stream"""
536 """yield all parts contained in the stream"""
532 # make sure param have been loaded
537 # make sure param have been loaded
533 self.params
538 self.params
534 self.ui.debug('start extraction of bundle2 parts\n')
539 self.ui.debug('start extraction of bundle2 parts\n')
535 headerblock = self._readpartheader()
540 headerblock = self._readpartheader()
536 while headerblock is not None:
541 while headerblock is not None:
537 part = unbundlepart(self.ui, headerblock, self._fp)
542 part = unbundlepart(self.ui, headerblock, self._fp)
538 yield part
543 yield part
539 headerblock = self._readpartheader()
544 headerblock = self._readpartheader()
540 self.ui.debug('end of bundle2 stream\n')
545 self.ui.debug('end of bundle2 stream\n')
541
546
542 def _readpartheader(self):
547 def _readpartheader(self):
543 """reads a part header size and return the bytes blob
548 """reads a part header size and return the bytes blob
544
549
545 returns None if empty"""
550 returns None if empty"""
546 headersize = self._unpack(_fpartheadersize)[0]
551 headersize = self._unpack(_fpartheadersize)[0]
547 self.ui.debug('part header size: %i\n' % headersize)
552 self.ui.debug('part header size: %i\n' % headersize)
548 if headersize:
553 if headersize:
549 return self._readexact(headersize)
554 return self._readexact(headersize)
550 return None
555 return None
551
556
552
557
553 class bundlepart(object):
558 class bundlepart(object):
554 """A bundle2 part contains application level payload
559 """A bundle2 part contains application level payload
555
560
556 The part `type` is used to route the part to the application level
561 The part `type` is used to route the part to the application level
557 handler.
562 handler.
558
563
559 The part payload is contained in ``part.data``. It could be raw bytes or a
564 The part payload is contained in ``part.data``. It could be raw bytes or a
560 generator of byte chunks.
565 generator of byte chunks.
561
566
562 You can add parameters to the part using the ``addparam`` method.
567 You can add parameters to the part using the ``addparam`` method.
563 Parameters can be either mandatory (default) or advisory. Remote side
568 Parameters can be either mandatory (default) or advisory. Remote side
564 should be able to safely ignore the advisory ones.
569 should be able to safely ignore the advisory ones.
565
570
566 Both data and parameters cannot be modified after the generation has begun.
571 Both data and parameters cannot be modified after the generation has begun.
567 """
572 """
568
573
569 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
574 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
570 data=''):
575 data=''):
571 self.id = None
576 self.id = None
572 self.type = parttype
577 self.type = parttype
573 self._data = data
578 self._data = data
574 self._mandatoryparams = list(mandatoryparams)
579 self._mandatoryparams = list(mandatoryparams)
575 self._advisoryparams = list(advisoryparams)
580 self._advisoryparams = list(advisoryparams)
576 # checking for duplicated entries
581 # checking for duplicated entries
577 self._seenparams = set()
582 self._seenparams = set()
578 for pname, __ in self._mandatoryparams + self._advisoryparams:
583 for pname, __ in self._mandatoryparams + self._advisoryparams:
579 if pname in self._seenparams:
584 if pname in self._seenparams:
580 raise RuntimeError('duplicated params: %s' % pname)
585 raise RuntimeError('duplicated params: %s' % pname)
581 self._seenparams.add(pname)
586 self._seenparams.add(pname)
582 # status of the part's generation:
587 # status of the part's generation:
583 # - None: not started,
588 # - None: not started,
584 # - False: currently generated,
589 # - False: currently generated,
585 # - True: generation done.
590 # - True: generation done.
586 self._generated = None
591 self._generated = None
587
592
588 # methods used to defines the part content
593 # methods used to defines the part content
589 def __setdata(self, data):
594 def __setdata(self, data):
590 if self._generated is not None:
595 if self._generated is not None:
591 raise error.ReadOnlyPartError('part is being generated')
596 raise error.ReadOnlyPartError('part is being generated')
592 self._data = data
597 self._data = data
593 def __getdata(self):
598 def __getdata(self):
594 return self._data
599 return self._data
595 data = property(__getdata, __setdata)
600 data = property(__getdata, __setdata)
596
601
597 @property
602 @property
598 def mandatoryparams(self):
603 def mandatoryparams(self):
599 # make it an immutable tuple to force people through ``addparam``
604 # make it an immutable tuple to force people through ``addparam``
600 return tuple(self._mandatoryparams)
605 return tuple(self._mandatoryparams)
601
606
602 @property
607 @property
603 def advisoryparams(self):
608 def advisoryparams(self):
604 # make it an immutable tuple to force people through ``addparam``
609 # make it an immutable tuple to force people through ``addparam``
605 return tuple(self._advisoryparams)
610 return tuple(self._advisoryparams)
606
611
607 def addparam(self, name, value='', mandatory=True):
612 def addparam(self, name, value='', mandatory=True):
608 if self._generated is not None:
613 if self._generated is not None:
609 raise error.ReadOnlyPartError('part is being generated')
614 raise error.ReadOnlyPartError('part is being generated')
610 if name in self._seenparams:
615 if name in self._seenparams:
611 raise ValueError('duplicated params: %s' % name)
616 raise ValueError('duplicated params: %s' % name)
612 self._seenparams.add(name)
617 self._seenparams.add(name)
613 params = self._advisoryparams
618 params = self._advisoryparams
614 if mandatory:
619 if mandatory:
615 params = self._mandatoryparams
620 params = self._mandatoryparams
616 params.append((name, value))
621 params.append((name, value))
617
622
618 # methods used to generates the bundle2 stream
623 # methods used to generates the bundle2 stream
619 def getchunks(self):
624 def getchunks(self):
620 if self._generated is not None:
625 if self._generated is not None:
621 raise RuntimeError('part can only be consumed once')
626 raise RuntimeError('part can only be consumed once')
622 self._generated = False
627 self._generated = False
623 #### header
628 #### header
624 ## parttype
629 ## parttype
625 header = [_pack(_fparttypesize, len(self.type)),
630 header = [_pack(_fparttypesize, len(self.type)),
626 self.type, _pack(_fpartid, self.id),
631 self.type, _pack(_fpartid, self.id),
627 ]
632 ]
628 ## parameters
633 ## parameters
629 # count
634 # count
630 manpar = self.mandatoryparams
635 manpar = self.mandatoryparams
631 advpar = self.advisoryparams
636 advpar = self.advisoryparams
632 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
637 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
633 # size
638 # size
634 parsizes = []
639 parsizes = []
635 for key, value in manpar:
640 for key, value in manpar:
636 parsizes.append(len(key))
641 parsizes.append(len(key))
637 parsizes.append(len(value))
642 parsizes.append(len(value))
638 for key, value in advpar:
643 for key, value in advpar:
639 parsizes.append(len(key))
644 parsizes.append(len(key))
640 parsizes.append(len(value))
645 parsizes.append(len(value))
641 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
646 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
642 header.append(paramsizes)
647 header.append(paramsizes)
643 # key, value
648 # key, value
644 for key, value in manpar:
649 for key, value in manpar:
645 header.append(key)
650 header.append(key)
646 header.append(value)
651 header.append(value)
647 for key, value in advpar:
652 for key, value in advpar:
648 header.append(key)
653 header.append(key)
649 header.append(value)
654 header.append(value)
650 ## finalize header
655 ## finalize header
651 headerchunk = ''.join(header)
656 headerchunk = ''.join(header)
652 yield _pack(_fpartheadersize, len(headerchunk))
657 yield _pack(_fpartheadersize, len(headerchunk))
653 yield headerchunk
658 yield headerchunk
654 ## payload
659 ## payload
655 for chunk in self._payloadchunks():
660 for chunk in self._payloadchunks():
656 yield _pack(_fpayloadsize, len(chunk))
661 yield _pack(_fpayloadsize, len(chunk))
657 yield chunk
662 yield chunk
658 # end of payload
663 # end of payload
659 yield _pack(_fpayloadsize, 0)
664 yield _pack(_fpayloadsize, 0)
660 self._generated = True
665 self._generated = True
661
666
662 def _payloadchunks(self):
667 def _payloadchunks(self):
663 """yield chunks of a the part payload
668 """yield chunks of a the part payload
664
669
665 Exists to handle the different methods to provide data to a part."""
670 Exists to handle the different methods to provide data to a part."""
666 # we only support fixed size data now.
671 # we only support fixed size data now.
667 # This will be improved in the future.
672 # This will be improved in the future.
668 if util.safehasattr(self.data, 'next'):
673 if util.safehasattr(self.data, 'next'):
669 buff = util.chunkbuffer(self.data)
674 buff = util.chunkbuffer(self.data)
670 chunk = buff.read(preferedchunksize)
675 chunk = buff.read(preferedchunksize)
671 while chunk:
676 while chunk:
672 yield chunk
677 yield chunk
673 chunk = buff.read(preferedchunksize)
678 chunk = buff.read(preferedchunksize)
674 elif len(self.data):
679 elif len(self.data):
675 yield self.data
680 yield self.data
676
681
677 class unbundlepart(unpackermixin):
682 class unbundlepart(unpackermixin):
678 """a bundle part read from a bundle"""
683 """a bundle part read from a bundle"""
679
684
680 def __init__(self, ui, header, fp):
685 def __init__(self, ui, header, fp):
681 super(unbundlepart, self).__init__(fp)
686 super(unbundlepart, self).__init__(fp)
682 self.ui = ui
687 self.ui = ui
683 # unbundle state attr
688 # unbundle state attr
684 self._headerdata = header
689 self._headerdata = header
685 self._headeroffset = 0
690 self._headeroffset = 0
686 self._initialized = False
691 self._initialized = False
687 self.consumed = False
692 self.consumed = False
688 # part data
693 # part data
689 self.id = None
694 self.id = None
690 self.type = None
695 self.type = None
691 self.mandatoryparams = None
696 self.mandatoryparams = None
692 self.advisoryparams = None
697 self.advisoryparams = None
693 self.params = None
698 self.params = None
694 self.mandatorykeys = ()
699 self.mandatorykeys = ()
695 self._payloadstream = None
700 self._payloadstream = None
696 self._readheader()
701 self._readheader()
697
702
698 def _fromheader(self, size):
703 def _fromheader(self, size):
699 """return the next <size> byte from the header"""
704 """return the next <size> byte from the header"""
700 offset = self._headeroffset
705 offset = self._headeroffset
701 data = self._headerdata[offset:(offset + size)]
706 data = self._headerdata[offset:(offset + size)]
702 self._headeroffset = offset + size
707 self._headeroffset = offset + size
703 return data
708 return data
704
709
705 def _unpackheader(self, format):
710 def _unpackheader(self, format):
706 """read given format from header
711 """read given format from header
707
712
708 This automatically compute the size of the format to read."""
713 This automatically compute the size of the format to read."""
709 data = self._fromheader(struct.calcsize(format))
714 data = self._fromheader(struct.calcsize(format))
710 return _unpack(format, data)
715 return _unpack(format, data)
711
716
712 def _initparams(self, mandatoryparams, advisoryparams):
717 def _initparams(self, mandatoryparams, advisoryparams):
713 """internal function to setup all logic related parameters"""
718 """internal function to setup all logic related parameters"""
714 # make it read only to prevent people touching it by mistake.
719 # make it read only to prevent people touching it by mistake.
715 self.mandatoryparams = tuple(mandatoryparams)
720 self.mandatoryparams = tuple(mandatoryparams)
716 self.advisoryparams = tuple(advisoryparams)
721 self.advisoryparams = tuple(advisoryparams)
717 # user friendly UI
722 # user friendly UI
718 self.params = dict(self.mandatoryparams)
723 self.params = dict(self.mandatoryparams)
719 self.params.update(dict(self.advisoryparams))
724 self.params.update(dict(self.advisoryparams))
720 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
725 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
721
726
722 def _readheader(self):
727 def _readheader(self):
723 """read the header and setup the object"""
728 """read the header and setup the object"""
724 typesize = self._unpackheader(_fparttypesize)[0]
729 typesize = self._unpackheader(_fparttypesize)[0]
725 self.type = self._fromheader(typesize)
730 self.type = self._fromheader(typesize)
726 self.ui.debug('part type: "%s"\n' % self.type)
731 self.ui.debug('part type: "%s"\n' % self.type)
727 self.id = self._unpackheader(_fpartid)[0]
732 self.id = self._unpackheader(_fpartid)[0]
728 self.ui.debug('part id: "%s"\n' % self.id)
733 self.ui.debug('part id: "%s"\n' % self.id)
729 ## reading parameters
734 ## reading parameters
730 # param count
735 # param count
731 mancount, advcount = self._unpackheader(_fpartparamcount)
736 mancount, advcount = self._unpackheader(_fpartparamcount)
732 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
737 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
733 # param size
738 # param size
734 fparamsizes = _makefpartparamsizes(mancount + advcount)
739 fparamsizes = _makefpartparamsizes(mancount + advcount)
735 paramsizes = self._unpackheader(fparamsizes)
740 paramsizes = self._unpackheader(fparamsizes)
736 # make it a list of couple again
741 # make it a list of couple again
737 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
742 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
738 # split mandatory from advisory
743 # split mandatory from advisory
739 mansizes = paramsizes[:mancount]
744 mansizes = paramsizes[:mancount]
740 advsizes = paramsizes[mancount:]
745 advsizes = paramsizes[mancount:]
741 # retrive param value
746 # retrive param value
742 manparams = []
747 manparams = []
743 for key, value in mansizes:
748 for key, value in mansizes:
744 manparams.append((self._fromheader(key), self._fromheader(value)))
749 manparams.append((self._fromheader(key), self._fromheader(value)))
745 advparams = []
750 advparams = []
746 for key, value in advsizes:
751 for key, value in advsizes:
747 advparams.append((self._fromheader(key), self._fromheader(value)))
752 advparams.append((self._fromheader(key), self._fromheader(value)))
748 self._initparams(manparams, advparams)
753 self._initparams(manparams, advparams)
749 ## part payload
754 ## part payload
750 def payloadchunks():
755 def payloadchunks():
751 payloadsize = self._unpack(_fpayloadsize)[0]
756 payloadsize = self._unpack(_fpayloadsize)[0]
752 self.ui.debug('payload chunk size: %i\n' % payloadsize)
757 self.ui.debug('payload chunk size: %i\n' % payloadsize)
753 while payloadsize:
758 while payloadsize:
754 yield self._readexact(payloadsize)
759 yield self._readexact(payloadsize)
755 payloadsize = self._unpack(_fpayloadsize)[0]
760 payloadsize = self._unpack(_fpayloadsize)[0]
756 self.ui.debug('payload chunk size: %i\n' % payloadsize)
761 self.ui.debug('payload chunk size: %i\n' % payloadsize)
757 self._payloadstream = util.chunkbuffer(payloadchunks())
762 self._payloadstream = util.chunkbuffer(payloadchunks())
758 # we read the data, tell it
763 # we read the data, tell it
759 self._initialized = True
764 self._initialized = True
760
765
761 def read(self, size=None):
766 def read(self, size=None):
762 """read payload data"""
767 """read payload data"""
763 if not self._initialized:
768 if not self._initialized:
764 self._readheader()
769 self._readheader()
765 if size is None:
770 if size is None:
766 data = self._payloadstream.read()
771 data = self._payloadstream.read()
767 else:
772 else:
768 data = self._payloadstream.read(size)
773 data = self._payloadstream.read(size)
769 if size is None or len(data) < size:
774 if size is None or len(data) < size:
770 self.consumed = True
775 self.consumed = True
771 return data
776 return data
772
777
773 def bundle2caps(remote):
778 def bundle2caps(remote):
774 """return the bundlecapabilities of a peer as dict"""
779 """return the bundlecapabilities of a peer as dict"""
775 raw = remote.capable('bundle2-exp')
780 raw = remote.capable('bundle2-exp')
776 if not raw and raw != '':
781 if not raw and raw != '':
777 return {}
782 return {}
778 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
783 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
779 return decodecaps(capsblob)
784 return decodecaps(capsblob)
780
785
781 @parthandler('b2x:changegroup')
786 @parthandler('b2x:changegroup')
782 def handlechangegroup(op, inpart):
787 def handlechangegroup(op, inpart):
783 """apply a changegroup part on the repo
788 """apply a changegroup part on the repo
784
789
785 This is a very early implementation that will massive rework before being
790 This is a very early implementation that will massive rework before being
786 inflicted to any end-user.
791 inflicted to any end-user.
787 """
792 """
788 # Make sure we trigger a transaction creation
793 # Make sure we trigger a transaction creation
789 #
794 #
790 # The addchangegroup function will get a transaction object by itself, but
795 # The addchangegroup function will get a transaction object by itself, but
791 # we need to make sure we trigger the creation of a transaction object used
796 # we need to make sure we trigger the creation of a transaction object used
792 # for the whole processing scope.
797 # for the whole processing scope.
793 op.gettransaction()
798 op.gettransaction()
794 cg = changegroup.unbundle10(inpart, 'UN')
799 cg = changegroup.unbundle10(inpart, 'UN')
795 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
800 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
796 op.records.add('changegroup', {'return': ret})
801 op.records.add('changegroup', {'return': ret})
797 if op.reply is not None:
802 if op.reply is not None:
798 # This is definitly not the final form of this
803 # This is definitly not the final form of this
799 # return. But one need to start somewhere.
804 # return. But one need to start somewhere.
800 part = op.reply.newpart('b2x:reply:changegroup')
805 part = op.reply.newpart('b2x:reply:changegroup')
801 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
806 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
802 part.addparam('return', '%i' % ret, mandatory=False)
807 part.addparam('return', '%i' % ret, mandatory=False)
803 assert not inpart.read()
808 assert not inpart.read()
804
809
805 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
810 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
806 def handlechangegroup(op, inpart):
811 def handlechangegroup(op, inpart):
807 ret = int(inpart.params['return'])
812 ret = int(inpart.params['return'])
808 replyto = int(inpart.params['in-reply-to'])
813 replyto = int(inpart.params['in-reply-to'])
809 op.records.add('changegroup', {'return': ret}, replyto)
814 op.records.add('changegroup', {'return': ret}, replyto)
810
815
811 @parthandler('b2x:check:heads')
816 @parthandler('b2x:check:heads')
812 def handlechangegroup(op, inpart):
817 def handlechangegroup(op, inpart):
813 """check that head of the repo did not change
818 """check that head of the repo did not change
814
819
815 This is used to detect a push race when using unbundle.
820 This is used to detect a push race when using unbundle.
816 This replaces the "heads" argument of unbundle."""
821 This replaces the "heads" argument of unbundle."""
817 h = inpart.read(20)
822 h = inpart.read(20)
818 heads = []
823 heads = []
819 while len(h) == 20:
824 while len(h) == 20:
820 heads.append(h)
825 heads.append(h)
821 h = inpart.read(20)
826 h = inpart.read(20)
822 assert not h
827 assert not h
823 if heads != op.repo.heads():
828 if heads != op.repo.heads():
824 raise error.PushRaced('repository changed while pushing - '
829 raise error.PushRaced('repository changed while pushing - '
825 'please try again')
830 'please try again')
826
831
827 @parthandler('b2x:output')
832 @parthandler('b2x:output')
828 def handleoutput(op, inpart):
833 def handleoutput(op, inpart):
829 """forward output captured on the server to the client"""
834 """forward output captured on the server to the client"""
830 for line in inpart.read().splitlines():
835 for line in inpart.read().splitlines():
831 op.ui.write(('remote: %s\n' % line))
836 op.ui.write(('remote: %s\n' % line))
832
837
833 @parthandler('b2x:replycaps')
838 @parthandler('b2x:replycaps')
834 def handlereplycaps(op, inpart):
839 def handlereplycaps(op, inpart):
835 """Notify that a reply bundle should be created
840 """Notify that a reply bundle should be created
836
841
837 The payload contains the capabilities information for the reply"""
842 The payload contains the capabilities information for the reply"""
838 caps = decodecaps(inpart.read())
843 caps = decodecaps(inpart.read())
839 if op.reply is None:
844 if op.reply is None:
840 op.reply = bundle20(op.ui, caps)
845 op.reply = bundle20(op.ui, caps)
841
846
842 @parthandler('b2x:error:abort', ('message', 'hint'))
847 @parthandler('b2x:error:abort', ('message', 'hint'))
843 def handlereplycaps(op, inpart):
848 def handlereplycaps(op, inpart):
844 """Used to transmit abort error over the wire"""
849 """Used to transmit abort error over the wire"""
845 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
850 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
846
851
847 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
852 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
848 def handlereplycaps(op, inpart):
853 def handlereplycaps(op, inpart):
849 """Used to transmit unknown content error over the wire"""
854 """Used to transmit unknown content error over the wire"""
850 kwargs = {}
855 kwargs = {}
851 parttype = inpart.params.get('parttype')
856 parttype = inpart.params.get('parttype')
852 if parttype is not None:
857 if parttype is not None:
853 kwargs['parttype'] = parttype
858 kwargs['parttype'] = parttype
854 params = inpart.params.get('params')
859 params = inpart.params.get('params')
855 if params is not None:
860 if params is not None:
856 kwargs['params'] = params.split('\0')
861 kwargs['params'] = params.split('\0')
857
862
858 raise error.BundleValueError(**kwargs)
863 raise error.BundleValueError(**kwargs)
859
864
860 @parthandler('b2x:error:pushraced', ('message',))
865 @parthandler('b2x:error:pushraced', ('message',))
861 def handlereplycaps(op, inpart):
866 def handlereplycaps(op, inpart):
862 """Used to transmit push race error over the wire"""
867 """Used to transmit push race error over the wire"""
863 raise error.ResponseError(_('push failed:'), inpart.params['message'])
868 raise error.ResponseError(_('push failed:'), inpart.params['message'])
864
869
865 @parthandler('b2x:listkeys', ('namespace',))
870 @parthandler('b2x:listkeys', ('namespace',))
866 def handlelistkeys(op, inpart):
871 def handlelistkeys(op, inpart):
867 """retrieve pushkey namespace content stored in a bundle2"""
872 """retrieve pushkey namespace content stored in a bundle2"""
868 namespace = inpart.params['namespace']
873 namespace = inpart.params['namespace']
869 r = pushkey.decodekeys(inpart.read())
874 r = pushkey.decodekeys(inpart.read())
870 op.records.add('listkeys', (namespace, r))
875 op.records.add('listkeys', (namespace, r))
871
876
872 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
877 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
873 def handlepushkey(op, inpart):
878 def handlepushkey(op, inpart):
874 """process a pushkey request"""
879 """process a pushkey request"""
875 dec = pushkey.decode
880 dec = pushkey.decode
876 namespace = dec(inpart.params['namespace'])
881 namespace = dec(inpart.params['namespace'])
877 key = dec(inpart.params['key'])
882 key = dec(inpart.params['key'])
878 old = dec(inpart.params['old'])
883 old = dec(inpart.params['old'])
879 new = dec(inpart.params['new'])
884 new = dec(inpart.params['new'])
880 ret = op.repo.pushkey(namespace, key, old, new)
885 ret = op.repo.pushkey(namespace, key, old, new)
881 record = {'namespace': namespace,
886 record = {'namespace': namespace,
882 'key': key,
887 'key': key,
883 'old': old,
888 'old': old,
884 'new': new}
889 'new': new}
885 op.records.add('pushkey', record)
890 op.records.add('pushkey', record)
886 if op.reply is not None:
891 if op.reply is not None:
887 rpart = op.reply.newpart('b2x:reply:pushkey')
892 rpart = op.reply.newpart('b2x:reply:pushkey')
888 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
893 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
889 rpart.addparam('return', '%i' % ret, mandatory=False)
894 rpart.addparam('return', '%i' % ret, mandatory=False)
890
895
891 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
896 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
892 def handlepushkeyreply(op, inpart):
897 def handlepushkeyreply(op, inpart):
893 """retrieve the result of a pushkey request"""
898 """retrieve the result of a pushkey request"""
894 ret = int(inpart.params['return'])
899 ret = int(inpart.params['return'])
895 partid = int(inpart.params['in-reply-to'])
900 partid = int(inpart.params['in-reply-to'])
896 op.records.add('pushkey', {'return': ret}, partid)
901 op.records.add('pushkey', {'return': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now