##// END OF EJS Templates
bundle: introduce a listkey handler...
Pierre-Yves David -
r21655:35095f33 default
parent child Browse files
Show More
@@ -1,862 +1,870 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 Bundle processing
128 Bundle processing
129 ============================
129 ============================
130
130
131 Each part is processed in order using a "part handler". Handler are registered
131 Each part is processed in order using a "part handler". Handler are registered
132 for a certain part type.
132 for a certain part type.
133
133
134 The matching of a part to its handler is case insensitive. The case of the
134 The matching of a part to its handler is case insensitive. The case of the
135 part type is used to know if a part is mandatory or advisory. If the Part type
135 part type is used to know if a part is mandatory or advisory. If the Part type
136 contains any uppercase char it is considered mandatory. When no handler is
136 contains any uppercase char it is considered mandatory. When no handler is
137 known for a Mandatory part, the process is aborted and an exception is raised.
137 known for a Mandatory part, the process is aborted and an exception is raised.
138 If the part is advisory and no handler is known, the part is ignored. When the
138 If the part is advisory and no handler is known, the part is ignored. When the
139 process is aborted, the full bundle is still read from the stream to keep the
139 process is aborted, the full bundle is still read from the stream to keep the
140 channel usable. But none of the part read from an abort are processed. In the
140 channel usable. But none of the part read from an abort are processed. In the
141 future, dropping the stream may become an option for channel we do not care to
141 future, dropping the stream may become an option for channel we do not care to
142 preserve.
142 preserve.
143 """
143 """
144
144
145 import util
145 import util
146 import struct
146 import struct
147 import urllib
147 import urllib
148 import string
148 import string
149 import pushkey
149
150
150 import changegroup, error
151 import changegroup, error
151 from i18n import _
152 from i18n import _
152
153
153 _pack = struct.pack
154 _pack = struct.pack
154 _unpack = struct.unpack
155 _unpack = struct.unpack
155
156
156 _magicstring = 'HG2X'
157 _magicstring = 'HG2X'
157
158
158 _fstreamparamsize = '>H'
159 _fstreamparamsize = '>H'
159 _fpartheadersize = '>H'
160 _fpartheadersize = '>H'
160 _fparttypesize = '>B'
161 _fparttypesize = '>B'
161 _fpartid = '>I'
162 _fpartid = '>I'
162 _fpayloadsize = '>I'
163 _fpayloadsize = '>I'
163 _fpartparamcount = '>BB'
164 _fpartparamcount = '>BB'
164
165
165 preferedchunksize = 4096
166 preferedchunksize = 4096
166
167
167 def _makefpartparamsizes(nbparams):
168 def _makefpartparamsizes(nbparams):
168 """return a struct format to read part parameter sizes
169 """return a struct format to read part parameter sizes
169
170
170 The number parameters is variable so we need to build that format
171 The number parameters is variable so we need to build that format
171 dynamically.
172 dynamically.
172 """
173 """
173 return '>'+('BB'*nbparams)
174 return '>'+('BB'*nbparams)
174
175
175 parthandlermapping = {}
176 parthandlermapping = {}
176
177
177 def parthandler(parttype, params=()):
178 def parthandler(parttype, params=()):
178 """decorator that register a function as a bundle2 part handler
179 """decorator that register a function as a bundle2 part handler
179
180
180 eg::
181 eg::
181
182
182 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
183 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
183 def myparttypehandler(...):
184 def myparttypehandler(...):
184 '''process a part of type "my part".'''
185 '''process a part of type "my part".'''
185 ...
186 ...
186 """
187 """
187 def _decorator(func):
188 def _decorator(func):
188 lparttype = parttype.lower() # enforce lower case matching.
189 lparttype = parttype.lower() # enforce lower case matching.
189 assert lparttype not in parthandlermapping
190 assert lparttype not in parthandlermapping
190 parthandlermapping[lparttype] = func
191 parthandlermapping[lparttype] = func
191 func.params = frozenset(params)
192 func.params = frozenset(params)
192 return func
193 return func
193 return _decorator
194 return _decorator
194
195
195 class unbundlerecords(object):
196 class unbundlerecords(object):
196 """keep record of what happens during and unbundle
197 """keep record of what happens during and unbundle
197
198
198 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 New records are added using `records.add('cat', obj)`. Where 'cat' is a
199 category of record and obj is an arbitrary object.
200 category of record and obj is an arbitrary object.
200
201
201 `records['cat']` will return all entries of this category 'cat'.
202 `records['cat']` will return all entries of this category 'cat'.
202
203
203 Iterating on the object itself will yield `('category', obj)` tuples
204 Iterating on the object itself will yield `('category', obj)` tuples
204 for all entries.
205 for all entries.
205
206
206 All iterations happens in chronological order.
207 All iterations happens in chronological order.
207 """
208 """
208
209
209 def __init__(self):
210 def __init__(self):
210 self._categories = {}
211 self._categories = {}
211 self._sequences = []
212 self._sequences = []
212 self._replies = {}
213 self._replies = {}
213
214
214 def add(self, category, entry, inreplyto=None):
215 def add(self, category, entry, inreplyto=None):
215 """add a new record of a given category.
216 """add a new record of a given category.
216
217
217 The entry can then be retrieved in the list returned by
218 The entry can then be retrieved in the list returned by
218 self['category']."""
219 self['category']."""
219 self._categories.setdefault(category, []).append(entry)
220 self._categories.setdefault(category, []).append(entry)
220 self._sequences.append((category, entry))
221 self._sequences.append((category, entry))
221 if inreplyto is not None:
222 if inreplyto is not None:
222 self.getreplies(inreplyto).add(category, entry)
223 self.getreplies(inreplyto).add(category, entry)
223
224
224 def getreplies(self, partid):
225 def getreplies(self, partid):
225 """get the subrecords that replies to a specific part"""
226 """get the subrecords that replies to a specific part"""
226 return self._replies.setdefault(partid, unbundlerecords())
227 return self._replies.setdefault(partid, unbundlerecords())
227
228
228 def __getitem__(self, cat):
229 def __getitem__(self, cat):
229 return tuple(self._categories.get(cat, ()))
230 return tuple(self._categories.get(cat, ()))
230
231
231 def __iter__(self):
232 def __iter__(self):
232 return iter(self._sequences)
233 return iter(self._sequences)
233
234
234 def __len__(self):
235 def __len__(self):
235 return len(self._sequences)
236 return len(self._sequences)
236
237
237 def __nonzero__(self):
238 def __nonzero__(self):
238 return bool(self._sequences)
239 return bool(self._sequences)
239
240
240 class bundleoperation(object):
241 class bundleoperation(object):
241 """an object that represents a single bundling process
242 """an object that represents a single bundling process
242
243
243 Its purpose is to carry unbundle-related objects and states.
244 Its purpose is to carry unbundle-related objects and states.
244
245
245 A new object should be created at the beginning of each bundle processing.
246 A new object should be created at the beginning of each bundle processing.
246 The object is to be returned by the processing function.
247 The object is to be returned by the processing function.
247
248
248 The object has very little content now it will ultimately contain:
249 The object has very little content now it will ultimately contain:
249 * an access to the repo the bundle is applied to,
250 * an access to the repo the bundle is applied to,
250 * a ui object,
251 * a ui object,
251 * a way to retrieve a transaction to add changes to the repo,
252 * a way to retrieve a transaction to add changes to the repo,
252 * a way to record the result of processing each part,
253 * a way to record the result of processing each part,
253 * a way to construct a bundle response when applicable.
254 * a way to construct a bundle response when applicable.
254 """
255 """
255
256
256 def __init__(self, repo, transactiongetter):
257 def __init__(self, repo, transactiongetter):
257 self.repo = repo
258 self.repo = repo
258 self.ui = repo.ui
259 self.ui = repo.ui
259 self.records = unbundlerecords()
260 self.records = unbundlerecords()
260 self.gettransaction = transactiongetter
261 self.gettransaction = transactiongetter
261 self.reply = None
262 self.reply = None
262
263
263 class TransactionUnavailable(RuntimeError):
264 class TransactionUnavailable(RuntimeError):
264 pass
265 pass
265
266
266 def _notransaction():
267 def _notransaction():
267 """default method to get a transaction while processing a bundle
268 """default method to get a transaction while processing a bundle
268
269
269 Raise an exception to highlight the fact that no transaction was expected
270 Raise an exception to highlight the fact that no transaction was expected
270 to be created"""
271 to be created"""
271 raise TransactionUnavailable()
272 raise TransactionUnavailable()
272
273
273 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 def processbundle(repo, unbundler, transactiongetter=_notransaction):
274 """This function process a bundle, apply effect to/from a repo
275 """This function process a bundle, apply effect to/from a repo
275
276
276 It iterates over each part then searches for and uses the proper handling
277 It iterates over each part then searches for and uses the proper handling
277 code to process the part. Parts are processed in order.
278 code to process the part. Parts are processed in order.
278
279
279 This is very early version of this function that will be strongly reworked
280 This is very early version of this function that will be strongly reworked
280 before final usage.
281 before final usage.
281
282
282 Unknown Mandatory part will abort the process.
283 Unknown Mandatory part will abort the process.
283 """
284 """
284 op = bundleoperation(repo, transactiongetter)
285 op = bundleoperation(repo, transactiongetter)
285 # todo:
286 # todo:
286 # - replace this is a init function soon.
287 # - replace this is a init function soon.
287 # - exception catching
288 # - exception catching
288 unbundler.params
289 unbundler.params
289 iterparts = unbundler.iterparts()
290 iterparts = unbundler.iterparts()
290 part = None
291 part = None
291 try:
292 try:
292 for part in iterparts:
293 for part in iterparts:
293 parttype = part.type
294 parttype = part.type
294 # part key are matched lower case
295 # part key are matched lower case
295 key = parttype.lower()
296 key = parttype.lower()
296 try:
297 try:
297 handler = parthandlermapping.get(key)
298 handler = parthandlermapping.get(key)
298 if handler is None:
299 if handler is None:
299 raise error.BundleValueError(parttype=key)
300 raise error.BundleValueError(parttype=key)
300 op.ui.debug('found a handler for part %r\n' % parttype)
301 op.ui.debug('found a handler for part %r\n' % parttype)
301 unknownparams = part.mandatorykeys - handler.params
302 unknownparams = part.mandatorykeys - handler.params
302 if unknownparams:
303 if unknownparams:
303 unknownparams = list(unknownparams)
304 unknownparams = list(unknownparams)
304 unknownparams.sort()
305 unknownparams.sort()
305 raise error.BundleValueError(parttype=key,
306 raise error.BundleValueError(parttype=key,
306 params=unknownparams)
307 params=unknownparams)
307 except error.BundleValueError, exc:
308 except error.BundleValueError, exc:
308 if key != parttype: # mandatory parts
309 if key != parttype: # mandatory parts
309 raise
310 raise
310 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
311 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
311 # consuming the part
312 # consuming the part
312 part.read()
313 part.read()
313 continue
314 continue
314
315
315
316
316 # handler is called outside the above try block so that we don't
317 # handler is called outside the above try block so that we don't
317 # risk catching KeyErrors from anything other than the
318 # risk catching KeyErrors from anything other than the
318 # parthandlermapping lookup (any KeyError raised by handler()
319 # parthandlermapping lookup (any KeyError raised by handler()
319 # itself represents a defect of a different variety).
320 # itself represents a defect of a different variety).
320 output = None
321 output = None
321 if op.reply is not None:
322 if op.reply is not None:
322 op.ui.pushbuffer(error=True)
323 op.ui.pushbuffer(error=True)
323 output = ''
324 output = ''
324 try:
325 try:
325 handler(op, part)
326 handler(op, part)
326 finally:
327 finally:
327 if output is not None:
328 if output is not None:
328 output = op.ui.popbuffer()
329 output = op.ui.popbuffer()
329 if output:
330 if output:
330 outpart = op.reply.newpart('b2x:output', data=output)
331 outpart = op.reply.newpart('b2x:output', data=output)
331 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 part.read()
333 part.read()
333 except Exception, exc:
334 except Exception, exc:
334 if part is not None:
335 if part is not None:
335 # consume the bundle content
336 # consume the bundle content
336 part.read()
337 part.read()
337 for part in iterparts:
338 for part in iterparts:
338 # consume the bundle content
339 # consume the bundle content
339 part.read()
340 part.read()
340 # Small hack to let caller code distinguish exceptions from bundle2
341 # Small hack to let caller code distinguish exceptions from bundle2
341 # processing fron the ones from bundle1 processing. This is mostly
342 # processing fron the ones from bundle1 processing. This is mostly
342 # needed to handle different return codes to unbundle according to the
343 # needed to handle different return codes to unbundle according to the
343 # type of bundle. We should probably clean up or drop this return code
344 # type of bundle. We should probably clean up or drop this return code
344 # craziness in a future version.
345 # craziness in a future version.
345 exc.duringunbundle2 = True
346 exc.duringunbundle2 = True
346 raise
347 raise
347 return op
348 return op
348
349
349 def decodecaps(blob):
350 def decodecaps(blob):
350 """decode a bundle2 caps bytes blob into a dictionnary
351 """decode a bundle2 caps bytes blob into a dictionnary
351
352
352 The blob is a list of capabilities (one per line)
353 The blob is a list of capabilities (one per line)
353 Capabilities may have values using a line of the form::
354 Capabilities may have values using a line of the form::
354
355
355 capability=value1,value2,value3
356 capability=value1,value2,value3
356
357
357 The values are always a list."""
358 The values are always a list."""
358 caps = {}
359 caps = {}
359 for line in blob.splitlines():
360 for line in blob.splitlines():
360 if not line:
361 if not line:
361 continue
362 continue
362 if '=' not in line:
363 if '=' not in line:
363 key, vals = line, ()
364 key, vals = line, ()
364 else:
365 else:
365 key, vals = line.split('=', 1)
366 key, vals = line.split('=', 1)
366 vals = vals.split(',')
367 vals = vals.split(',')
367 key = urllib.unquote(key)
368 key = urllib.unquote(key)
368 vals = [urllib.unquote(v) for v in vals]
369 vals = [urllib.unquote(v) for v in vals]
369 caps[key] = vals
370 caps[key] = vals
370 return caps
371 return caps
371
372
372 def encodecaps(caps):
373 def encodecaps(caps):
373 """encode a bundle2 caps dictionary into a bytes blob"""
374 """encode a bundle2 caps dictionary into a bytes blob"""
374 chunks = []
375 chunks = []
375 for ca in sorted(caps):
376 for ca in sorted(caps):
376 vals = caps[ca]
377 vals = caps[ca]
377 ca = urllib.quote(ca)
378 ca = urllib.quote(ca)
378 vals = [urllib.quote(v) for v in vals]
379 vals = [urllib.quote(v) for v in vals]
379 if vals:
380 if vals:
380 ca = "%s=%s" % (ca, ','.join(vals))
381 ca = "%s=%s" % (ca, ','.join(vals))
381 chunks.append(ca)
382 chunks.append(ca)
382 return '\n'.join(chunks)
383 return '\n'.join(chunks)
383
384
384 class bundle20(object):
385 class bundle20(object):
385 """represent an outgoing bundle2 container
386 """represent an outgoing bundle2 container
386
387
387 Use the `addparam` method to add stream level parameter. and `newpart` to
388 Use the `addparam` method to add stream level parameter. and `newpart` to
388 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 data that compose the bundle2 container."""
390 data that compose the bundle2 container."""
390
391
391 def __init__(self, ui, capabilities=()):
392 def __init__(self, ui, capabilities=()):
392 self.ui = ui
393 self.ui = ui
393 self._params = []
394 self._params = []
394 self._parts = []
395 self._parts = []
395 self.capabilities = dict(capabilities)
396 self.capabilities = dict(capabilities)
396
397
397 # methods used to defines the bundle2 content
398 # methods used to defines the bundle2 content
398 def addparam(self, name, value=None):
399 def addparam(self, name, value=None):
399 """add a stream level parameter"""
400 """add a stream level parameter"""
400 if not name:
401 if not name:
401 raise ValueError('empty parameter name')
402 raise ValueError('empty parameter name')
402 if name[0] not in string.letters:
403 if name[0] not in string.letters:
403 raise ValueError('non letter first character: %r' % name)
404 raise ValueError('non letter first character: %r' % name)
404 self._params.append((name, value))
405 self._params.append((name, value))
405
406
406 def addpart(self, part):
407 def addpart(self, part):
407 """add a new part to the bundle2 container
408 """add a new part to the bundle2 container
408
409
409 Parts contains the actual applicative payload."""
410 Parts contains the actual applicative payload."""
410 assert part.id is None
411 assert part.id is None
411 part.id = len(self._parts) # very cheap counter
412 part.id = len(self._parts) # very cheap counter
412 self._parts.append(part)
413 self._parts.append(part)
413
414
414 def newpart(self, typeid, *args, **kwargs):
415 def newpart(self, typeid, *args, **kwargs):
415 """create a new part and add it to the containers
416 """create a new part and add it to the containers
416
417
417 As the part is directly added to the containers. For now, this means
418 As the part is directly added to the containers. For now, this means
418 that any failure to properly initialize the part after calling
419 that any failure to properly initialize the part after calling
419 ``newpart`` should result in a failure of the whole bundling process.
420 ``newpart`` should result in a failure of the whole bundling process.
420
421
421 You can still fall back to manually create and add if you need better
422 You can still fall back to manually create and add if you need better
422 control."""
423 control."""
423 part = bundlepart(typeid, *args, **kwargs)
424 part = bundlepart(typeid, *args, **kwargs)
424 self.addpart(part)
425 self.addpart(part)
425 return part
426 return part
426
427
427 # methods used to generate the bundle2 stream
428 # methods used to generate the bundle2 stream
428 def getchunks(self):
429 def getchunks(self):
429 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 yield _magicstring
431 yield _magicstring
431 param = self._paramchunk()
432 param = self._paramchunk()
432 self.ui.debug('bundle parameter: %s\n' % param)
433 self.ui.debug('bundle parameter: %s\n' % param)
433 yield _pack(_fstreamparamsize, len(param))
434 yield _pack(_fstreamparamsize, len(param))
434 if param:
435 if param:
435 yield param
436 yield param
436
437
437 self.ui.debug('start of parts\n')
438 self.ui.debug('start of parts\n')
438 for part in self._parts:
439 for part in self._parts:
439 self.ui.debug('bundle part: "%s"\n' % part.type)
440 self.ui.debug('bundle part: "%s"\n' % part.type)
440 for chunk in part.getchunks():
441 for chunk in part.getchunks():
441 yield chunk
442 yield chunk
442 self.ui.debug('end of bundle\n')
443 self.ui.debug('end of bundle\n')
443 yield '\0\0'
444 yield '\0\0'
444
445
445 def _paramchunk(self):
446 def _paramchunk(self):
446 """return a encoded version of all stream parameters"""
447 """return a encoded version of all stream parameters"""
447 blocks = []
448 blocks = []
448 for par, value in self._params:
449 for par, value in self._params:
449 par = urllib.quote(par)
450 par = urllib.quote(par)
450 if value is not None:
451 if value is not None:
451 value = urllib.quote(value)
452 value = urllib.quote(value)
452 par = '%s=%s' % (par, value)
453 par = '%s=%s' % (par, value)
453 blocks.append(par)
454 blocks.append(par)
454 return ' '.join(blocks)
455 return ' '.join(blocks)
455
456
456 class unpackermixin(object):
457 class unpackermixin(object):
457 """A mixin to extract bytes and struct data from a stream"""
458 """A mixin to extract bytes and struct data from a stream"""
458
459
459 def __init__(self, fp):
460 def __init__(self, fp):
460 self._fp = fp
461 self._fp = fp
461
462
462 def _unpack(self, format):
463 def _unpack(self, format):
463 """unpack this struct format from the stream"""
464 """unpack this struct format from the stream"""
464 data = self._readexact(struct.calcsize(format))
465 data = self._readexact(struct.calcsize(format))
465 return _unpack(format, data)
466 return _unpack(format, data)
466
467
467 def _readexact(self, size):
468 def _readexact(self, size):
468 """read exactly <size> bytes from the stream"""
469 """read exactly <size> bytes from the stream"""
469 return changegroup.readexactly(self._fp, size)
470 return changegroup.readexactly(self._fp, size)
470
471
471
472
472 class unbundle20(unpackermixin):
473 class unbundle20(unpackermixin):
473 """interpret a bundle2 stream
474 """interpret a bundle2 stream
474
475
475 This class is fed with a binary stream and yields parts through its
476 This class is fed with a binary stream and yields parts through its
476 `iterparts` methods."""
477 `iterparts` methods."""
477
478
478 def __init__(self, ui, fp, header=None):
479 def __init__(self, ui, fp, header=None):
479 """If header is specified, we do not read it out of the stream."""
480 """If header is specified, we do not read it out of the stream."""
480 self.ui = ui
481 self.ui = ui
481 super(unbundle20, self).__init__(fp)
482 super(unbundle20, self).__init__(fp)
482 if header is None:
483 if header is None:
483 header = self._readexact(4)
484 header = self._readexact(4)
484 magic, version = header[0:2], header[2:4]
485 magic, version = header[0:2], header[2:4]
485 if magic != 'HG':
486 if magic != 'HG':
486 raise util.Abort(_('not a Mercurial bundle'))
487 raise util.Abort(_('not a Mercurial bundle'))
487 if version != '2X':
488 if version != '2X':
488 raise util.Abort(_('unknown bundle version %s') % version)
489 raise util.Abort(_('unknown bundle version %s') % version)
489 self.ui.debug('start processing of %s stream\n' % header)
490 self.ui.debug('start processing of %s stream\n' % header)
490
491
491 @util.propertycache
492 @util.propertycache
492 def params(self):
493 def params(self):
493 """dictionary of stream level parameters"""
494 """dictionary of stream level parameters"""
494 self.ui.debug('reading bundle2 stream parameters\n')
495 self.ui.debug('reading bundle2 stream parameters\n')
495 params = {}
496 params = {}
496 paramssize = self._unpack(_fstreamparamsize)[0]
497 paramssize = self._unpack(_fstreamparamsize)[0]
497 if paramssize:
498 if paramssize:
498 for p in self._readexact(paramssize).split(' '):
499 for p in self._readexact(paramssize).split(' '):
499 p = p.split('=', 1)
500 p = p.split('=', 1)
500 p = [urllib.unquote(i) for i in p]
501 p = [urllib.unquote(i) for i in p]
501 if len(p) < 2:
502 if len(p) < 2:
502 p.append(None)
503 p.append(None)
503 self._processparam(*p)
504 self._processparam(*p)
504 params[p[0]] = p[1]
505 params[p[0]] = p[1]
505 return params
506 return params
506
507
507 def _processparam(self, name, value):
508 def _processparam(self, name, value):
508 """process a parameter, applying its effect if needed
509 """process a parameter, applying its effect if needed
509
510
510 Parameter starting with a lower case letter are advisory and will be
511 Parameter starting with a lower case letter are advisory and will be
511 ignored when unknown. Those starting with an upper case letter are
512 ignored when unknown. Those starting with an upper case letter are
512 mandatory and will this function will raise a KeyError when unknown.
513 mandatory and will this function will raise a KeyError when unknown.
513
514
514 Note: no option are currently supported. Any input will be either
515 Note: no option are currently supported. Any input will be either
515 ignored or failing.
516 ignored or failing.
516 """
517 """
517 if not name:
518 if not name:
518 raise ValueError('empty parameter name')
519 raise ValueError('empty parameter name')
519 if name[0] not in string.letters:
520 if name[0] not in string.letters:
520 raise ValueError('non letter first character: %r' % name)
521 raise ValueError('non letter first character: %r' % name)
521 # Some logic will be later added here to try to process the option for
522 # Some logic will be later added here to try to process the option for
522 # a dict of known parameter.
523 # a dict of known parameter.
523 if name[0].islower():
524 if name[0].islower():
524 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 else:
526 else:
526 raise error.BundleValueError(params=(name,))
527 raise error.BundleValueError(params=(name,))
527
528
528
529
529 def iterparts(self):
530 def iterparts(self):
530 """yield all parts contained in the stream"""
531 """yield all parts contained in the stream"""
531 # make sure param have been loaded
532 # make sure param have been loaded
532 self.params
533 self.params
533 self.ui.debug('start extraction of bundle2 parts\n')
534 self.ui.debug('start extraction of bundle2 parts\n')
534 headerblock = self._readpartheader()
535 headerblock = self._readpartheader()
535 while headerblock is not None:
536 while headerblock is not None:
536 part = unbundlepart(self.ui, headerblock, self._fp)
537 part = unbundlepart(self.ui, headerblock, self._fp)
537 yield part
538 yield part
538 headerblock = self._readpartheader()
539 headerblock = self._readpartheader()
539 self.ui.debug('end of bundle2 stream\n')
540 self.ui.debug('end of bundle2 stream\n')
540
541
541 def _readpartheader(self):
542 def _readpartheader(self):
542 """reads a part header size and return the bytes blob
543 """reads a part header size and return the bytes blob
543
544
544 returns None if empty"""
545 returns None if empty"""
545 headersize = self._unpack(_fpartheadersize)[0]
546 headersize = self._unpack(_fpartheadersize)[0]
546 self.ui.debug('part header size: %i\n' % headersize)
547 self.ui.debug('part header size: %i\n' % headersize)
547 if headersize:
548 if headersize:
548 return self._readexact(headersize)
549 return self._readexact(headersize)
549 return None
550 return None
550
551
551
552
552 class bundlepart(object):
553 class bundlepart(object):
553 """A bundle2 part contains application level payload
554 """A bundle2 part contains application level payload
554
555
555 The part `type` is used to route the part to the application level
556 The part `type` is used to route the part to the application level
556 handler.
557 handler.
557
558
558 The part payload is contained in ``part.data``. It could be raw bytes or a
559 The part payload is contained in ``part.data``. It could be raw bytes or a
559 generator of byte chunks.
560 generator of byte chunks.
560
561
561 You can add parameters to the part using the ``addparam`` method.
562 You can add parameters to the part using the ``addparam`` method.
562 Parameters can be either mandatory (default) or advisory. Remote side
563 Parameters can be either mandatory (default) or advisory. Remote side
563 should be able to safely ignore the advisory ones.
564 should be able to safely ignore the advisory ones.
564
565
565 Both data and parameters cannot be modified after the generation has begun.
566 Both data and parameters cannot be modified after the generation has begun.
566 """
567 """
567
568
568 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 data=''):
570 data=''):
570 self.id = None
571 self.id = None
571 self.type = parttype
572 self.type = parttype
572 self._data = data
573 self._data = data
573 self._mandatoryparams = list(mandatoryparams)
574 self._mandatoryparams = list(mandatoryparams)
574 self._advisoryparams = list(advisoryparams)
575 self._advisoryparams = list(advisoryparams)
575 # checking for duplicated entries
576 # checking for duplicated entries
576 self._seenparams = set()
577 self._seenparams = set()
577 for pname, __ in self._mandatoryparams + self._advisoryparams:
578 for pname, __ in self._mandatoryparams + self._advisoryparams:
578 if pname in self._seenparams:
579 if pname in self._seenparams:
579 raise RuntimeError('duplicated params: %s' % pname)
580 raise RuntimeError('duplicated params: %s' % pname)
580 self._seenparams.add(pname)
581 self._seenparams.add(pname)
581 # status of the part's generation:
582 # status of the part's generation:
582 # - None: not started,
583 # - None: not started,
583 # - False: currently generated,
584 # - False: currently generated,
584 # - True: generation done.
585 # - True: generation done.
585 self._generated = None
586 self._generated = None
586
587
587 # methods used to defines the part content
588 # methods used to defines the part content
588 def __setdata(self, data):
589 def __setdata(self, data):
589 if self._generated is not None:
590 if self._generated is not None:
590 raise error.ReadOnlyPartError('part is being generated')
591 raise error.ReadOnlyPartError('part is being generated')
591 self._data = data
592 self._data = data
592 def __getdata(self):
593 def __getdata(self):
593 return self._data
594 return self._data
594 data = property(__getdata, __setdata)
595 data = property(__getdata, __setdata)
595
596
596 @property
597 @property
597 def mandatoryparams(self):
598 def mandatoryparams(self):
598 # make it an immutable tuple to force people through ``addparam``
599 # make it an immutable tuple to force people through ``addparam``
599 return tuple(self._mandatoryparams)
600 return tuple(self._mandatoryparams)
600
601
601 @property
602 @property
602 def advisoryparams(self):
603 def advisoryparams(self):
603 # make it an immutable tuple to force people through ``addparam``
604 # make it an immutable tuple to force people through ``addparam``
604 return tuple(self._advisoryparams)
605 return tuple(self._advisoryparams)
605
606
606 def addparam(self, name, value='', mandatory=True):
607 def addparam(self, name, value='', mandatory=True):
607 if self._generated is not None:
608 if self._generated is not None:
608 raise error.ReadOnlyPartError('part is being generated')
609 raise error.ReadOnlyPartError('part is being generated')
609 if name in self._seenparams:
610 if name in self._seenparams:
610 raise ValueError('duplicated params: %s' % name)
611 raise ValueError('duplicated params: %s' % name)
611 self._seenparams.add(name)
612 self._seenparams.add(name)
612 params = self._advisoryparams
613 params = self._advisoryparams
613 if mandatory:
614 if mandatory:
614 params = self._mandatoryparams
615 params = self._mandatoryparams
615 params.append((name, value))
616 params.append((name, value))
616
617
617 # methods used to generates the bundle2 stream
618 # methods used to generates the bundle2 stream
618 def getchunks(self):
619 def getchunks(self):
619 if self._generated is not None:
620 if self._generated is not None:
620 raise RuntimeError('part can only be consumed once')
621 raise RuntimeError('part can only be consumed once')
621 self._generated = False
622 self._generated = False
622 #### header
623 #### header
623 ## parttype
624 ## parttype
624 header = [_pack(_fparttypesize, len(self.type)),
625 header = [_pack(_fparttypesize, len(self.type)),
625 self.type, _pack(_fpartid, self.id),
626 self.type, _pack(_fpartid, self.id),
626 ]
627 ]
627 ## parameters
628 ## parameters
628 # count
629 # count
629 manpar = self.mandatoryparams
630 manpar = self.mandatoryparams
630 advpar = self.advisoryparams
631 advpar = self.advisoryparams
631 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
632 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
632 # size
633 # size
633 parsizes = []
634 parsizes = []
634 for key, value in manpar:
635 for key, value in manpar:
635 parsizes.append(len(key))
636 parsizes.append(len(key))
636 parsizes.append(len(value))
637 parsizes.append(len(value))
637 for key, value in advpar:
638 for key, value in advpar:
638 parsizes.append(len(key))
639 parsizes.append(len(key))
639 parsizes.append(len(value))
640 parsizes.append(len(value))
640 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
641 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
641 header.append(paramsizes)
642 header.append(paramsizes)
642 # key, value
643 # key, value
643 for key, value in manpar:
644 for key, value in manpar:
644 header.append(key)
645 header.append(key)
645 header.append(value)
646 header.append(value)
646 for key, value in advpar:
647 for key, value in advpar:
647 header.append(key)
648 header.append(key)
648 header.append(value)
649 header.append(value)
649 ## finalize header
650 ## finalize header
650 headerchunk = ''.join(header)
651 headerchunk = ''.join(header)
651 yield _pack(_fpartheadersize, len(headerchunk))
652 yield _pack(_fpartheadersize, len(headerchunk))
652 yield headerchunk
653 yield headerchunk
653 ## payload
654 ## payload
654 for chunk in self._payloadchunks():
655 for chunk in self._payloadchunks():
655 yield _pack(_fpayloadsize, len(chunk))
656 yield _pack(_fpayloadsize, len(chunk))
656 yield chunk
657 yield chunk
657 # end of payload
658 # end of payload
658 yield _pack(_fpayloadsize, 0)
659 yield _pack(_fpayloadsize, 0)
659 self._generated = True
660 self._generated = True
660
661
661 def _payloadchunks(self):
662 def _payloadchunks(self):
662 """yield chunks of a the part payload
663 """yield chunks of a the part payload
663
664
664 Exists to handle the different methods to provide data to a part."""
665 Exists to handle the different methods to provide data to a part."""
665 # we only support fixed size data now.
666 # we only support fixed size data now.
666 # This will be improved in the future.
667 # This will be improved in the future.
667 if util.safehasattr(self.data, 'next'):
668 if util.safehasattr(self.data, 'next'):
668 buff = util.chunkbuffer(self.data)
669 buff = util.chunkbuffer(self.data)
669 chunk = buff.read(preferedchunksize)
670 chunk = buff.read(preferedchunksize)
670 while chunk:
671 while chunk:
671 yield chunk
672 yield chunk
672 chunk = buff.read(preferedchunksize)
673 chunk = buff.read(preferedchunksize)
673 elif len(self.data):
674 elif len(self.data):
674 yield self.data
675 yield self.data
675
676
676 class unbundlepart(unpackermixin):
677 class unbundlepart(unpackermixin):
677 """a bundle part read from a bundle"""
678 """a bundle part read from a bundle"""
678
679
679 def __init__(self, ui, header, fp):
680 def __init__(self, ui, header, fp):
680 super(unbundlepart, self).__init__(fp)
681 super(unbundlepart, self).__init__(fp)
681 self.ui = ui
682 self.ui = ui
682 # unbundle state attr
683 # unbundle state attr
683 self._headerdata = header
684 self._headerdata = header
684 self._headeroffset = 0
685 self._headeroffset = 0
685 self._initialized = False
686 self._initialized = False
686 self.consumed = False
687 self.consumed = False
687 # part data
688 # part data
688 self.id = None
689 self.id = None
689 self.type = None
690 self.type = None
690 self.mandatoryparams = None
691 self.mandatoryparams = None
691 self.advisoryparams = None
692 self.advisoryparams = None
692 self.params = None
693 self.params = None
693 self.mandatorykeys = ()
694 self.mandatorykeys = ()
694 self._payloadstream = None
695 self._payloadstream = None
695 self._readheader()
696 self._readheader()
696
697
697 def _fromheader(self, size):
698 def _fromheader(self, size):
698 """return the next <size> byte from the header"""
699 """return the next <size> byte from the header"""
699 offset = self._headeroffset
700 offset = self._headeroffset
700 data = self._headerdata[offset:(offset + size)]
701 data = self._headerdata[offset:(offset + size)]
701 self._headeroffset = offset + size
702 self._headeroffset = offset + size
702 return data
703 return data
703
704
704 def _unpackheader(self, format):
705 def _unpackheader(self, format):
705 """read given format from header
706 """read given format from header
706
707
707 This automatically compute the size of the format to read."""
708 This automatically compute the size of the format to read."""
708 data = self._fromheader(struct.calcsize(format))
709 data = self._fromheader(struct.calcsize(format))
709 return _unpack(format, data)
710 return _unpack(format, data)
710
711
711 def _initparams(self, mandatoryparams, advisoryparams):
712 def _initparams(self, mandatoryparams, advisoryparams):
712 """internal function to setup all logic related parameters"""
713 """internal function to setup all logic related parameters"""
713 # make it read only to prevent people touching it by mistake.
714 # make it read only to prevent people touching it by mistake.
714 self.mandatoryparams = tuple(mandatoryparams)
715 self.mandatoryparams = tuple(mandatoryparams)
715 self.advisoryparams = tuple(advisoryparams)
716 self.advisoryparams = tuple(advisoryparams)
716 # user friendly UI
717 # user friendly UI
717 self.params = dict(self.mandatoryparams)
718 self.params = dict(self.mandatoryparams)
718 self.params.update(dict(self.advisoryparams))
719 self.params.update(dict(self.advisoryparams))
719 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
720 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
720
721
721 def _readheader(self):
722 def _readheader(self):
722 """read the header and setup the object"""
723 """read the header and setup the object"""
723 typesize = self._unpackheader(_fparttypesize)[0]
724 typesize = self._unpackheader(_fparttypesize)[0]
724 self.type = self._fromheader(typesize)
725 self.type = self._fromheader(typesize)
725 self.ui.debug('part type: "%s"\n' % self.type)
726 self.ui.debug('part type: "%s"\n' % self.type)
726 self.id = self._unpackheader(_fpartid)[0]
727 self.id = self._unpackheader(_fpartid)[0]
727 self.ui.debug('part id: "%s"\n' % self.id)
728 self.ui.debug('part id: "%s"\n' % self.id)
728 ## reading parameters
729 ## reading parameters
729 # param count
730 # param count
730 mancount, advcount = self._unpackheader(_fpartparamcount)
731 mancount, advcount = self._unpackheader(_fpartparamcount)
731 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
732 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
732 # param size
733 # param size
733 fparamsizes = _makefpartparamsizes(mancount + advcount)
734 fparamsizes = _makefpartparamsizes(mancount + advcount)
734 paramsizes = self._unpackheader(fparamsizes)
735 paramsizes = self._unpackheader(fparamsizes)
735 # make it a list of couple again
736 # make it a list of couple again
736 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
737 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
737 # split mandatory from advisory
738 # split mandatory from advisory
738 mansizes = paramsizes[:mancount]
739 mansizes = paramsizes[:mancount]
739 advsizes = paramsizes[mancount:]
740 advsizes = paramsizes[mancount:]
740 # retrive param value
741 # retrive param value
741 manparams = []
742 manparams = []
742 for key, value in mansizes:
743 for key, value in mansizes:
743 manparams.append((self._fromheader(key), self._fromheader(value)))
744 manparams.append((self._fromheader(key), self._fromheader(value)))
744 advparams = []
745 advparams = []
745 for key, value in advsizes:
746 for key, value in advsizes:
746 advparams.append((self._fromheader(key), self._fromheader(value)))
747 advparams.append((self._fromheader(key), self._fromheader(value)))
747 self._initparams(manparams, advparams)
748 self._initparams(manparams, advparams)
748 ## part payload
749 ## part payload
749 def payloadchunks():
750 def payloadchunks():
750 payloadsize = self._unpack(_fpayloadsize)[0]
751 payloadsize = self._unpack(_fpayloadsize)[0]
751 self.ui.debug('payload chunk size: %i\n' % payloadsize)
752 self.ui.debug('payload chunk size: %i\n' % payloadsize)
752 while payloadsize:
753 while payloadsize:
753 yield self._readexact(payloadsize)
754 yield self._readexact(payloadsize)
754 payloadsize = self._unpack(_fpayloadsize)[0]
755 payloadsize = self._unpack(_fpayloadsize)[0]
755 self.ui.debug('payload chunk size: %i\n' % payloadsize)
756 self.ui.debug('payload chunk size: %i\n' % payloadsize)
756 self._payloadstream = util.chunkbuffer(payloadchunks())
757 self._payloadstream = util.chunkbuffer(payloadchunks())
757 # we read the data, tell it
758 # we read the data, tell it
758 self._initialized = True
759 self._initialized = True
759
760
760 def read(self, size=None):
761 def read(self, size=None):
761 """read payload data"""
762 """read payload data"""
762 if not self._initialized:
763 if not self._initialized:
763 self._readheader()
764 self._readheader()
764 if size is None:
765 if size is None:
765 data = self._payloadstream.read()
766 data = self._payloadstream.read()
766 else:
767 else:
767 data = self._payloadstream.read(size)
768 data = self._payloadstream.read(size)
768 if size is None or len(data) < size:
769 if size is None or len(data) < size:
769 self.consumed = True
770 self.consumed = True
770 return data
771 return data
771
772
772 def bundle2caps(remote):
773 def bundle2caps(remote):
773 """return the bundlecapabilities of a peer as dict"""
774 """return the bundlecapabilities of a peer as dict"""
774 raw = remote.capable('bundle2-exp')
775 raw = remote.capable('bundle2-exp')
775 if not raw and raw != '':
776 if not raw and raw != '':
776 return {}
777 return {}
777 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
778 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
778 return decodecaps(capsblob)
779 return decodecaps(capsblob)
779
780
780 @parthandler('b2x:changegroup')
781 @parthandler('b2x:changegroup')
781 def handlechangegroup(op, inpart):
782 def handlechangegroup(op, inpart):
782 """apply a changegroup part on the repo
783 """apply a changegroup part on the repo
783
784
784 This is a very early implementation that will massive rework before being
785 This is a very early implementation that will massive rework before being
785 inflicted to any end-user.
786 inflicted to any end-user.
786 """
787 """
787 # Make sure we trigger a transaction creation
788 # Make sure we trigger a transaction creation
788 #
789 #
789 # The addchangegroup function will get a transaction object by itself, but
790 # The addchangegroup function will get a transaction object by itself, but
790 # we need to make sure we trigger the creation of a transaction object used
791 # we need to make sure we trigger the creation of a transaction object used
791 # for the whole processing scope.
792 # for the whole processing scope.
792 op.gettransaction()
793 op.gettransaction()
793 cg = changegroup.unbundle10(inpart, 'UN')
794 cg = changegroup.unbundle10(inpart, 'UN')
794 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
795 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
795 op.records.add('changegroup', {'return': ret})
796 op.records.add('changegroup', {'return': ret})
796 if op.reply is not None:
797 if op.reply is not None:
797 # This is definitly not the final form of this
798 # This is definitly not the final form of this
798 # return. But one need to start somewhere.
799 # return. But one need to start somewhere.
799 part = op.reply.newpart('b2x:reply:changegroup')
800 part = op.reply.newpart('b2x:reply:changegroup')
800 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
801 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
801 part.addparam('return', '%i' % ret, mandatory=False)
802 part.addparam('return', '%i' % ret, mandatory=False)
802 assert not inpart.read()
803 assert not inpart.read()
803
804
804 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
805 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
805 def handlechangegroup(op, inpart):
806 def handlechangegroup(op, inpart):
806 ret = int(inpart.params['return'])
807 ret = int(inpart.params['return'])
807 replyto = int(inpart.params['in-reply-to'])
808 replyto = int(inpart.params['in-reply-to'])
808 op.records.add('changegroup', {'return': ret}, replyto)
809 op.records.add('changegroup', {'return': ret}, replyto)
809
810
810 @parthandler('b2x:check:heads')
811 @parthandler('b2x:check:heads')
811 def handlechangegroup(op, inpart):
812 def handlechangegroup(op, inpart):
812 """check that head of the repo did not change
813 """check that head of the repo did not change
813
814
814 This is used to detect a push race when using unbundle.
815 This is used to detect a push race when using unbundle.
815 This replaces the "heads" argument of unbundle."""
816 This replaces the "heads" argument of unbundle."""
816 h = inpart.read(20)
817 h = inpart.read(20)
817 heads = []
818 heads = []
818 while len(h) == 20:
819 while len(h) == 20:
819 heads.append(h)
820 heads.append(h)
820 h = inpart.read(20)
821 h = inpart.read(20)
821 assert not h
822 assert not h
822 if heads != op.repo.heads():
823 if heads != op.repo.heads():
823 raise error.PushRaced('repository changed while pushing - '
824 raise error.PushRaced('repository changed while pushing - '
824 'please try again')
825 'please try again')
825
826
826 @parthandler('b2x:output')
827 @parthandler('b2x:output')
827 def handleoutput(op, inpart):
828 def handleoutput(op, inpart):
828 """forward output captured on the server to the client"""
829 """forward output captured on the server to the client"""
829 for line in inpart.read().splitlines():
830 for line in inpart.read().splitlines():
830 op.ui.write(('remote: %s\n' % line))
831 op.ui.write(('remote: %s\n' % line))
831
832
832 @parthandler('b2x:replycaps')
833 @parthandler('b2x:replycaps')
833 def handlereplycaps(op, inpart):
834 def handlereplycaps(op, inpart):
834 """Notify that a reply bundle should be created
835 """Notify that a reply bundle should be created
835
836
836 The payload contains the capabilities information for the reply"""
837 The payload contains the capabilities information for the reply"""
837 caps = decodecaps(inpart.read())
838 caps = decodecaps(inpart.read())
838 if op.reply is None:
839 if op.reply is None:
839 op.reply = bundle20(op.ui, caps)
840 op.reply = bundle20(op.ui, caps)
840
841
841 @parthandler('b2x:error:abort', ('message', 'hint'))
842 @parthandler('b2x:error:abort', ('message', 'hint'))
842 def handlereplycaps(op, inpart):
843 def handlereplycaps(op, inpart):
843 """Used to transmit abort error over the wire"""
844 """Used to transmit abort error over the wire"""
844 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
845 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
845
846
846 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
847 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
847 def handlereplycaps(op, inpart):
848 def handlereplycaps(op, inpart):
848 """Used to transmit unknown content error over the wire"""
849 """Used to transmit unknown content error over the wire"""
849 kwargs = {}
850 kwargs = {}
850 parttype = inpart.params.get('parttype')
851 parttype = inpart.params.get('parttype')
851 if parttype is not None:
852 if parttype is not None:
852 kwargs['parttype'] = parttype
853 kwargs['parttype'] = parttype
853 params = inpart.params.get('params')
854 params = inpart.params.get('params')
854 if params is not None:
855 if params is not None:
855 kwargs['params'] = params.split('\0')
856 kwargs['params'] = params.split('\0')
856
857
857 raise error.BundleValueError(**kwargs)
858 raise error.BundleValueError(**kwargs)
858
859
859 @parthandler('b2x:error:pushraced', ('message',))
860 @parthandler('b2x:error:pushraced', ('message',))
860 def handlereplycaps(op, inpart):
861 def handlereplycaps(op, inpart):
861 """Used to transmit push race error over the wire"""
862 """Used to transmit push race error over the wire"""
862 raise error.ResponseError(_('push failed:'), inpart.params['message'])
863 raise error.ResponseError(_('push failed:'), inpart.params['message'])
864
865 @parthandler('b2x:listkeys', ('namespace',))
866 def handlelistkeys(op, inpart):
867 """retrieve pushkey namespace content stored in a bundle2"""
868 namespace = inpart.params['namespace']
869 r = pushkey.decodekeys(inpart.read())
870 op.records.add('listkeys', (namespace, r))
General Comments 0
You need to be logged in to leave comments. Login now