##// END OF EJS Templates
bundle2: introduce an ``_initparams`` method...
Pierre-Yves David -
r21608:3cb96ca9 default
parent child Browse files
Show More
@@ -1,840 +1,844
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 Bundle processing
128 Bundle processing
129 ============================
129 ============================
130
130
131 Each part is processed in order using a "part handler". Handler are registered
131 Each part is processed in order using a "part handler". Handler are registered
132 for a certain part type.
132 for a certain part type.
133
133
134 The matching of a part to its handler is case insensitive. The case of the
134 The matching of a part to its handler is case insensitive. The case of the
135 part type is used to know if a part is mandatory or advisory. If the Part type
135 part type is used to know if a part is mandatory or advisory. If the Part type
136 contains any uppercase char it is considered mandatory. When no handler is
136 contains any uppercase char it is considered mandatory. When no handler is
137 known for a Mandatory part, the process is aborted and an exception is raised.
137 known for a Mandatory part, the process is aborted and an exception is raised.
138 If the part is advisory and no handler is known, the part is ignored. When the
138 If the part is advisory and no handler is known, the part is ignored. When the
139 process is aborted, the full bundle is still read from the stream to keep the
139 process is aborted, the full bundle is still read from the stream to keep the
140 channel usable. But none of the part read from an abort are processed. In the
140 channel usable. But none of the part read from an abort are processed. In the
141 future, dropping the stream may become an option for channel we do not care to
141 future, dropping the stream may become an option for channel we do not care to
142 preserve.
142 preserve.
143 """
143 """
144
144
145 import util
145 import util
146 import struct
146 import struct
147 import urllib
147 import urllib
148 import string
148 import string
149
149
150 import changegroup, error
150 import changegroup, error
151 from i18n import _
151 from i18n import _
152
152
153 _pack = struct.pack
153 _pack = struct.pack
154 _unpack = struct.unpack
154 _unpack = struct.unpack
155
155
156 _magicstring = 'HG2X'
156 _magicstring = 'HG2X'
157
157
158 _fstreamparamsize = '>H'
158 _fstreamparamsize = '>H'
159 _fpartheadersize = '>H'
159 _fpartheadersize = '>H'
160 _fparttypesize = '>B'
160 _fparttypesize = '>B'
161 _fpartid = '>I'
161 _fpartid = '>I'
162 _fpayloadsize = '>I'
162 _fpayloadsize = '>I'
163 _fpartparamcount = '>BB'
163 _fpartparamcount = '>BB'
164
164
165 preferedchunksize = 4096
165 preferedchunksize = 4096
166
166
167 def _makefpartparamsizes(nbparams):
167 def _makefpartparamsizes(nbparams):
168 """return a struct format to read part parameter sizes
168 """return a struct format to read part parameter sizes
169
169
170 The number parameters is variable so we need to build that format
170 The number parameters is variable so we need to build that format
171 dynamically.
171 dynamically.
172 """
172 """
173 return '>'+('BB'*nbparams)
173 return '>'+('BB'*nbparams)
174
174
175 class UnknownPartError(KeyError):
175 class UnknownPartError(KeyError):
176 """error raised when no handler is found for a Mandatory part"""
176 """error raised when no handler is found for a Mandatory part"""
177 pass
177 pass
178
178
179 class ReadOnlyPartError(RuntimeError):
179 class ReadOnlyPartError(RuntimeError):
180 """error raised when code tries to alter a part being generated"""
180 """error raised when code tries to alter a part being generated"""
181 pass
181 pass
182
182
183 parthandlermapping = {}
183 parthandlermapping = {}
184
184
185 def parthandler(parttype):
185 def parthandler(parttype):
186 """decorator that register a function as a bundle2 part handler
186 """decorator that register a function as a bundle2 part handler
187
187
188 eg::
188 eg::
189
189
190 @parthandler('myparttype')
190 @parthandler('myparttype')
191 def myparttypehandler(...):
191 def myparttypehandler(...):
192 '''process a part of type "my part".'''
192 '''process a part of type "my part".'''
193 ...
193 ...
194 """
194 """
195 def _decorator(func):
195 def _decorator(func):
196 lparttype = parttype.lower() # enforce lower case matching.
196 lparttype = parttype.lower() # enforce lower case matching.
197 assert lparttype not in parthandlermapping
197 assert lparttype not in parthandlermapping
198 parthandlermapping[lparttype] = func
198 parthandlermapping[lparttype] = func
199 return func
199 return func
200 return _decorator
200 return _decorator
201
201
202 class unbundlerecords(object):
202 class unbundlerecords(object):
203 """keep record of what happens during and unbundle
203 """keep record of what happens during and unbundle
204
204
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 category of record and obj is an arbitrary object.
206 category of record and obj is an arbitrary object.
207
207
208 `records['cat']` will return all entries of this category 'cat'.
208 `records['cat']` will return all entries of this category 'cat'.
209
209
210 Iterating on the object itself will yield `('category', obj)` tuples
210 Iterating on the object itself will yield `('category', obj)` tuples
211 for all entries.
211 for all entries.
212
212
213 All iterations happens in chronological order.
213 All iterations happens in chronological order.
214 """
214 """
215
215
216 def __init__(self):
216 def __init__(self):
217 self._categories = {}
217 self._categories = {}
218 self._sequences = []
218 self._sequences = []
219 self._replies = {}
219 self._replies = {}
220
220
221 def add(self, category, entry, inreplyto=None):
221 def add(self, category, entry, inreplyto=None):
222 """add a new record of a given category.
222 """add a new record of a given category.
223
223
224 The entry can then be retrieved in the list returned by
224 The entry can then be retrieved in the list returned by
225 self['category']."""
225 self['category']."""
226 self._categories.setdefault(category, []).append(entry)
226 self._categories.setdefault(category, []).append(entry)
227 self._sequences.append((category, entry))
227 self._sequences.append((category, entry))
228 if inreplyto is not None:
228 if inreplyto is not None:
229 self.getreplies(inreplyto).add(category, entry)
229 self.getreplies(inreplyto).add(category, entry)
230
230
231 def getreplies(self, partid):
231 def getreplies(self, partid):
232 """get the subrecords that replies to a specific part"""
232 """get the subrecords that replies to a specific part"""
233 return self._replies.setdefault(partid, unbundlerecords())
233 return self._replies.setdefault(partid, unbundlerecords())
234
234
235 def __getitem__(self, cat):
235 def __getitem__(self, cat):
236 return tuple(self._categories.get(cat, ()))
236 return tuple(self._categories.get(cat, ()))
237
237
238 def __iter__(self):
238 def __iter__(self):
239 return iter(self._sequences)
239 return iter(self._sequences)
240
240
241 def __len__(self):
241 def __len__(self):
242 return len(self._sequences)
242 return len(self._sequences)
243
243
244 def __nonzero__(self):
244 def __nonzero__(self):
245 return bool(self._sequences)
245 return bool(self._sequences)
246
246
247 class bundleoperation(object):
247 class bundleoperation(object):
248 """an object that represents a single bundling process
248 """an object that represents a single bundling process
249
249
250 Its purpose is to carry unbundle-related objects and states.
250 Its purpose is to carry unbundle-related objects and states.
251
251
252 A new object should be created at the beginning of each bundle processing.
252 A new object should be created at the beginning of each bundle processing.
253 The object is to be returned by the processing function.
253 The object is to be returned by the processing function.
254
254
255 The object has very little content now it will ultimately contain:
255 The object has very little content now it will ultimately contain:
256 * an access to the repo the bundle is applied to,
256 * an access to the repo the bundle is applied to,
257 * a ui object,
257 * a ui object,
258 * a way to retrieve a transaction to add changes to the repo,
258 * a way to retrieve a transaction to add changes to the repo,
259 * a way to record the result of processing each part,
259 * a way to record the result of processing each part,
260 * a way to construct a bundle response when applicable.
260 * a way to construct a bundle response when applicable.
261 """
261 """
262
262
263 def __init__(self, repo, transactiongetter):
263 def __init__(self, repo, transactiongetter):
264 self.repo = repo
264 self.repo = repo
265 self.ui = repo.ui
265 self.ui = repo.ui
266 self.records = unbundlerecords()
266 self.records = unbundlerecords()
267 self.gettransaction = transactiongetter
267 self.gettransaction = transactiongetter
268 self.reply = None
268 self.reply = None
269
269
270 class TransactionUnavailable(RuntimeError):
270 class TransactionUnavailable(RuntimeError):
271 pass
271 pass
272
272
273 def _notransaction():
273 def _notransaction():
274 """default method to get a transaction while processing a bundle
274 """default method to get a transaction while processing a bundle
275
275
276 Raise an exception to highlight the fact that no transaction was expected
276 Raise an exception to highlight the fact that no transaction was expected
277 to be created"""
277 to be created"""
278 raise TransactionUnavailable()
278 raise TransactionUnavailable()
279
279
280 def processbundle(repo, unbundler, transactiongetter=_notransaction):
280 def processbundle(repo, unbundler, transactiongetter=_notransaction):
281 """This function process a bundle, apply effect to/from a repo
281 """This function process a bundle, apply effect to/from a repo
282
282
283 It iterates over each part then searches for and uses the proper handling
283 It iterates over each part then searches for and uses the proper handling
284 code to process the part. Parts are processed in order.
284 code to process the part. Parts are processed in order.
285
285
286 This is very early version of this function that will be strongly reworked
286 This is very early version of this function that will be strongly reworked
287 before final usage.
287 before final usage.
288
288
289 Unknown Mandatory part will abort the process.
289 Unknown Mandatory part will abort the process.
290 """
290 """
291 op = bundleoperation(repo, transactiongetter)
291 op = bundleoperation(repo, transactiongetter)
292 # todo:
292 # todo:
293 # - replace this is a init function soon.
293 # - replace this is a init function soon.
294 # - exception catching
294 # - exception catching
295 unbundler.params
295 unbundler.params
296 iterparts = unbundler.iterparts()
296 iterparts = unbundler.iterparts()
297 part = None
297 part = None
298 try:
298 try:
299 for part in iterparts:
299 for part in iterparts:
300 parttype = part.type
300 parttype = part.type
301 # part key are matched lower case
301 # part key are matched lower case
302 key = parttype.lower()
302 key = parttype.lower()
303 try:
303 try:
304 handler = parthandlermapping[key]
304 handler = parthandlermapping[key]
305 op.ui.debug('found a handler for part %r\n' % parttype)
305 op.ui.debug('found a handler for part %r\n' % parttype)
306 except KeyError:
306 except KeyError:
307 if key != parttype: # mandatory parts
307 if key != parttype: # mandatory parts
308 # todo:
308 # todo:
309 # - use a more precise exception
309 # - use a more precise exception
310 raise UnknownPartError(key)
310 raise UnknownPartError(key)
311 op.ui.debug('ignoring unknown advisory part %r\n' % key)
311 op.ui.debug('ignoring unknown advisory part %r\n' % key)
312 # consuming the part
312 # consuming the part
313 part.read()
313 part.read()
314 continue
314 continue
315
315
316 # handler is called outside the above try block so that we don't
316 # handler is called outside the above try block so that we don't
317 # risk catching KeyErrors from anything other than the
317 # risk catching KeyErrors from anything other than the
318 # parthandlermapping lookup (any KeyError raised by handler()
318 # parthandlermapping lookup (any KeyError raised by handler()
319 # itself represents a defect of a different variety).
319 # itself represents a defect of a different variety).
320 output = None
320 output = None
321 if op.reply is not None:
321 if op.reply is not None:
322 op.ui.pushbuffer(error=True)
322 op.ui.pushbuffer(error=True)
323 output = ''
323 output = ''
324 try:
324 try:
325 handler(op, part)
325 handler(op, part)
326 finally:
326 finally:
327 if output is not None:
327 if output is not None:
328 output = op.ui.popbuffer()
328 output = op.ui.popbuffer()
329 if output:
329 if output:
330 outpart = op.reply.newpart('b2x:output', data=output)
330 outpart = op.reply.newpart('b2x:output', data=output)
331 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
331 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
332 part.read()
332 part.read()
333 except Exception, exc:
333 except Exception, exc:
334 if part is not None:
334 if part is not None:
335 # consume the bundle content
335 # consume the bundle content
336 part.read()
336 part.read()
337 for part in iterparts:
337 for part in iterparts:
338 # consume the bundle content
338 # consume the bundle content
339 part.read()
339 part.read()
340 # Small hack to let caller code distinguish exceptions from bundle2
340 # Small hack to let caller code distinguish exceptions from bundle2
341 # processing fron the ones from bundle1 processing. This is mostly
341 # processing fron the ones from bundle1 processing. This is mostly
342 # needed to handle different return codes to unbundle according to the
342 # needed to handle different return codes to unbundle according to the
343 # type of bundle. We should probably clean up or drop this return code
343 # type of bundle. We should probably clean up or drop this return code
344 # craziness in a future version.
344 # craziness in a future version.
345 exc.duringunbundle2 = True
345 exc.duringunbundle2 = True
346 raise
346 raise
347 return op
347 return op
348
348
349 def decodecaps(blob):
349 def decodecaps(blob):
350 """decode a bundle2 caps bytes blob into a dictionnary
350 """decode a bundle2 caps bytes blob into a dictionnary
351
351
352 The blob is a list of capabilities (one per line)
352 The blob is a list of capabilities (one per line)
353 Capabilities may have values using a line of the form::
353 Capabilities may have values using a line of the form::
354
354
355 capability=value1,value2,value3
355 capability=value1,value2,value3
356
356
357 The values are always a list."""
357 The values are always a list."""
358 caps = {}
358 caps = {}
359 for line in blob.splitlines():
359 for line in blob.splitlines():
360 if not line:
360 if not line:
361 continue
361 continue
362 if '=' not in line:
362 if '=' not in line:
363 key, vals = line, ()
363 key, vals = line, ()
364 else:
364 else:
365 key, vals = line.split('=', 1)
365 key, vals = line.split('=', 1)
366 vals = vals.split(',')
366 vals = vals.split(',')
367 key = urllib.unquote(key)
367 key = urllib.unquote(key)
368 vals = [urllib.unquote(v) for v in vals]
368 vals = [urllib.unquote(v) for v in vals]
369 caps[key] = vals
369 caps[key] = vals
370 return caps
370 return caps
371
371
372 def encodecaps(caps):
372 def encodecaps(caps):
373 """encode a bundle2 caps dictionary into a bytes blob"""
373 """encode a bundle2 caps dictionary into a bytes blob"""
374 chunks = []
374 chunks = []
375 for ca in sorted(caps):
375 for ca in sorted(caps):
376 vals = caps[ca]
376 vals = caps[ca]
377 ca = urllib.quote(ca)
377 ca = urllib.quote(ca)
378 vals = [urllib.quote(v) for v in vals]
378 vals = [urllib.quote(v) for v in vals]
379 if vals:
379 if vals:
380 ca = "%s=%s" % (ca, ','.join(vals))
380 ca = "%s=%s" % (ca, ','.join(vals))
381 chunks.append(ca)
381 chunks.append(ca)
382 return '\n'.join(chunks)
382 return '\n'.join(chunks)
383
383
384 class bundle20(object):
384 class bundle20(object):
385 """represent an outgoing bundle2 container
385 """represent an outgoing bundle2 container
386
386
387 Use the `addparam` method to add stream level parameter. and `newpart` to
387 Use the `addparam` method to add stream level parameter. and `newpart` to
388 populate it. Then call `getchunks` to retrieve all the binary chunks of
388 populate it. Then call `getchunks` to retrieve all the binary chunks of
389 data that compose the bundle2 container."""
389 data that compose the bundle2 container."""
390
390
391 def __init__(self, ui, capabilities=()):
391 def __init__(self, ui, capabilities=()):
392 self.ui = ui
392 self.ui = ui
393 self._params = []
393 self._params = []
394 self._parts = []
394 self._parts = []
395 self.capabilities = dict(capabilities)
395 self.capabilities = dict(capabilities)
396
396
397 # methods used to defines the bundle2 content
397 # methods used to defines the bundle2 content
398 def addparam(self, name, value=None):
398 def addparam(self, name, value=None):
399 """add a stream level parameter"""
399 """add a stream level parameter"""
400 if not name:
400 if not name:
401 raise ValueError('empty parameter name')
401 raise ValueError('empty parameter name')
402 if name[0] not in string.letters:
402 if name[0] not in string.letters:
403 raise ValueError('non letter first character: %r' % name)
403 raise ValueError('non letter first character: %r' % name)
404 self._params.append((name, value))
404 self._params.append((name, value))
405
405
406 def addpart(self, part):
406 def addpart(self, part):
407 """add a new part to the bundle2 container
407 """add a new part to the bundle2 container
408
408
409 Parts contains the actual applicative payload."""
409 Parts contains the actual applicative payload."""
410 assert part.id is None
410 assert part.id is None
411 part.id = len(self._parts) # very cheap counter
411 part.id = len(self._parts) # very cheap counter
412 self._parts.append(part)
412 self._parts.append(part)
413
413
414 def newpart(self, typeid, *args, **kwargs):
414 def newpart(self, typeid, *args, **kwargs):
415 """create a new part and add it to the containers
415 """create a new part and add it to the containers
416
416
417 As the part is directly added to the containers. For now, this means
417 As the part is directly added to the containers. For now, this means
418 that any failure to properly initialize the part after calling
418 that any failure to properly initialize the part after calling
419 ``newpart`` should result in a failure of the whole bundling process.
419 ``newpart`` should result in a failure of the whole bundling process.
420
420
421 You can still fall back to manually create and add if you need better
421 You can still fall back to manually create and add if you need better
422 control."""
422 control."""
423 part = bundlepart(typeid, *args, **kwargs)
423 part = bundlepart(typeid, *args, **kwargs)
424 self.addpart(part)
424 self.addpart(part)
425 return part
425 return part
426
426
427 # methods used to generate the bundle2 stream
427 # methods used to generate the bundle2 stream
428 def getchunks(self):
428 def getchunks(self):
429 self.ui.debug('start emission of %s stream\n' % _magicstring)
429 self.ui.debug('start emission of %s stream\n' % _magicstring)
430 yield _magicstring
430 yield _magicstring
431 param = self._paramchunk()
431 param = self._paramchunk()
432 self.ui.debug('bundle parameter: %s\n' % param)
432 self.ui.debug('bundle parameter: %s\n' % param)
433 yield _pack(_fstreamparamsize, len(param))
433 yield _pack(_fstreamparamsize, len(param))
434 if param:
434 if param:
435 yield param
435 yield param
436
436
437 self.ui.debug('start of parts\n')
437 self.ui.debug('start of parts\n')
438 for part in self._parts:
438 for part in self._parts:
439 self.ui.debug('bundle part: "%s"\n' % part.type)
439 self.ui.debug('bundle part: "%s"\n' % part.type)
440 for chunk in part.getchunks():
440 for chunk in part.getchunks():
441 yield chunk
441 yield chunk
442 self.ui.debug('end of bundle\n')
442 self.ui.debug('end of bundle\n')
443 yield '\0\0'
443 yield '\0\0'
444
444
445 def _paramchunk(self):
445 def _paramchunk(self):
446 """return a encoded version of all stream parameters"""
446 """return a encoded version of all stream parameters"""
447 blocks = []
447 blocks = []
448 for par, value in self._params:
448 for par, value in self._params:
449 par = urllib.quote(par)
449 par = urllib.quote(par)
450 if value is not None:
450 if value is not None:
451 value = urllib.quote(value)
451 value = urllib.quote(value)
452 par = '%s=%s' % (par, value)
452 par = '%s=%s' % (par, value)
453 blocks.append(par)
453 blocks.append(par)
454 return ' '.join(blocks)
454 return ' '.join(blocks)
455
455
456 class unpackermixin(object):
456 class unpackermixin(object):
457 """A mixin to extract bytes and struct data from a stream"""
457 """A mixin to extract bytes and struct data from a stream"""
458
458
459 def __init__(self, fp):
459 def __init__(self, fp):
460 self._fp = fp
460 self._fp = fp
461
461
462 def _unpack(self, format):
462 def _unpack(self, format):
463 """unpack this struct format from the stream"""
463 """unpack this struct format from the stream"""
464 data = self._readexact(struct.calcsize(format))
464 data = self._readexact(struct.calcsize(format))
465 return _unpack(format, data)
465 return _unpack(format, data)
466
466
467 def _readexact(self, size):
467 def _readexact(self, size):
468 """read exactly <size> bytes from the stream"""
468 """read exactly <size> bytes from the stream"""
469 return changegroup.readexactly(self._fp, size)
469 return changegroup.readexactly(self._fp, size)
470
470
471
471
472 class unbundle20(unpackermixin):
472 class unbundle20(unpackermixin):
473 """interpret a bundle2 stream
473 """interpret a bundle2 stream
474
474
475 This class is fed with a binary stream and yields parts through its
475 This class is fed with a binary stream and yields parts through its
476 `iterparts` methods."""
476 `iterparts` methods."""
477
477
478 def __init__(self, ui, fp, header=None):
478 def __init__(self, ui, fp, header=None):
479 """If header is specified, we do not read it out of the stream."""
479 """If header is specified, we do not read it out of the stream."""
480 self.ui = ui
480 self.ui = ui
481 super(unbundle20, self).__init__(fp)
481 super(unbundle20, self).__init__(fp)
482 if header is None:
482 if header is None:
483 header = self._readexact(4)
483 header = self._readexact(4)
484 magic, version = header[0:2], header[2:4]
484 magic, version = header[0:2], header[2:4]
485 if magic != 'HG':
485 if magic != 'HG':
486 raise util.Abort(_('not a Mercurial bundle'))
486 raise util.Abort(_('not a Mercurial bundle'))
487 if version != '2X':
487 if version != '2X':
488 raise util.Abort(_('unknown bundle version %s') % version)
488 raise util.Abort(_('unknown bundle version %s') % version)
489 self.ui.debug('start processing of %s stream\n' % header)
489 self.ui.debug('start processing of %s stream\n' % header)
490
490
491 @util.propertycache
491 @util.propertycache
492 def params(self):
492 def params(self):
493 """dictionary of stream level parameters"""
493 """dictionary of stream level parameters"""
494 self.ui.debug('reading bundle2 stream parameters\n')
494 self.ui.debug('reading bundle2 stream parameters\n')
495 params = {}
495 params = {}
496 paramssize = self._unpack(_fstreamparamsize)[0]
496 paramssize = self._unpack(_fstreamparamsize)[0]
497 if paramssize:
497 if paramssize:
498 for p in self._readexact(paramssize).split(' '):
498 for p in self._readexact(paramssize).split(' '):
499 p = p.split('=', 1)
499 p = p.split('=', 1)
500 p = [urllib.unquote(i) for i in p]
500 p = [urllib.unquote(i) for i in p]
501 if len(p) < 2:
501 if len(p) < 2:
502 p.append(None)
502 p.append(None)
503 self._processparam(*p)
503 self._processparam(*p)
504 params[p[0]] = p[1]
504 params[p[0]] = p[1]
505 return params
505 return params
506
506
507 def _processparam(self, name, value):
507 def _processparam(self, name, value):
508 """process a parameter, applying its effect if needed
508 """process a parameter, applying its effect if needed
509
509
510 Parameter starting with a lower case letter are advisory and will be
510 Parameter starting with a lower case letter are advisory and will be
511 ignored when unknown. Those starting with an upper case letter are
511 ignored when unknown. Those starting with an upper case letter are
512 mandatory and will this function will raise a KeyError when unknown.
512 mandatory and will this function will raise a KeyError when unknown.
513
513
514 Note: no option are currently supported. Any input will be either
514 Note: no option are currently supported. Any input will be either
515 ignored or failing.
515 ignored or failing.
516 """
516 """
517 if not name:
517 if not name:
518 raise ValueError('empty parameter name')
518 raise ValueError('empty parameter name')
519 if name[0] not in string.letters:
519 if name[0] not in string.letters:
520 raise ValueError('non letter first character: %r' % name)
520 raise ValueError('non letter first character: %r' % name)
521 # Some logic will be later added here to try to process the option for
521 # Some logic will be later added here to try to process the option for
522 # a dict of known parameter.
522 # a dict of known parameter.
523 if name[0].islower():
523 if name[0].islower():
524 self.ui.debug("ignoring unknown parameter %r\n" % name)
524 self.ui.debug("ignoring unknown parameter %r\n" % name)
525 else:
525 else:
526 raise KeyError(name)
526 raise KeyError(name)
527
527
528
528
529 def iterparts(self):
529 def iterparts(self):
530 """yield all parts contained in the stream"""
530 """yield all parts contained in the stream"""
531 # make sure param have been loaded
531 # make sure param have been loaded
532 self.params
532 self.params
533 self.ui.debug('start extraction of bundle2 parts\n')
533 self.ui.debug('start extraction of bundle2 parts\n')
534 headerblock = self._readpartheader()
534 headerblock = self._readpartheader()
535 while headerblock is not None:
535 while headerblock is not None:
536 part = unbundlepart(self.ui, headerblock, self._fp)
536 part = unbundlepart(self.ui, headerblock, self._fp)
537 yield part
537 yield part
538 headerblock = self._readpartheader()
538 headerblock = self._readpartheader()
539 self.ui.debug('end of bundle2 stream\n')
539 self.ui.debug('end of bundle2 stream\n')
540
540
541 def _readpartheader(self):
541 def _readpartheader(self):
542 """reads a part header size and return the bytes blob
542 """reads a part header size and return the bytes blob
543
543
544 returns None if empty"""
544 returns None if empty"""
545 headersize = self._unpack(_fpartheadersize)[0]
545 headersize = self._unpack(_fpartheadersize)[0]
546 self.ui.debug('part header size: %i\n' % headersize)
546 self.ui.debug('part header size: %i\n' % headersize)
547 if headersize:
547 if headersize:
548 return self._readexact(headersize)
548 return self._readexact(headersize)
549 return None
549 return None
550
550
551
551
552 class bundlepart(object):
552 class bundlepart(object):
553 """A bundle2 part contains application level payload
553 """A bundle2 part contains application level payload
554
554
555 The part `type` is used to route the part to the application level
555 The part `type` is used to route the part to the application level
556 handler.
556 handler.
557
557
558 The part payload is contained in ``part.data``. It could be raw bytes or a
558 The part payload is contained in ``part.data``. It could be raw bytes or a
559 generator of byte chunks.
559 generator of byte chunks.
560
560
561 You can add parameters to the part using the ``addparam`` method.
561 You can add parameters to the part using the ``addparam`` method.
562 Parameters can be either mandatory (default) or advisory. Remote side
562 Parameters can be either mandatory (default) or advisory. Remote side
563 should be able to safely ignore the advisory ones.
563 should be able to safely ignore the advisory ones.
564
564
565 Both data and parameters cannot be modified after the generation has begun.
565 Both data and parameters cannot be modified after the generation has begun.
566 """
566 """
567
567
568 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
568 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
569 data=''):
569 data=''):
570 self.id = None
570 self.id = None
571 self.type = parttype
571 self.type = parttype
572 self._data = data
572 self._data = data
573 self._mandatoryparams = list(mandatoryparams)
573 self._mandatoryparams = list(mandatoryparams)
574 self._advisoryparams = list(advisoryparams)
574 self._advisoryparams = list(advisoryparams)
575 # checking for duplicated entries
575 # checking for duplicated entries
576 self._seenparams = set()
576 self._seenparams = set()
577 for pname, __ in self._mandatoryparams + self._advisoryparams:
577 for pname, __ in self._mandatoryparams + self._advisoryparams:
578 if pname in self._seenparams:
578 if pname in self._seenparams:
579 raise RuntimeError('duplicated params: %s' % pname)
579 raise RuntimeError('duplicated params: %s' % pname)
580 self._seenparams.add(pname)
580 self._seenparams.add(pname)
581 # status of the part's generation:
581 # status of the part's generation:
582 # - None: not started,
582 # - None: not started,
583 # - False: currently generated,
583 # - False: currently generated,
584 # - True: generation done.
584 # - True: generation done.
585 self._generated = None
585 self._generated = None
586
586
587 # methods used to defines the part content
587 # methods used to defines the part content
588 def __setdata(self, data):
588 def __setdata(self, data):
589 if self._generated is not None:
589 if self._generated is not None:
590 raise ReadOnlyPartError('part is being generated')
590 raise ReadOnlyPartError('part is being generated')
591 self._data = data
591 self._data = data
592 def __getdata(self):
592 def __getdata(self):
593 return self._data
593 return self._data
594 data = property(__getdata, __setdata)
594 data = property(__getdata, __setdata)
595
595
596 @property
596 @property
597 def mandatoryparams(self):
597 def mandatoryparams(self):
598 # make it an immutable tuple to force people through ``addparam``
598 # make it an immutable tuple to force people through ``addparam``
599 return tuple(self._mandatoryparams)
599 return tuple(self._mandatoryparams)
600
600
601 @property
601 @property
602 def advisoryparams(self):
602 def advisoryparams(self):
603 # make it an immutable tuple to force people through ``addparam``
603 # make it an immutable tuple to force people through ``addparam``
604 return tuple(self._advisoryparams)
604 return tuple(self._advisoryparams)
605
605
606 def addparam(self, name, value='', mandatory=True):
606 def addparam(self, name, value='', mandatory=True):
607 if self._generated is not None:
607 if self._generated is not None:
608 raise ReadOnlyPartError('part is being generated')
608 raise ReadOnlyPartError('part is being generated')
609 if name in self._seenparams:
609 if name in self._seenparams:
610 raise ValueError('duplicated params: %s' % name)
610 raise ValueError('duplicated params: %s' % name)
611 self._seenparams.add(name)
611 self._seenparams.add(name)
612 params = self._advisoryparams
612 params = self._advisoryparams
613 if mandatory:
613 if mandatory:
614 params = self._mandatoryparams
614 params = self._mandatoryparams
615 params.append((name, value))
615 params.append((name, value))
616
616
617 # methods used to generates the bundle2 stream
617 # methods used to generates the bundle2 stream
618 def getchunks(self):
618 def getchunks(self):
619 if self._generated is not None:
619 if self._generated is not None:
620 raise RuntimeError('part can only be consumed once')
620 raise RuntimeError('part can only be consumed once')
621 self._generated = False
621 self._generated = False
622 #### header
622 #### header
623 ## parttype
623 ## parttype
624 header = [_pack(_fparttypesize, len(self.type)),
624 header = [_pack(_fparttypesize, len(self.type)),
625 self.type, _pack(_fpartid, self.id),
625 self.type, _pack(_fpartid, self.id),
626 ]
626 ]
627 ## parameters
627 ## parameters
628 # count
628 # count
629 manpar = self.mandatoryparams
629 manpar = self.mandatoryparams
630 advpar = self.advisoryparams
630 advpar = self.advisoryparams
631 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
631 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
632 # size
632 # size
633 parsizes = []
633 parsizes = []
634 for key, value in manpar:
634 for key, value in manpar:
635 parsizes.append(len(key))
635 parsizes.append(len(key))
636 parsizes.append(len(value))
636 parsizes.append(len(value))
637 for key, value in advpar:
637 for key, value in advpar:
638 parsizes.append(len(key))
638 parsizes.append(len(key))
639 parsizes.append(len(value))
639 parsizes.append(len(value))
640 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
640 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
641 header.append(paramsizes)
641 header.append(paramsizes)
642 # key, value
642 # key, value
643 for key, value in manpar:
643 for key, value in manpar:
644 header.append(key)
644 header.append(key)
645 header.append(value)
645 header.append(value)
646 for key, value in advpar:
646 for key, value in advpar:
647 header.append(key)
647 header.append(key)
648 header.append(value)
648 header.append(value)
649 ## finalize header
649 ## finalize header
650 headerchunk = ''.join(header)
650 headerchunk = ''.join(header)
651 yield _pack(_fpartheadersize, len(headerchunk))
651 yield _pack(_fpartheadersize, len(headerchunk))
652 yield headerchunk
652 yield headerchunk
653 ## payload
653 ## payload
654 for chunk in self._payloadchunks():
654 for chunk in self._payloadchunks():
655 yield _pack(_fpayloadsize, len(chunk))
655 yield _pack(_fpayloadsize, len(chunk))
656 yield chunk
656 yield chunk
657 # end of payload
657 # end of payload
658 yield _pack(_fpayloadsize, 0)
658 yield _pack(_fpayloadsize, 0)
659 self._generated = True
659 self._generated = True
660
660
661 def _payloadchunks(self):
661 def _payloadchunks(self):
662 """yield chunks of a the part payload
662 """yield chunks of a the part payload
663
663
664 Exists to handle the different methods to provide data to a part."""
664 Exists to handle the different methods to provide data to a part."""
665 # we only support fixed size data now.
665 # we only support fixed size data now.
666 # This will be improved in the future.
666 # This will be improved in the future.
667 if util.safehasattr(self.data, 'next'):
667 if util.safehasattr(self.data, 'next'):
668 buff = util.chunkbuffer(self.data)
668 buff = util.chunkbuffer(self.data)
669 chunk = buff.read(preferedchunksize)
669 chunk = buff.read(preferedchunksize)
670 while chunk:
670 while chunk:
671 yield chunk
671 yield chunk
672 chunk = buff.read(preferedchunksize)
672 chunk = buff.read(preferedchunksize)
673 elif len(self.data):
673 elif len(self.data):
674 yield self.data
674 yield self.data
675
675
676 class unbundlepart(unpackermixin):
676 class unbundlepart(unpackermixin):
677 """a bundle part read from a bundle"""
677 """a bundle part read from a bundle"""
678
678
679 def __init__(self, ui, header, fp):
679 def __init__(self, ui, header, fp):
680 super(unbundlepart, self).__init__(fp)
680 super(unbundlepart, self).__init__(fp)
681 self.ui = ui
681 self.ui = ui
682 # unbundle state attr
682 # unbundle state attr
683 self._headerdata = header
683 self._headerdata = header
684 self._headeroffset = 0
684 self._headeroffset = 0
685 self._initialized = False
685 self._initialized = False
686 self.consumed = False
686 self.consumed = False
687 # part data
687 # part data
688 self.id = None
688 self.id = None
689 self.type = None
689 self.type = None
690 self.mandatoryparams = None
690 self.mandatoryparams = None
691 self.advisoryparams = None
691 self.advisoryparams = None
692 self._payloadstream = None
692 self._payloadstream = None
693 self._readheader()
693 self._readheader()
694
694
695 def _fromheader(self, size):
695 def _fromheader(self, size):
696 """return the next <size> byte from the header"""
696 """return the next <size> byte from the header"""
697 offset = self._headeroffset
697 offset = self._headeroffset
698 data = self._headerdata[offset:(offset + size)]
698 data = self._headerdata[offset:(offset + size)]
699 self._headeroffset = offset + size
699 self._headeroffset = offset + size
700 return data
700 return data
701
701
702 def _unpackheader(self, format):
702 def _unpackheader(self, format):
703 """read given format from header
703 """read given format from header
704
704
705 This automatically compute the size of the format to read."""
705 This automatically compute the size of the format to read."""
706 data = self._fromheader(struct.calcsize(format))
706 data = self._fromheader(struct.calcsize(format))
707 return _unpack(format, data)
707 return _unpack(format, data)
708
708
709 def _initparams(self, mandatoryparams, advisoryparams):
710 """internal function to setup all logic related parameters"""
711 self.mandatoryparams = mandatoryparams
712 self.advisoryparams = advisoryparams
713
709 def _readheader(self):
714 def _readheader(self):
710 """read the header and setup the object"""
715 """read the header and setup the object"""
711 typesize = self._unpackheader(_fparttypesize)[0]
716 typesize = self._unpackheader(_fparttypesize)[0]
712 self.type = self._fromheader(typesize)
717 self.type = self._fromheader(typesize)
713 self.ui.debug('part type: "%s"\n' % self.type)
718 self.ui.debug('part type: "%s"\n' % self.type)
714 self.id = self._unpackheader(_fpartid)[0]
719 self.id = self._unpackheader(_fpartid)[0]
715 self.ui.debug('part id: "%s"\n' % self.id)
720 self.ui.debug('part id: "%s"\n' % self.id)
716 ## reading parameters
721 ## reading parameters
717 # param count
722 # param count
718 mancount, advcount = self._unpackheader(_fpartparamcount)
723 mancount, advcount = self._unpackheader(_fpartparamcount)
719 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
724 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
720 # param size
725 # param size
721 fparamsizes = _makefpartparamsizes(mancount + advcount)
726 fparamsizes = _makefpartparamsizes(mancount + advcount)
722 paramsizes = self._unpackheader(fparamsizes)
727 paramsizes = self._unpackheader(fparamsizes)
723 # make it a list of couple again
728 # make it a list of couple again
724 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
729 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
725 # split mandatory from advisory
730 # split mandatory from advisory
726 mansizes = paramsizes[:mancount]
731 mansizes = paramsizes[:mancount]
727 advsizes = paramsizes[mancount:]
732 advsizes = paramsizes[mancount:]
728 # retrive param value
733 # retrive param value
729 manparams = []
734 manparams = []
730 for key, value in mansizes:
735 for key, value in mansizes:
731 manparams.append((self._fromheader(key), self._fromheader(value)))
736 manparams.append((self._fromheader(key), self._fromheader(value)))
732 advparams = []
737 advparams = []
733 for key, value in advsizes:
738 for key, value in advsizes:
734 advparams.append((self._fromheader(key), self._fromheader(value)))
739 advparams.append((self._fromheader(key), self._fromheader(value)))
735 self.mandatoryparams = manparams
740 self._initparams(manparams, advparams)
736 self.advisoryparams = advparams
737 ## part payload
741 ## part payload
738 def payloadchunks():
742 def payloadchunks():
739 payloadsize = self._unpack(_fpayloadsize)[0]
743 payloadsize = self._unpack(_fpayloadsize)[0]
740 self.ui.debug('payload chunk size: %i\n' % payloadsize)
744 self.ui.debug('payload chunk size: %i\n' % payloadsize)
741 while payloadsize:
745 while payloadsize:
742 yield self._readexact(payloadsize)
746 yield self._readexact(payloadsize)
743 payloadsize = self._unpack(_fpayloadsize)[0]
747 payloadsize = self._unpack(_fpayloadsize)[0]
744 self.ui.debug('payload chunk size: %i\n' % payloadsize)
748 self.ui.debug('payload chunk size: %i\n' % payloadsize)
745 self._payloadstream = util.chunkbuffer(payloadchunks())
749 self._payloadstream = util.chunkbuffer(payloadchunks())
746 # we read the data, tell it
750 # we read the data, tell it
747 self._initialized = True
751 self._initialized = True
748
752
749 def read(self, size=None):
753 def read(self, size=None):
750 """read payload data"""
754 """read payload data"""
751 if not self._initialized:
755 if not self._initialized:
752 self._readheader()
756 self._readheader()
753 if size is None:
757 if size is None:
754 data = self._payloadstream.read()
758 data = self._payloadstream.read()
755 else:
759 else:
756 data = self._payloadstream.read(size)
760 data = self._payloadstream.read(size)
757 if size is None or len(data) < size:
761 if size is None or len(data) < size:
758 self.consumed = True
762 self.consumed = True
759 return data
763 return data
760
764
761
765
762 @parthandler('b2x:changegroup')
766 @parthandler('b2x:changegroup')
763 def handlechangegroup(op, inpart):
767 def handlechangegroup(op, inpart):
764 """apply a changegroup part on the repo
768 """apply a changegroup part on the repo
765
769
766 This is a very early implementation that will massive rework before being
770 This is a very early implementation that will massive rework before being
767 inflicted to any end-user.
771 inflicted to any end-user.
768 """
772 """
769 # Make sure we trigger a transaction creation
773 # Make sure we trigger a transaction creation
770 #
774 #
771 # The addchangegroup function will get a transaction object by itself, but
775 # The addchangegroup function will get a transaction object by itself, but
772 # we need to make sure we trigger the creation of a transaction object used
776 # we need to make sure we trigger the creation of a transaction object used
773 # for the whole processing scope.
777 # for the whole processing scope.
774 op.gettransaction()
778 op.gettransaction()
775 cg = changegroup.unbundle10(inpart, 'UN')
779 cg = changegroup.unbundle10(inpart, 'UN')
776 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
780 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
777 op.records.add('changegroup', {'return': ret})
781 op.records.add('changegroup', {'return': ret})
778 if op.reply is not None:
782 if op.reply is not None:
779 # This is definitly not the final form of this
783 # This is definitly not the final form of this
780 # return. But one need to start somewhere.
784 # return. But one need to start somewhere.
781 part = op.reply.newpart('b2x:reply:changegroup')
785 part = op.reply.newpart('b2x:reply:changegroup')
782 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
786 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
783 part.addparam('return', '%i' % ret, mandatory=False)
787 part.addparam('return', '%i' % ret, mandatory=False)
784 assert not inpart.read()
788 assert not inpart.read()
785
789
786 @parthandler('b2x:reply:changegroup')
790 @parthandler('b2x:reply:changegroup')
787 def handlechangegroup(op, inpart):
791 def handlechangegroup(op, inpart):
788 p = dict(inpart.advisoryparams)
792 p = dict(inpart.advisoryparams)
789 ret = int(p['return'])
793 ret = int(p['return'])
790 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
794 op.records.add('changegroup', {'return': ret}, int(p['in-reply-to']))
791
795
792 @parthandler('b2x:check:heads')
796 @parthandler('b2x:check:heads')
793 def handlechangegroup(op, inpart):
797 def handlechangegroup(op, inpart):
794 """check that head of the repo did not change
798 """check that head of the repo did not change
795
799
796 This is used to detect a push race when using unbundle.
800 This is used to detect a push race when using unbundle.
797 This replaces the "heads" argument of unbundle."""
801 This replaces the "heads" argument of unbundle."""
798 h = inpart.read(20)
802 h = inpart.read(20)
799 heads = []
803 heads = []
800 while len(h) == 20:
804 while len(h) == 20:
801 heads.append(h)
805 heads.append(h)
802 h = inpart.read(20)
806 h = inpart.read(20)
803 assert not h
807 assert not h
804 if heads != op.repo.heads():
808 if heads != op.repo.heads():
805 raise error.PushRaced('repository changed while pushing - '
809 raise error.PushRaced('repository changed while pushing - '
806 'please try again')
810 'please try again')
807
811
808 @parthandler('b2x:output')
812 @parthandler('b2x:output')
809 def handleoutput(op, inpart):
813 def handleoutput(op, inpart):
810 """forward output captured on the server to the client"""
814 """forward output captured on the server to the client"""
811 for line in inpart.read().splitlines():
815 for line in inpart.read().splitlines():
812 op.ui.write(('remote: %s\n' % line))
816 op.ui.write(('remote: %s\n' % line))
813
817
814 @parthandler('b2x:replycaps')
818 @parthandler('b2x:replycaps')
815 def handlereplycaps(op, inpart):
819 def handlereplycaps(op, inpart):
816 """Notify that a reply bundle should be created
820 """Notify that a reply bundle should be created
817
821
818 The payload contains the capabilities information for the reply"""
822 The payload contains the capabilities information for the reply"""
819 caps = decodecaps(inpart.read())
823 caps = decodecaps(inpart.read())
820 if op.reply is None:
824 if op.reply is None:
821 op.reply = bundle20(op.ui, caps)
825 op.reply = bundle20(op.ui, caps)
822
826
823 @parthandler('b2x:error:abort')
827 @parthandler('b2x:error:abort')
824 def handlereplycaps(op, inpart):
828 def handlereplycaps(op, inpart):
825 """Used to transmit abort error over the wire"""
829 """Used to transmit abort error over the wire"""
826 manargs = dict(inpart.mandatoryparams)
830 manargs = dict(inpart.mandatoryparams)
827 advargs = dict(inpart.advisoryparams)
831 advargs = dict(inpart.advisoryparams)
828 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
832 raise util.Abort(manargs['message'], hint=advargs.get('hint'))
829
833
830 @parthandler('b2x:error:unknownpart')
834 @parthandler('b2x:error:unknownpart')
831 def handlereplycaps(op, inpart):
835 def handlereplycaps(op, inpart):
832 """Used to transmit unknown part error over the wire"""
836 """Used to transmit unknown part error over the wire"""
833 manargs = dict(inpart.mandatoryparams)
837 manargs = dict(inpart.mandatoryparams)
834 raise UnknownPartError(manargs['parttype'])
838 raise UnknownPartError(manargs['parttype'])
835
839
836 @parthandler('b2x:error:pushraced')
840 @parthandler('b2x:error:pushraced')
837 def handlereplycaps(op, inpart):
841 def handlereplycaps(op, inpart):
838 """Used to transmit push race error over the wire"""
842 """Used to transmit push race error over the wire"""
839 manargs = dict(inpart.mandatoryparams)
843 manargs = dict(inpart.mandatoryparams)
840 raise error.ResponseError(_('push failed:'), manargs['message'])
844 raise error.ResponseError(_('push failed:'), manargs['message'])
General Comments 0
You need to be logged in to leave comments. Login now