##// END OF EJS Templates
bundle2: add a interrupt mechanism...
Pierre-Yves David -
r23066:ad144882 stable
parent child Browse files
Show More
@@ -1,1042 +1,1102
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import util
148 import util
149 import struct
149 import struct
150 import urllib
150 import urllib
151 import string
151 import string
152 import obsolete
152 import obsolete
153 import pushkey
153 import pushkey
154 import url
154 import url
155
155
156 import changegroup, error
156 import changegroup, error
157 from i18n import _
157 from i18n import _
158
158
159 _pack = struct.pack
159 _pack = struct.pack
160 _unpack = struct.unpack
160 _unpack = struct.unpack
161
161
162 _magicstring = 'HG2Y'
162 _magicstring = 'HG2Y'
163
163
164 _fstreamparamsize = '>i'
164 _fstreamparamsize = '>i'
165 _fpartheadersize = '>i'
165 _fpartheadersize = '>i'
166 _fparttypesize = '>B'
166 _fparttypesize = '>B'
167 _fpartid = '>I'
167 _fpartid = '>I'
168 _fpayloadsize = '>i'
168 _fpayloadsize = '>i'
169 _fpartparamcount = '>BB'
169 _fpartparamcount = '>BB'
170
170
171 preferedchunksize = 4096
171 preferedchunksize = 4096
172
172
173 def _makefpartparamsizes(nbparams):
173 def _makefpartparamsizes(nbparams):
174 """return a struct format to read part parameter sizes
174 """return a struct format to read part parameter sizes
175
175
176 The number parameters is variable so we need to build that format
176 The number parameters is variable so we need to build that format
177 dynamically.
177 dynamically.
178 """
178 """
179 return '>'+('BB'*nbparams)
179 return '>'+('BB'*nbparams)
180
180
181 parthandlermapping = {}
181 parthandlermapping = {}
182
182
183 def parthandler(parttype, params=()):
183 def parthandler(parttype, params=()):
184 """decorator that register a function as a bundle2 part handler
184 """decorator that register a function as a bundle2 part handler
185
185
186 eg::
186 eg::
187
187
188 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
188 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
189 def myparttypehandler(...):
189 def myparttypehandler(...):
190 '''process a part of type "my part".'''
190 '''process a part of type "my part".'''
191 ...
191 ...
192 """
192 """
193 def _decorator(func):
193 def _decorator(func):
194 lparttype = parttype.lower() # enforce lower case matching.
194 lparttype = parttype.lower() # enforce lower case matching.
195 assert lparttype not in parthandlermapping
195 assert lparttype not in parthandlermapping
196 parthandlermapping[lparttype] = func
196 parthandlermapping[lparttype] = func
197 func.params = frozenset(params)
197 func.params = frozenset(params)
198 return func
198 return func
199 return _decorator
199 return _decorator
200
200
201 class unbundlerecords(object):
201 class unbundlerecords(object):
202 """keep record of what happens during and unbundle
202 """keep record of what happens during and unbundle
203
203
204 New records are added using `records.add('cat', obj)`. Where 'cat' is a
204 New records are added using `records.add('cat', obj)`. Where 'cat' is a
205 category of record and obj is an arbitrary object.
205 category of record and obj is an arbitrary object.
206
206
207 `records['cat']` will return all entries of this category 'cat'.
207 `records['cat']` will return all entries of this category 'cat'.
208
208
209 Iterating on the object itself will yield `('category', obj)` tuples
209 Iterating on the object itself will yield `('category', obj)` tuples
210 for all entries.
210 for all entries.
211
211
212 All iterations happens in chronological order.
212 All iterations happens in chronological order.
213 """
213 """
214
214
215 def __init__(self):
215 def __init__(self):
216 self._categories = {}
216 self._categories = {}
217 self._sequences = []
217 self._sequences = []
218 self._replies = {}
218 self._replies = {}
219
219
220 def add(self, category, entry, inreplyto=None):
220 def add(self, category, entry, inreplyto=None):
221 """add a new record of a given category.
221 """add a new record of a given category.
222
222
223 The entry can then be retrieved in the list returned by
223 The entry can then be retrieved in the list returned by
224 self['category']."""
224 self['category']."""
225 self._categories.setdefault(category, []).append(entry)
225 self._categories.setdefault(category, []).append(entry)
226 self._sequences.append((category, entry))
226 self._sequences.append((category, entry))
227 if inreplyto is not None:
227 if inreplyto is not None:
228 self.getreplies(inreplyto).add(category, entry)
228 self.getreplies(inreplyto).add(category, entry)
229
229
230 def getreplies(self, partid):
230 def getreplies(self, partid):
231 """get the subrecords that replies to a specific part"""
231 """get the subrecords that replies to a specific part"""
232 return self._replies.setdefault(partid, unbundlerecords())
232 return self._replies.setdefault(partid, unbundlerecords())
233
233
234 def __getitem__(self, cat):
234 def __getitem__(self, cat):
235 return tuple(self._categories.get(cat, ()))
235 return tuple(self._categories.get(cat, ()))
236
236
237 def __iter__(self):
237 def __iter__(self):
238 return iter(self._sequences)
238 return iter(self._sequences)
239
239
240 def __len__(self):
240 def __len__(self):
241 return len(self._sequences)
241 return len(self._sequences)
242
242
243 def __nonzero__(self):
243 def __nonzero__(self):
244 return bool(self._sequences)
244 return bool(self._sequences)
245
245
246 class bundleoperation(object):
246 class bundleoperation(object):
247 """an object that represents a single bundling process
247 """an object that represents a single bundling process
248
248
249 Its purpose is to carry unbundle-related objects and states.
249 Its purpose is to carry unbundle-related objects and states.
250
250
251 A new object should be created at the beginning of each bundle processing.
251 A new object should be created at the beginning of each bundle processing.
252 The object is to be returned by the processing function.
252 The object is to be returned by the processing function.
253
253
254 The object has very little content now it will ultimately contain:
254 The object has very little content now it will ultimately contain:
255 * an access to the repo the bundle is applied to,
255 * an access to the repo the bundle is applied to,
256 * a ui object,
256 * a ui object,
257 * a way to retrieve a transaction to add changes to the repo,
257 * a way to retrieve a transaction to add changes to the repo,
258 * a way to record the result of processing each part,
258 * a way to record the result of processing each part,
259 * a way to construct a bundle response when applicable.
259 * a way to construct a bundle response when applicable.
260 """
260 """
261
261
262 def __init__(self, repo, transactiongetter):
262 def __init__(self, repo, transactiongetter):
263 self.repo = repo
263 self.repo = repo
264 self.ui = repo.ui
264 self.ui = repo.ui
265 self.records = unbundlerecords()
265 self.records = unbundlerecords()
266 self.gettransaction = transactiongetter
266 self.gettransaction = transactiongetter
267 self.reply = None
267 self.reply = None
268
268
269 class TransactionUnavailable(RuntimeError):
269 class TransactionUnavailable(RuntimeError):
270 pass
270 pass
271
271
272 def _notransaction():
272 def _notransaction():
273 """default method to get a transaction while processing a bundle
273 """default method to get a transaction while processing a bundle
274
274
275 Raise an exception to highlight the fact that no transaction was expected
275 Raise an exception to highlight the fact that no transaction was expected
276 to be created"""
276 to be created"""
277 raise TransactionUnavailable()
277 raise TransactionUnavailable()
278
278
279 def processbundle(repo, unbundler, transactiongetter=_notransaction):
279 def processbundle(repo, unbundler, transactiongetter=_notransaction):
280 """This function process a bundle, apply effect to/from a repo
280 """This function process a bundle, apply effect to/from a repo
281
281
282 It iterates over each part then searches for and uses the proper handling
282 It iterates over each part then searches for and uses the proper handling
283 code to process the part. Parts are processed in order.
283 code to process the part. Parts are processed in order.
284
284
285 This is very early version of this function that will be strongly reworked
285 This is very early version of this function that will be strongly reworked
286 before final usage.
286 before final usage.
287
287
288 Unknown Mandatory part will abort the process.
288 Unknown Mandatory part will abort the process.
289 """
289 """
290 op = bundleoperation(repo, transactiongetter)
290 op = bundleoperation(repo, transactiongetter)
291 # todo:
291 # todo:
292 # - replace this is a init function soon.
292 # - replace this is a init function soon.
293 # - exception catching
293 # - exception catching
294 unbundler.params
294 unbundler.params
295 iterparts = unbundler.iterparts()
295 iterparts = unbundler.iterparts()
296 part = None
296 part = None
297 try:
297 try:
298 for part in iterparts:
298 for part in iterparts:
299 _processpart(op, part)
299 _processpart(op, part)
300 except Exception, exc:
300 except Exception, exc:
301 for part in iterparts:
301 for part in iterparts:
302 # consume the bundle content
302 # consume the bundle content
303 part.read()
303 part.read()
304 # Small hack to let caller code distinguish exceptions from bundle2
304 # Small hack to let caller code distinguish exceptions from bundle2
305 # processing fron the ones from bundle1 processing. This is mostly
305 # processing fron the ones from bundle1 processing. This is mostly
306 # needed to handle different return codes to unbundle according to the
306 # needed to handle different return codes to unbundle according to the
307 # type of bundle. We should probably clean up or drop this return code
307 # type of bundle. We should probably clean up or drop this return code
308 # craziness in a future version.
308 # craziness in a future version.
309 exc.duringunbundle2 = True
309 exc.duringunbundle2 = True
310 raise
310 raise
311 return op
311 return op
312
312
313 def _processpart(op, part):
313 def _processpart(op, part):
314 """process a single part from a bundle
314 """process a single part from a bundle
315
315
316 The part is guaranteed to have been fully consumed when the function exits
316 The part is guaranteed to have been fully consumed when the function exits
317 (even if an exception is raised)."""
317 (even if an exception is raised)."""
318 try:
318 try:
319 parttype = part.type
319 parttype = part.type
320 # part key are matched lower case
320 # part key are matched lower case
321 key = parttype.lower()
321 key = parttype.lower()
322 try:
322 try:
323 handler = parthandlermapping.get(key)
323 handler = parthandlermapping.get(key)
324 if handler is None:
324 if handler is None:
325 raise error.UnsupportedPartError(parttype=key)
325 raise error.UnsupportedPartError(parttype=key)
326 op.ui.debug('found a handler for part %r\n' % parttype)
326 op.ui.debug('found a handler for part %r\n' % parttype)
327 unknownparams = part.mandatorykeys - handler.params
327 unknownparams = part.mandatorykeys - handler.params
328 if unknownparams:
328 if unknownparams:
329 unknownparams = list(unknownparams)
329 unknownparams = list(unknownparams)
330 unknownparams.sort()
330 unknownparams.sort()
331 raise error.UnsupportedPartError(parttype=key,
331 raise error.UnsupportedPartError(parttype=key,
332 params=unknownparams)
332 params=unknownparams)
333 except error.UnsupportedPartError, exc:
333 except error.UnsupportedPartError, exc:
334 if key != parttype: # mandatory parts
334 if key != parttype: # mandatory parts
335 raise
335 raise
336 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
336 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
337 return # skip to part processing
337 return # skip to part processing
338
338
339 # handler is called outside the above try block so that we don't
339 # handler is called outside the above try block so that we don't
340 # risk catching KeyErrors from anything other than the
340 # risk catching KeyErrors from anything other than the
341 # parthandlermapping lookup (any KeyError raised by handler()
341 # parthandlermapping lookup (any KeyError raised by handler()
342 # itself represents a defect of a different variety).
342 # itself represents a defect of a different variety).
343 output = None
343 output = None
344 if op.reply is not None:
344 if op.reply is not None:
345 op.ui.pushbuffer(error=True)
345 op.ui.pushbuffer(error=True)
346 output = ''
346 output = ''
347 try:
347 try:
348 handler(op, part)
348 handler(op, part)
349 finally:
349 finally:
350 if output is not None:
350 if output is not None:
351 output = op.ui.popbuffer()
351 output = op.ui.popbuffer()
352 if output:
352 if output:
353 outpart = op.reply.newpart('b2x:output', data=output)
353 outpart = op.reply.newpart('b2x:output', data=output)
354 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
354 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
355 finally:
355 finally:
356 # consume the part content to not corrupt the stream.
356 # consume the part content to not corrupt the stream.
357 part.read()
357 part.read()
358
358
359
359
360 def decodecaps(blob):
360 def decodecaps(blob):
361 """decode a bundle2 caps bytes blob into a dictionnary
361 """decode a bundle2 caps bytes blob into a dictionnary
362
362
363 The blob is a list of capabilities (one per line)
363 The blob is a list of capabilities (one per line)
364 Capabilities may have values using a line of the form::
364 Capabilities may have values using a line of the form::
365
365
366 capability=value1,value2,value3
366 capability=value1,value2,value3
367
367
368 The values are always a list."""
368 The values are always a list."""
369 caps = {}
369 caps = {}
370 for line in blob.splitlines():
370 for line in blob.splitlines():
371 if not line:
371 if not line:
372 continue
372 continue
373 if '=' not in line:
373 if '=' not in line:
374 key, vals = line, ()
374 key, vals = line, ()
375 else:
375 else:
376 key, vals = line.split('=', 1)
376 key, vals = line.split('=', 1)
377 vals = vals.split(',')
377 vals = vals.split(',')
378 key = urllib.unquote(key)
378 key = urllib.unquote(key)
379 vals = [urllib.unquote(v) for v in vals]
379 vals = [urllib.unquote(v) for v in vals]
380 caps[key] = vals
380 caps[key] = vals
381 return caps
381 return caps
382
382
383 def encodecaps(caps):
383 def encodecaps(caps):
384 """encode a bundle2 caps dictionary into a bytes blob"""
384 """encode a bundle2 caps dictionary into a bytes blob"""
385 chunks = []
385 chunks = []
386 for ca in sorted(caps):
386 for ca in sorted(caps):
387 vals = caps[ca]
387 vals = caps[ca]
388 ca = urllib.quote(ca)
388 ca = urllib.quote(ca)
389 vals = [urllib.quote(v) for v in vals]
389 vals = [urllib.quote(v) for v in vals]
390 if vals:
390 if vals:
391 ca = "%s=%s" % (ca, ','.join(vals))
391 ca = "%s=%s" % (ca, ','.join(vals))
392 chunks.append(ca)
392 chunks.append(ca)
393 return '\n'.join(chunks)
393 return '\n'.join(chunks)
394
394
395 class bundle20(object):
395 class bundle20(object):
396 """represent an outgoing bundle2 container
396 """represent an outgoing bundle2 container
397
397
398 Use the `addparam` method to add stream level parameter. and `newpart` to
398 Use the `addparam` method to add stream level parameter. and `newpart` to
399 populate it. Then call `getchunks` to retrieve all the binary chunks of
399 populate it. Then call `getchunks` to retrieve all the binary chunks of
400 data that compose the bundle2 container."""
400 data that compose the bundle2 container."""
401
401
402 def __init__(self, ui, capabilities=()):
402 def __init__(self, ui, capabilities=()):
403 self.ui = ui
403 self.ui = ui
404 self._params = []
404 self._params = []
405 self._parts = []
405 self._parts = []
406 self.capabilities = dict(capabilities)
406 self.capabilities = dict(capabilities)
407
407
408 @property
408 @property
409 def nbparts(self):
409 def nbparts(self):
410 """total number of parts added to the bundler"""
410 """total number of parts added to the bundler"""
411 return len(self._parts)
411 return len(self._parts)
412
412
413 # methods used to defines the bundle2 content
413 # methods used to defines the bundle2 content
414 def addparam(self, name, value=None):
414 def addparam(self, name, value=None):
415 """add a stream level parameter"""
415 """add a stream level parameter"""
416 if not name:
416 if not name:
417 raise ValueError('empty parameter name')
417 raise ValueError('empty parameter name')
418 if name[0] not in string.letters:
418 if name[0] not in string.letters:
419 raise ValueError('non letter first character: %r' % name)
419 raise ValueError('non letter first character: %r' % name)
420 self._params.append((name, value))
420 self._params.append((name, value))
421
421
422 def addpart(self, part):
422 def addpart(self, part):
423 """add a new part to the bundle2 container
423 """add a new part to the bundle2 container
424
424
425 Parts contains the actual applicative payload."""
425 Parts contains the actual applicative payload."""
426 assert part.id is None
426 assert part.id is None
427 part.id = len(self._parts) # very cheap counter
427 part.id = len(self._parts) # very cheap counter
428 self._parts.append(part)
428 self._parts.append(part)
429
429
430 def newpart(self, typeid, *args, **kwargs):
430 def newpart(self, typeid, *args, **kwargs):
431 """create a new part and add it to the containers
431 """create a new part and add it to the containers
432
432
433 As the part is directly added to the containers. For now, this means
433 As the part is directly added to the containers. For now, this means
434 that any failure to properly initialize the part after calling
434 that any failure to properly initialize the part after calling
435 ``newpart`` should result in a failure of the whole bundling process.
435 ``newpart`` should result in a failure of the whole bundling process.
436
436
437 You can still fall back to manually create and add if you need better
437 You can still fall back to manually create and add if you need better
438 control."""
438 control."""
439 part = bundlepart(typeid, *args, **kwargs)
439 part = bundlepart(typeid, *args, **kwargs)
440 self.addpart(part)
440 self.addpart(part)
441 return part
441 return part
442
442
443 # methods used to generate the bundle2 stream
443 # methods used to generate the bundle2 stream
444 def getchunks(self):
444 def getchunks(self):
445 self.ui.debug('start emission of %s stream\n' % _magicstring)
445 self.ui.debug('start emission of %s stream\n' % _magicstring)
446 yield _magicstring
446 yield _magicstring
447 param = self._paramchunk()
447 param = self._paramchunk()
448 self.ui.debug('bundle parameter: %s\n' % param)
448 self.ui.debug('bundle parameter: %s\n' % param)
449 yield _pack(_fstreamparamsize, len(param))
449 yield _pack(_fstreamparamsize, len(param))
450 if param:
450 if param:
451 yield param
451 yield param
452
452
453 self.ui.debug('start of parts\n')
453 self.ui.debug('start of parts\n')
454 for part in self._parts:
454 for part in self._parts:
455 self.ui.debug('bundle part: "%s"\n' % part.type)
455 self.ui.debug('bundle part: "%s"\n' % part.type)
456 for chunk in part.getchunks():
456 for chunk in part.getchunks():
457 yield chunk
457 yield chunk
458 self.ui.debug('end of bundle\n')
458 self.ui.debug('end of bundle\n')
459 yield _pack(_fpartheadersize, 0)
459 yield _pack(_fpartheadersize, 0)
460
460
461 def _paramchunk(self):
461 def _paramchunk(self):
462 """return a encoded version of all stream parameters"""
462 """return a encoded version of all stream parameters"""
463 blocks = []
463 blocks = []
464 for par, value in self._params:
464 for par, value in self._params:
465 par = urllib.quote(par)
465 par = urllib.quote(par)
466 if value is not None:
466 if value is not None:
467 value = urllib.quote(value)
467 value = urllib.quote(value)
468 par = '%s=%s' % (par, value)
468 par = '%s=%s' % (par, value)
469 blocks.append(par)
469 blocks.append(par)
470 return ' '.join(blocks)
470 return ' '.join(blocks)
471
471
472 class unpackermixin(object):
472 class unpackermixin(object):
473 """A mixin to extract bytes and struct data from a stream"""
473 """A mixin to extract bytes and struct data from a stream"""
474
474
475 def __init__(self, fp):
475 def __init__(self, fp):
476 self._fp = fp
476 self._fp = fp
477
477
478 def _unpack(self, format):
478 def _unpack(self, format):
479 """unpack this struct format from the stream"""
479 """unpack this struct format from the stream"""
480 data = self._readexact(struct.calcsize(format))
480 data = self._readexact(struct.calcsize(format))
481 return _unpack(format, data)
481 return _unpack(format, data)
482
482
483 def _readexact(self, size):
483 def _readexact(self, size):
484 """read exactly <size> bytes from the stream"""
484 """read exactly <size> bytes from the stream"""
485 return changegroup.readexactly(self._fp, size)
485 return changegroup.readexactly(self._fp, size)
486
486
487
487
488 class unbundle20(unpackermixin):
488 class unbundle20(unpackermixin):
489 """interpret a bundle2 stream
489 """interpret a bundle2 stream
490
490
491 This class is fed with a binary stream and yields parts through its
491 This class is fed with a binary stream and yields parts through its
492 `iterparts` methods."""
492 `iterparts` methods."""
493
493
494 def __init__(self, ui, fp, header=None):
494 def __init__(self, ui, fp, header=None):
495 """If header is specified, we do not read it out of the stream."""
495 """If header is specified, we do not read it out of the stream."""
496 self.ui = ui
496 self.ui = ui
497 super(unbundle20, self).__init__(fp)
497 super(unbundle20, self).__init__(fp)
498 if header is None:
498 if header is None:
499 header = self._readexact(4)
499 header = self._readexact(4)
500 magic, version = header[0:2], header[2:4]
500 magic, version = header[0:2], header[2:4]
501 if magic != 'HG':
501 if magic != 'HG':
502 raise util.Abort(_('not a Mercurial bundle'))
502 raise util.Abort(_('not a Mercurial bundle'))
503 if version != '2Y':
503 if version != '2Y':
504 raise util.Abort(_('unknown bundle version %s') % version)
504 raise util.Abort(_('unknown bundle version %s') % version)
505 self.ui.debug('start processing of %s stream\n' % header)
505 self.ui.debug('start processing of %s stream\n' % header)
506
506
507 @util.propertycache
507 @util.propertycache
508 def params(self):
508 def params(self):
509 """dictionary of stream level parameters"""
509 """dictionary of stream level parameters"""
510 self.ui.debug('reading bundle2 stream parameters\n')
510 self.ui.debug('reading bundle2 stream parameters\n')
511 params = {}
511 params = {}
512 paramssize = self._unpack(_fstreamparamsize)[0]
512 paramssize = self._unpack(_fstreamparamsize)[0]
513 if paramssize < 0:
513 if paramssize < 0:
514 raise error.BundleValueError('negative bundle param size: %i'
514 raise error.BundleValueError('negative bundle param size: %i'
515 % paramssize)
515 % paramssize)
516 if paramssize:
516 if paramssize:
517 for p in self._readexact(paramssize).split(' '):
517 for p in self._readexact(paramssize).split(' '):
518 p = p.split('=', 1)
518 p = p.split('=', 1)
519 p = [urllib.unquote(i) for i in p]
519 p = [urllib.unquote(i) for i in p]
520 if len(p) < 2:
520 if len(p) < 2:
521 p.append(None)
521 p.append(None)
522 self._processparam(*p)
522 self._processparam(*p)
523 params[p[0]] = p[1]
523 params[p[0]] = p[1]
524 return params
524 return params
525
525
526 def _processparam(self, name, value):
526 def _processparam(self, name, value):
527 """process a parameter, applying its effect if needed
527 """process a parameter, applying its effect if needed
528
528
529 Parameter starting with a lower case letter are advisory and will be
529 Parameter starting with a lower case letter are advisory and will be
530 ignored when unknown. Those starting with an upper case letter are
530 ignored when unknown. Those starting with an upper case letter are
531 mandatory and will this function will raise a KeyError when unknown.
531 mandatory and will this function will raise a KeyError when unknown.
532
532
533 Note: no option are currently supported. Any input will be either
533 Note: no option are currently supported. Any input will be either
534 ignored or failing.
534 ignored or failing.
535 """
535 """
536 if not name:
536 if not name:
537 raise ValueError('empty parameter name')
537 raise ValueError('empty parameter name')
538 if name[0] not in string.letters:
538 if name[0] not in string.letters:
539 raise ValueError('non letter first character: %r' % name)
539 raise ValueError('non letter first character: %r' % name)
540 # Some logic will be later added here to try to process the option for
540 # Some logic will be later added here to try to process the option for
541 # a dict of known parameter.
541 # a dict of known parameter.
542 if name[0].islower():
542 if name[0].islower():
543 self.ui.debug("ignoring unknown parameter %r\n" % name)
543 self.ui.debug("ignoring unknown parameter %r\n" % name)
544 else:
544 else:
545 raise error.UnsupportedPartError(params=(name,))
545 raise error.UnsupportedPartError(params=(name,))
546
546
547
547
548 def iterparts(self):
548 def iterparts(self):
549 """yield all parts contained in the stream"""
549 """yield all parts contained in the stream"""
550 # make sure param have been loaded
550 # make sure param have been loaded
551 self.params
551 self.params
552 self.ui.debug('start extraction of bundle2 parts\n')
552 self.ui.debug('start extraction of bundle2 parts\n')
553 headerblock = self._readpartheader()
553 headerblock = self._readpartheader()
554 while headerblock is not None:
554 while headerblock is not None:
555 part = unbundlepart(self.ui, headerblock, self._fp)
555 part = unbundlepart(self.ui, headerblock, self._fp)
556 yield part
556 yield part
557 headerblock = self._readpartheader()
557 headerblock = self._readpartheader()
558 self.ui.debug('end of bundle2 stream\n')
558 self.ui.debug('end of bundle2 stream\n')
559
559
560 def _readpartheader(self):
560 def _readpartheader(self):
561 """reads a part header size and return the bytes blob
561 """reads a part header size and return the bytes blob
562
562
563 returns None if empty"""
563 returns None if empty"""
564 headersize = self._unpack(_fpartheadersize)[0]
564 headersize = self._unpack(_fpartheadersize)[0]
565 if headersize < 0:
565 if headersize < 0:
566 raise error.BundleValueError('negative part header size: %i'
566 raise error.BundleValueError('negative part header size: %i'
567 % headersize)
567 % headersize)
568 self.ui.debug('part header size: %i\n' % headersize)
568 self.ui.debug('part header size: %i\n' % headersize)
569 if headersize:
569 if headersize:
570 return self._readexact(headersize)
570 return self._readexact(headersize)
571 return None
571 return None
572
572
573
573
574 class bundlepart(object):
574 class bundlepart(object):
575 """A bundle2 part contains application level payload
575 """A bundle2 part contains application level payload
576
576
577 The part `type` is used to route the part to the application level
577 The part `type` is used to route the part to the application level
578 handler.
578 handler.
579
579
580 The part payload is contained in ``part.data``. It could be raw bytes or a
580 The part payload is contained in ``part.data``. It could be raw bytes or a
581 generator of byte chunks.
581 generator of byte chunks.
582
582
583 You can add parameters to the part using the ``addparam`` method.
583 You can add parameters to the part using the ``addparam`` method.
584 Parameters can be either mandatory (default) or advisory. Remote side
584 Parameters can be either mandatory (default) or advisory. Remote side
585 should be able to safely ignore the advisory ones.
585 should be able to safely ignore the advisory ones.
586
586
587 Both data and parameters cannot be modified after the generation has begun.
587 Both data and parameters cannot be modified after the generation has begun.
588 """
588 """
589
589
590 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
590 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
591 data=''):
591 data=''):
592 self.id = None
592 self.id = None
593 self.type = parttype
593 self.type = parttype
594 self._data = data
594 self._data = data
595 self._mandatoryparams = list(mandatoryparams)
595 self._mandatoryparams = list(mandatoryparams)
596 self._advisoryparams = list(advisoryparams)
596 self._advisoryparams = list(advisoryparams)
597 # checking for duplicated entries
597 # checking for duplicated entries
598 self._seenparams = set()
598 self._seenparams = set()
599 for pname, __ in self._mandatoryparams + self._advisoryparams:
599 for pname, __ in self._mandatoryparams + self._advisoryparams:
600 if pname in self._seenparams:
600 if pname in self._seenparams:
601 raise RuntimeError('duplicated params: %s' % pname)
601 raise RuntimeError('duplicated params: %s' % pname)
602 self._seenparams.add(pname)
602 self._seenparams.add(pname)
603 # status of the part's generation:
603 # status of the part's generation:
604 # - None: not started,
604 # - None: not started,
605 # - False: currently generated,
605 # - False: currently generated,
606 # - True: generation done.
606 # - True: generation done.
607 self._generated = None
607 self._generated = None
608
608
609 # methods used to defines the part content
609 # methods used to defines the part content
610 def __setdata(self, data):
610 def __setdata(self, data):
611 if self._generated is not None:
611 if self._generated is not None:
612 raise error.ReadOnlyPartError('part is being generated')
612 raise error.ReadOnlyPartError('part is being generated')
613 self._data = data
613 self._data = data
614 def __getdata(self):
614 def __getdata(self):
615 return self._data
615 return self._data
616 data = property(__getdata, __setdata)
616 data = property(__getdata, __setdata)
617
617
618 @property
618 @property
619 def mandatoryparams(self):
619 def mandatoryparams(self):
620 # make it an immutable tuple to force people through ``addparam``
620 # make it an immutable tuple to force people through ``addparam``
621 return tuple(self._mandatoryparams)
621 return tuple(self._mandatoryparams)
622
622
623 @property
623 @property
624 def advisoryparams(self):
624 def advisoryparams(self):
625 # make it an immutable tuple to force people through ``addparam``
625 # make it an immutable tuple to force people through ``addparam``
626 return tuple(self._advisoryparams)
626 return tuple(self._advisoryparams)
627
627
628 def addparam(self, name, value='', mandatory=True):
628 def addparam(self, name, value='', mandatory=True):
629 if self._generated is not None:
629 if self._generated is not None:
630 raise error.ReadOnlyPartError('part is being generated')
630 raise error.ReadOnlyPartError('part is being generated')
631 if name in self._seenparams:
631 if name in self._seenparams:
632 raise ValueError('duplicated params: %s' % name)
632 raise ValueError('duplicated params: %s' % name)
633 self._seenparams.add(name)
633 self._seenparams.add(name)
634 params = self._advisoryparams
634 params = self._advisoryparams
635 if mandatory:
635 if mandatory:
636 params = self._mandatoryparams
636 params = self._mandatoryparams
637 params.append((name, value))
637 params.append((name, value))
638
638
639 # methods used to generates the bundle2 stream
639 # methods used to generates the bundle2 stream
640 def getchunks(self):
640 def getchunks(self):
641 if self._generated is not None:
641 if self._generated is not None:
642 raise RuntimeError('part can only be consumed once')
642 raise RuntimeError('part can only be consumed once')
643 self._generated = False
643 self._generated = False
644 #### header
644 #### header
645 ## parttype
645 ## parttype
646 header = [_pack(_fparttypesize, len(self.type)),
646 header = [_pack(_fparttypesize, len(self.type)),
647 self.type, _pack(_fpartid, self.id),
647 self.type, _pack(_fpartid, self.id),
648 ]
648 ]
649 ## parameters
649 ## parameters
650 # count
650 # count
651 manpar = self.mandatoryparams
651 manpar = self.mandatoryparams
652 advpar = self.advisoryparams
652 advpar = self.advisoryparams
653 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
653 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
654 # size
654 # size
655 parsizes = []
655 parsizes = []
656 for key, value in manpar:
656 for key, value in manpar:
657 parsizes.append(len(key))
657 parsizes.append(len(key))
658 parsizes.append(len(value))
658 parsizes.append(len(value))
659 for key, value in advpar:
659 for key, value in advpar:
660 parsizes.append(len(key))
660 parsizes.append(len(key))
661 parsizes.append(len(value))
661 parsizes.append(len(value))
662 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
662 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
663 header.append(paramsizes)
663 header.append(paramsizes)
664 # key, value
664 # key, value
665 for key, value in manpar:
665 for key, value in manpar:
666 header.append(key)
666 header.append(key)
667 header.append(value)
667 header.append(value)
668 for key, value in advpar:
668 for key, value in advpar:
669 header.append(key)
669 header.append(key)
670 header.append(value)
670 header.append(value)
671 ## finalize header
671 ## finalize header
672 headerchunk = ''.join(header)
672 headerchunk = ''.join(header)
673 yield _pack(_fpartheadersize, len(headerchunk))
673 yield _pack(_fpartheadersize, len(headerchunk))
674 yield headerchunk
674 yield headerchunk
675 ## payload
675 ## payload
676 for chunk in self._payloadchunks():
676 for chunk in self._payloadchunks():
677 yield _pack(_fpayloadsize, len(chunk))
677 yield _pack(_fpayloadsize, len(chunk))
678 yield chunk
678 yield chunk
679 # end of payload
679 # end of payload
680 yield _pack(_fpayloadsize, 0)
680 yield _pack(_fpayloadsize, 0)
681 self._generated = True
681 self._generated = True
682
682
683 def _payloadchunks(self):
683 def _payloadchunks(self):
684 """yield chunks of a the part payload
684 """yield chunks of a the part payload
685
685
686 Exists to handle the different methods to provide data to a part."""
686 Exists to handle the different methods to provide data to a part."""
687 # we only support fixed size data now.
687 # we only support fixed size data now.
688 # This will be improved in the future.
688 # This will be improved in the future.
689 if util.safehasattr(self.data, 'next'):
689 if util.safehasattr(self.data, 'next'):
690 buff = util.chunkbuffer(self.data)
690 buff = util.chunkbuffer(self.data)
691 chunk = buff.read(preferedchunksize)
691 chunk = buff.read(preferedchunksize)
692 while chunk:
692 while chunk:
693 yield chunk
693 yield chunk
694 chunk = buff.read(preferedchunksize)
694 chunk = buff.read(preferedchunksize)
695 elif len(self.data):
695 elif len(self.data):
696 yield self.data
696 yield self.data
697
697
698
699 flaginterrupt = -1
700
701 class interrupthandler(unpackermixin):
702 """read one part and process it with restricted capability
703
704 This allows to transmit exception raised on the producer size during part
705 iteration while the consumer is reading a part.
706
707 Part processed in this manner only have access to a ui object,"""
708
709 def __init__(self, ui, fp):
710 super(interrupthandler, self).__init__(fp)
711 self.ui = ui
712
713 def _readpartheader(self):
714 """reads a part header size and return the bytes blob
715
716 returns None if empty"""
717 headersize = self._unpack(_fpartheadersize)[0]
718 if headersize < 0:
719 raise error.BundleValueError('negative part header size: %i'
720 % headersize)
721 self.ui.debug('part header size: %i\n' % headersize)
722 if headersize:
723 return self._readexact(headersize)
724 return None
725
726 def __call__(self):
727 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
728 headerblock = self._readpartheader()
729 if headerblock is None:
730 self.ui.debug('no part found during iterruption.\n')
731 return
732 part = unbundlepart(self.ui, headerblock, self._fp)
733 op = interruptoperation(self.ui)
734 _processpart(op, part)
735
736 class interruptoperation(object):
737 """A limited operation to be use by part handler during interruption
738
739 It only have access to an ui object.
740 """
741
742 def __init__(self, ui):
743 self.ui = ui
744 self.reply = None
745
746 @property
747 def repo(self):
748 raise RuntimeError('no repo access from stream interruption')
749
750 def gettransaction(self):
751 raise TransactionUnavailable('no repo access from stream interruption')
752
698 class unbundlepart(unpackermixin):
753 class unbundlepart(unpackermixin):
699 """a bundle part read from a bundle"""
754 """a bundle part read from a bundle"""
700
755
701 def __init__(self, ui, header, fp):
756 def __init__(self, ui, header, fp):
702 super(unbundlepart, self).__init__(fp)
757 super(unbundlepart, self).__init__(fp)
703 self.ui = ui
758 self.ui = ui
704 # unbundle state attr
759 # unbundle state attr
705 self._headerdata = header
760 self._headerdata = header
706 self._headeroffset = 0
761 self._headeroffset = 0
707 self._initialized = False
762 self._initialized = False
708 self.consumed = False
763 self.consumed = False
709 # part data
764 # part data
710 self.id = None
765 self.id = None
711 self.type = None
766 self.type = None
712 self.mandatoryparams = None
767 self.mandatoryparams = None
713 self.advisoryparams = None
768 self.advisoryparams = None
714 self.params = None
769 self.params = None
715 self.mandatorykeys = ()
770 self.mandatorykeys = ()
716 self._payloadstream = None
771 self._payloadstream = None
717 self._readheader()
772 self._readheader()
718
773
719 def _fromheader(self, size):
774 def _fromheader(self, size):
720 """return the next <size> byte from the header"""
775 """return the next <size> byte from the header"""
721 offset = self._headeroffset
776 offset = self._headeroffset
722 data = self._headerdata[offset:(offset + size)]
777 data = self._headerdata[offset:(offset + size)]
723 self._headeroffset = offset + size
778 self._headeroffset = offset + size
724 return data
779 return data
725
780
726 def _unpackheader(self, format):
781 def _unpackheader(self, format):
727 """read given format from header
782 """read given format from header
728
783
729 This automatically compute the size of the format to read."""
784 This automatically compute the size of the format to read."""
730 data = self._fromheader(struct.calcsize(format))
785 data = self._fromheader(struct.calcsize(format))
731 return _unpack(format, data)
786 return _unpack(format, data)
732
787
733 def _initparams(self, mandatoryparams, advisoryparams):
788 def _initparams(self, mandatoryparams, advisoryparams):
734 """internal function to setup all logic related parameters"""
789 """internal function to setup all logic related parameters"""
735 # make it read only to prevent people touching it by mistake.
790 # make it read only to prevent people touching it by mistake.
736 self.mandatoryparams = tuple(mandatoryparams)
791 self.mandatoryparams = tuple(mandatoryparams)
737 self.advisoryparams = tuple(advisoryparams)
792 self.advisoryparams = tuple(advisoryparams)
738 # user friendly UI
793 # user friendly UI
739 self.params = dict(self.mandatoryparams)
794 self.params = dict(self.mandatoryparams)
740 self.params.update(dict(self.advisoryparams))
795 self.params.update(dict(self.advisoryparams))
741 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
796 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
742
797
743 def _readheader(self):
798 def _readheader(self):
744 """read the header and setup the object"""
799 """read the header and setup the object"""
745 typesize = self._unpackheader(_fparttypesize)[0]
800 typesize = self._unpackheader(_fparttypesize)[0]
746 self.type = self._fromheader(typesize)
801 self.type = self._fromheader(typesize)
747 self.ui.debug('part type: "%s"\n' % self.type)
802 self.ui.debug('part type: "%s"\n' % self.type)
748 self.id = self._unpackheader(_fpartid)[0]
803 self.id = self._unpackheader(_fpartid)[0]
749 self.ui.debug('part id: "%s"\n' % self.id)
804 self.ui.debug('part id: "%s"\n' % self.id)
750 ## reading parameters
805 ## reading parameters
751 # param count
806 # param count
752 mancount, advcount = self._unpackheader(_fpartparamcount)
807 mancount, advcount = self._unpackheader(_fpartparamcount)
753 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
808 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
754 # param size
809 # param size
755 fparamsizes = _makefpartparamsizes(mancount + advcount)
810 fparamsizes = _makefpartparamsizes(mancount + advcount)
756 paramsizes = self._unpackheader(fparamsizes)
811 paramsizes = self._unpackheader(fparamsizes)
757 # make it a list of couple again
812 # make it a list of couple again
758 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
813 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
759 # split mandatory from advisory
814 # split mandatory from advisory
760 mansizes = paramsizes[:mancount]
815 mansizes = paramsizes[:mancount]
761 advsizes = paramsizes[mancount:]
816 advsizes = paramsizes[mancount:]
762 # retrive param value
817 # retrive param value
763 manparams = []
818 manparams = []
764 for key, value in mansizes:
819 for key, value in mansizes:
765 manparams.append((self._fromheader(key), self._fromheader(value)))
820 manparams.append((self._fromheader(key), self._fromheader(value)))
766 advparams = []
821 advparams = []
767 for key, value in advsizes:
822 for key, value in advsizes:
768 advparams.append((self._fromheader(key), self._fromheader(value)))
823 advparams.append((self._fromheader(key), self._fromheader(value)))
769 self._initparams(manparams, advparams)
824 self._initparams(manparams, advparams)
770 ## part payload
825 ## part payload
771 def payloadchunks():
826 def payloadchunks():
772 payloadsize = self._unpack(_fpayloadsize)[0]
827 payloadsize = self._unpack(_fpayloadsize)[0]
773 self.ui.debug('payload chunk size: %i\n' % payloadsize)
828 self.ui.debug('payload chunk size: %i\n' % payloadsize)
774 while payloadsize:
829 while payloadsize:
775 if payloadsize < 0:
830 if payloadsize == flaginterrupt:
776 msg = 'negative payload chunk size: %i' % payloadsize
831 # interruption detection, the handler will now read a
832 # single part and process it.
833 interrupthandler(self.ui, self._fp)()
834 elif payloadsize < 0:
835 msg = 'negative payload chunk size: %i' % payloadsize
777 raise error.BundleValueError(msg)
836 raise error.BundleValueError(msg)
778 yield self._readexact(payloadsize)
837 else:
838 yield self._readexact(payloadsize)
779 payloadsize = self._unpack(_fpayloadsize)[0]
839 payloadsize = self._unpack(_fpayloadsize)[0]
780 self.ui.debug('payload chunk size: %i\n' % payloadsize)
840 self.ui.debug('payload chunk size: %i\n' % payloadsize)
781 self._payloadstream = util.chunkbuffer(payloadchunks())
841 self._payloadstream = util.chunkbuffer(payloadchunks())
782 # we read the data, tell it
842 # we read the data, tell it
783 self._initialized = True
843 self._initialized = True
784
844
785 def read(self, size=None):
845 def read(self, size=None):
786 """read payload data"""
846 """read payload data"""
787 if not self._initialized:
847 if not self._initialized:
788 self._readheader()
848 self._readheader()
789 if size is None:
849 if size is None:
790 data = self._payloadstream.read()
850 data = self._payloadstream.read()
791 else:
851 else:
792 data = self._payloadstream.read(size)
852 data = self._payloadstream.read(size)
793 if size is None or len(data) < size:
853 if size is None or len(data) < size:
794 self.consumed = True
854 self.consumed = True
795 return data
855 return data
796
856
797 capabilities = {'HG2Y': (),
857 capabilities = {'HG2Y': (),
798 'b2x:listkeys': (),
858 'b2x:listkeys': (),
799 'b2x:pushkey': (),
859 'b2x:pushkey': (),
800 'b2x:changegroup': (),
860 'b2x:changegroup': (),
801 'digests': tuple(sorted(util.DIGESTS.keys())),
861 'digests': tuple(sorted(util.DIGESTS.keys())),
802 'b2x:remote-changegroup': ('http', 'https'),
862 'b2x:remote-changegroup': ('http', 'https'),
803 }
863 }
804
864
805 def getrepocaps(repo):
865 def getrepocaps(repo):
806 """return the bundle2 capabilities for a given repo
866 """return the bundle2 capabilities for a given repo
807
867
808 Exists to allow extensions (like evolution) to mutate the capabilities.
868 Exists to allow extensions (like evolution) to mutate the capabilities.
809 """
869 """
810 caps = capabilities.copy()
870 caps = capabilities.copy()
811 if obsolete.isenabled(repo, obsolete.exchangeopt):
871 if obsolete.isenabled(repo, obsolete.exchangeopt):
812 supportedformat = tuple('V%i' % v for v in obsolete.formats)
872 supportedformat = tuple('V%i' % v for v in obsolete.formats)
813 caps['b2x:obsmarkers'] = supportedformat
873 caps['b2x:obsmarkers'] = supportedformat
814 return caps
874 return caps
815
875
816 def bundle2caps(remote):
876 def bundle2caps(remote):
817 """return the bundlecapabilities of a peer as dict"""
877 """return the bundlecapabilities of a peer as dict"""
818 raw = remote.capable('bundle2-exp')
878 raw = remote.capable('bundle2-exp')
819 if not raw and raw != '':
879 if not raw and raw != '':
820 return {}
880 return {}
821 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
881 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
822 return decodecaps(capsblob)
882 return decodecaps(capsblob)
823
883
824 def obsmarkersversion(caps):
884 def obsmarkersversion(caps):
825 """extract the list of supported obsmarkers versions from a bundle2caps dict
885 """extract the list of supported obsmarkers versions from a bundle2caps dict
826 """
886 """
827 obscaps = caps.get('b2x:obsmarkers', ())
887 obscaps = caps.get('b2x:obsmarkers', ())
828 return [int(c[1:]) for c in obscaps if c.startswith('V')]
888 return [int(c[1:]) for c in obscaps if c.startswith('V')]
829
889
830 @parthandler('b2x:changegroup')
890 @parthandler('b2x:changegroup')
831 def handlechangegroup(op, inpart):
891 def handlechangegroup(op, inpart):
832 """apply a changegroup part on the repo
892 """apply a changegroup part on the repo
833
893
834 This is a very early implementation that will massive rework before being
894 This is a very early implementation that will massive rework before being
835 inflicted to any end-user.
895 inflicted to any end-user.
836 """
896 """
837 # Make sure we trigger a transaction creation
897 # Make sure we trigger a transaction creation
838 #
898 #
839 # The addchangegroup function will get a transaction object by itself, but
899 # The addchangegroup function will get a transaction object by itself, but
840 # we need to make sure we trigger the creation of a transaction object used
900 # we need to make sure we trigger the creation of a transaction object used
841 # for the whole processing scope.
901 # for the whole processing scope.
842 op.gettransaction()
902 op.gettransaction()
843 cg = changegroup.cg1unpacker(inpart, 'UN')
903 cg = changegroup.cg1unpacker(inpart, 'UN')
844 # the source and url passed here are overwritten by the one contained in
904 # the source and url passed here are overwritten by the one contained in
845 # the transaction.hookargs argument. So 'bundle2' is a placeholder
905 # the transaction.hookargs argument. So 'bundle2' is a placeholder
846 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
906 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
847 op.records.add('changegroup', {'return': ret})
907 op.records.add('changegroup', {'return': ret})
848 if op.reply is not None:
908 if op.reply is not None:
849 # This is definitly not the final form of this
909 # This is definitly not the final form of this
850 # return. But one need to start somewhere.
910 # return. But one need to start somewhere.
851 part = op.reply.newpart('b2x:reply:changegroup')
911 part = op.reply.newpart('b2x:reply:changegroup')
852 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
912 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
853 part.addparam('return', '%i' % ret, mandatory=False)
913 part.addparam('return', '%i' % ret, mandatory=False)
854 assert not inpart.read()
914 assert not inpart.read()
855
915
856 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
916 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
857 ['digest:%s' % k for k in util.DIGESTS.keys()])
917 ['digest:%s' % k for k in util.DIGESTS.keys()])
858 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
918 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
859 def handleremotechangegroup(op, inpart):
919 def handleremotechangegroup(op, inpart):
860 """apply a bundle10 on the repo, given an url and validation information
920 """apply a bundle10 on the repo, given an url and validation information
861
921
862 All the information about the remote bundle to import are given as
922 All the information about the remote bundle to import are given as
863 parameters. The parameters include:
923 parameters. The parameters include:
864 - url: the url to the bundle10.
924 - url: the url to the bundle10.
865 - size: the bundle10 file size. It is used to validate what was
925 - size: the bundle10 file size. It is used to validate what was
866 retrieved by the client matches the server knowledge about the bundle.
926 retrieved by the client matches the server knowledge about the bundle.
867 - digests: a space separated list of the digest types provided as
927 - digests: a space separated list of the digest types provided as
868 parameters.
928 parameters.
869 - digest:<digest-type>: the hexadecimal representation of the digest with
929 - digest:<digest-type>: the hexadecimal representation of the digest with
870 that name. Like the size, it is used to validate what was retrieved by
930 that name. Like the size, it is used to validate what was retrieved by
871 the client matches what the server knows about the bundle.
931 the client matches what the server knows about the bundle.
872
932
873 When multiple digest types are given, all of them are checked.
933 When multiple digest types are given, all of them are checked.
874 """
934 """
875 try:
935 try:
876 raw_url = inpart.params['url']
936 raw_url = inpart.params['url']
877 except KeyError:
937 except KeyError:
878 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
938 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
879 parsed_url = util.url(raw_url)
939 parsed_url = util.url(raw_url)
880 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
940 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
881 raise util.Abort(_('remote-changegroup does not support %s urls') %
941 raise util.Abort(_('remote-changegroup does not support %s urls') %
882 parsed_url.scheme)
942 parsed_url.scheme)
883
943
884 try:
944 try:
885 size = int(inpart.params['size'])
945 size = int(inpart.params['size'])
886 except ValueError:
946 except ValueError:
887 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
947 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
888 % 'size')
948 % 'size')
889 except KeyError:
949 except KeyError:
890 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
950 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
891
951
892 digests = {}
952 digests = {}
893 for typ in inpart.params.get('digests', '').split():
953 for typ in inpart.params.get('digests', '').split():
894 param = 'digest:%s' % typ
954 param = 'digest:%s' % typ
895 try:
955 try:
896 value = inpart.params[param]
956 value = inpart.params[param]
897 except KeyError:
957 except KeyError:
898 raise util.Abort(_('remote-changegroup: missing "%s" param') %
958 raise util.Abort(_('remote-changegroup: missing "%s" param') %
899 param)
959 param)
900 digests[typ] = value
960 digests[typ] = value
901
961
902 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
962 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
903
963
904 # Make sure we trigger a transaction creation
964 # Make sure we trigger a transaction creation
905 #
965 #
906 # The addchangegroup function will get a transaction object by itself, but
966 # The addchangegroup function will get a transaction object by itself, but
907 # we need to make sure we trigger the creation of a transaction object used
967 # we need to make sure we trigger the creation of a transaction object used
908 # for the whole processing scope.
968 # for the whole processing scope.
909 op.gettransaction()
969 op.gettransaction()
910 import exchange
970 import exchange
911 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
971 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
912 if not isinstance(cg, changegroup.cg1unpacker):
972 if not isinstance(cg, changegroup.cg1unpacker):
913 raise util.Abort(_('%s: not a bundle version 1.0') %
973 raise util.Abort(_('%s: not a bundle version 1.0') %
914 util.hidepassword(raw_url))
974 util.hidepassword(raw_url))
915 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
975 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
916 op.records.add('changegroup', {'return': ret})
976 op.records.add('changegroup', {'return': ret})
917 if op.reply is not None:
977 if op.reply is not None:
918 # This is definitly not the final form of this
978 # This is definitly not the final form of this
919 # return. But one need to start somewhere.
979 # return. But one need to start somewhere.
920 part = op.reply.newpart('b2x:reply:changegroup')
980 part = op.reply.newpart('b2x:reply:changegroup')
921 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
981 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
922 part.addparam('return', '%i' % ret, mandatory=False)
982 part.addparam('return', '%i' % ret, mandatory=False)
923 try:
983 try:
924 real_part.validate()
984 real_part.validate()
925 except util.Abort, e:
985 except util.Abort, e:
926 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
986 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
927 (util.hidepassword(raw_url), str(e)))
987 (util.hidepassword(raw_url), str(e)))
928 assert not inpart.read()
988 assert not inpart.read()
929
989
930 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
990 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
931 def handlereplychangegroup(op, inpart):
991 def handlereplychangegroup(op, inpart):
932 ret = int(inpart.params['return'])
992 ret = int(inpart.params['return'])
933 replyto = int(inpart.params['in-reply-to'])
993 replyto = int(inpart.params['in-reply-to'])
934 op.records.add('changegroup', {'return': ret}, replyto)
994 op.records.add('changegroup', {'return': ret}, replyto)
935
995
936 @parthandler('b2x:check:heads')
996 @parthandler('b2x:check:heads')
937 def handlecheckheads(op, inpart):
997 def handlecheckheads(op, inpart):
938 """check that head of the repo did not change
998 """check that head of the repo did not change
939
999
940 This is used to detect a push race when using unbundle.
1000 This is used to detect a push race when using unbundle.
941 This replaces the "heads" argument of unbundle."""
1001 This replaces the "heads" argument of unbundle."""
942 h = inpart.read(20)
1002 h = inpart.read(20)
943 heads = []
1003 heads = []
944 while len(h) == 20:
1004 while len(h) == 20:
945 heads.append(h)
1005 heads.append(h)
946 h = inpart.read(20)
1006 h = inpart.read(20)
947 assert not h
1007 assert not h
948 if heads != op.repo.heads():
1008 if heads != op.repo.heads():
949 raise error.PushRaced('repository changed while pushing - '
1009 raise error.PushRaced('repository changed while pushing - '
950 'please try again')
1010 'please try again')
951
1011
952 @parthandler('b2x:output')
1012 @parthandler('b2x:output')
953 def handleoutput(op, inpart):
1013 def handleoutput(op, inpart):
954 """forward output captured on the server to the client"""
1014 """forward output captured on the server to the client"""
955 for line in inpart.read().splitlines():
1015 for line in inpart.read().splitlines():
956 op.ui.write(('remote: %s\n' % line))
1016 op.ui.write(('remote: %s\n' % line))
957
1017
958 @parthandler('b2x:replycaps')
1018 @parthandler('b2x:replycaps')
959 def handlereplycaps(op, inpart):
1019 def handlereplycaps(op, inpart):
960 """Notify that a reply bundle should be created
1020 """Notify that a reply bundle should be created
961
1021
962 The payload contains the capabilities information for the reply"""
1022 The payload contains the capabilities information for the reply"""
963 caps = decodecaps(inpart.read())
1023 caps = decodecaps(inpart.read())
964 if op.reply is None:
1024 if op.reply is None:
965 op.reply = bundle20(op.ui, caps)
1025 op.reply = bundle20(op.ui, caps)
966
1026
967 @parthandler('b2x:error:abort', ('message', 'hint'))
1027 @parthandler('b2x:error:abort', ('message', 'hint'))
968 def handlereplycaps(op, inpart):
1028 def handlereplycaps(op, inpart):
969 """Used to transmit abort error over the wire"""
1029 """Used to transmit abort error over the wire"""
970 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1030 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
971
1031
972 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1032 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
973 def handlereplycaps(op, inpart):
1033 def handlereplycaps(op, inpart):
974 """Used to transmit unknown content error over the wire"""
1034 """Used to transmit unknown content error over the wire"""
975 kwargs = {}
1035 kwargs = {}
976 parttype = inpart.params.get('parttype')
1036 parttype = inpart.params.get('parttype')
977 if parttype is not None:
1037 if parttype is not None:
978 kwargs['parttype'] = parttype
1038 kwargs['parttype'] = parttype
979 params = inpart.params.get('params')
1039 params = inpart.params.get('params')
980 if params is not None:
1040 if params is not None:
981 kwargs['params'] = params.split('\0')
1041 kwargs['params'] = params.split('\0')
982
1042
983 raise error.UnsupportedPartError(**kwargs)
1043 raise error.UnsupportedPartError(**kwargs)
984
1044
985 @parthandler('b2x:error:pushraced', ('message',))
1045 @parthandler('b2x:error:pushraced', ('message',))
986 def handlereplycaps(op, inpart):
1046 def handlereplycaps(op, inpart):
987 """Used to transmit push race error over the wire"""
1047 """Used to transmit push race error over the wire"""
988 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1048 raise error.ResponseError(_('push failed:'), inpart.params['message'])
989
1049
990 @parthandler('b2x:listkeys', ('namespace',))
1050 @parthandler('b2x:listkeys', ('namespace',))
991 def handlelistkeys(op, inpart):
1051 def handlelistkeys(op, inpart):
992 """retrieve pushkey namespace content stored in a bundle2"""
1052 """retrieve pushkey namespace content stored in a bundle2"""
993 namespace = inpart.params['namespace']
1053 namespace = inpart.params['namespace']
994 r = pushkey.decodekeys(inpart.read())
1054 r = pushkey.decodekeys(inpart.read())
995 op.records.add('listkeys', (namespace, r))
1055 op.records.add('listkeys', (namespace, r))
996
1056
997 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1057 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
998 def handlepushkey(op, inpart):
1058 def handlepushkey(op, inpart):
999 """process a pushkey request"""
1059 """process a pushkey request"""
1000 dec = pushkey.decode
1060 dec = pushkey.decode
1001 namespace = dec(inpart.params['namespace'])
1061 namespace = dec(inpart.params['namespace'])
1002 key = dec(inpart.params['key'])
1062 key = dec(inpart.params['key'])
1003 old = dec(inpart.params['old'])
1063 old = dec(inpart.params['old'])
1004 new = dec(inpart.params['new'])
1064 new = dec(inpart.params['new'])
1005 ret = op.repo.pushkey(namespace, key, old, new)
1065 ret = op.repo.pushkey(namespace, key, old, new)
1006 record = {'namespace': namespace,
1066 record = {'namespace': namespace,
1007 'key': key,
1067 'key': key,
1008 'old': old,
1068 'old': old,
1009 'new': new}
1069 'new': new}
1010 op.records.add('pushkey', record)
1070 op.records.add('pushkey', record)
1011 if op.reply is not None:
1071 if op.reply is not None:
1012 rpart = op.reply.newpart('b2x:reply:pushkey')
1072 rpart = op.reply.newpart('b2x:reply:pushkey')
1013 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1073 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1014 rpart.addparam('return', '%i' % ret, mandatory=False)
1074 rpart.addparam('return', '%i' % ret, mandatory=False)
1015
1075
1016 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1076 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1017 def handlepushkeyreply(op, inpart):
1077 def handlepushkeyreply(op, inpart):
1018 """retrieve the result of a pushkey request"""
1078 """retrieve the result of a pushkey request"""
1019 ret = int(inpart.params['return'])
1079 ret = int(inpart.params['return'])
1020 partid = int(inpart.params['in-reply-to'])
1080 partid = int(inpart.params['in-reply-to'])
1021 op.records.add('pushkey', {'return': ret}, partid)
1081 op.records.add('pushkey', {'return': ret}, partid)
1022
1082
1023 @parthandler('b2x:obsmarkers')
1083 @parthandler('b2x:obsmarkers')
1024 def handleobsmarker(op, inpart):
1084 def handleobsmarker(op, inpart):
1025 """add a stream of obsmarkers to the repo"""
1085 """add a stream of obsmarkers to the repo"""
1026 tr = op.gettransaction()
1086 tr = op.gettransaction()
1027 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1087 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1028 if new:
1088 if new:
1029 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1089 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1030 op.records.add('obsmarkers', {'new': new})
1090 op.records.add('obsmarkers', {'new': new})
1031 if op.reply is not None:
1091 if op.reply is not None:
1032 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1092 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1033 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1093 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1034 rpart.addparam('new', '%i' % new, mandatory=False)
1094 rpart.addparam('new', '%i' % new, mandatory=False)
1035
1095
1036
1096
1037 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1097 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1038 def handlepushkeyreply(op, inpart):
1098 def handlepushkeyreply(op, inpart):
1039 """retrieve the result of a pushkey request"""
1099 """retrieve the result of a pushkey request"""
1040 ret = int(inpart.params['new'])
1100 ret = int(inpart.params['new'])
1041 partid = int(inpart.params['in-reply-to'])
1101 partid = int(inpart.params['in-reply-to'])
1042 op.records.add('obsmarkers', {'new': ret}, partid)
1102 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now