##// END OF EJS Templates
bundle2: add a 'salvageoutput' method on bundle20...
Pierre-Yves David -
r24794:21f2e8f4 default
parent child Browse files
Show More
@@ -1,1248 +1,1260 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
175
176 def validateparttype(parttype):
176 def validateparttype(parttype):
177 """raise ValueError if a parttype contains invalid character"""
177 """raise ValueError if a parttype contains invalid character"""
178 if _parttypeforbidden.search(parttype):
178 if _parttypeforbidden.search(parttype):
179 raise ValueError(parttype)
179 raise ValueError(parttype)
180
180
181 def _makefpartparamsizes(nbparams):
181 def _makefpartparamsizes(nbparams):
182 """return a struct format to read part parameter sizes
182 """return a struct format to read part parameter sizes
183
183
184 The number parameters is variable so we need to build that format
184 The number parameters is variable so we need to build that format
185 dynamically.
185 dynamically.
186 """
186 """
187 return '>'+('BB'*nbparams)
187 return '>'+('BB'*nbparams)
188
188
189 parthandlermapping = {}
189 parthandlermapping = {}
190
190
191 def parthandler(parttype, params=()):
191 def parthandler(parttype, params=()):
192 """decorator that register a function as a bundle2 part handler
192 """decorator that register a function as a bundle2 part handler
193
193
194 eg::
194 eg::
195
195
196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 def myparttypehandler(...):
197 def myparttypehandler(...):
198 '''process a part of type "my part".'''
198 '''process a part of type "my part".'''
199 ...
199 ...
200 """
200 """
201 validateparttype(parttype)
201 validateparttype(parttype)
202 def _decorator(func):
202 def _decorator(func):
203 lparttype = parttype.lower() # enforce lower case matching.
203 lparttype = parttype.lower() # enforce lower case matching.
204 assert lparttype not in parthandlermapping
204 assert lparttype not in parthandlermapping
205 parthandlermapping[lparttype] = func
205 parthandlermapping[lparttype] = func
206 func.params = frozenset(params)
206 func.params = frozenset(params)
207 return func
207 return func
208 return _decorator
208 return _decorator
209
209
210 class unbundlerecords(object):
210 class unbundlerecords(object):
211 """keep record of what happens during and unbundle
211 """keep record of what happens during and unbundle
212
212
213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 category of record and obj is an arbitrary object.
214 category of record and obj is an arbitrary object.
215
215
216 `records['cat']` will return all entries of this category 'cat'.
216 `records['cat']` will return all entries of this category 'cat'.
217
217
218 Iterating on the object itself will yield `('category', obj)` tuples
218 Iterating on the object itself will yield `('category', obj)` tuples
219 for all entries.
219 for all entries.
220
220
221 All iterations happens in chronological order.
221 All iterations happens in chronological order.
222 """
222 """
223
223
224 def __init__(self):
224 def __init__(self):
225 self._categories = {}
225 self._categories = {}
226 self._sequences = []
226 self._sequences = []
227 self._replies = {}
227 self._replies = {}
228
228
229 def add(self, category, entry, inreplyto=None):
229 def add(self, category, entry, inreplyto=None):
230 """add a new record of a given category.
230 """add a new record of a given category.
231
231
232 The entry can then be retrieved in the list returned by
232 The entry can then be retrieved in the list returned by
233 self['category']."""
233 self['category']."""
234 self._categories.setdefault(category, []).append(entry)
234 self._categories.setdefault(category, []).append(entry)
235 self._sequences.append((category, entry))
235 self._sequences.append((category, entry))
236 if inreplyto is not None:
236 if inreplyto is not None:
237 self.getreplies(inreplyto).add(category, entry)
237 self.getreplies(inreplyto).add(category, entry)
238
238
239 def getreplies(self, partid):
239 def getreplies(self, partid):
240 """get the records that are replies to a specific part"""
240 """get the records that are replies to a specific part"""
241 return self._replies.setdefault(partid, unbundlerecords())
241 return self._replies.setdefault(partid, unbundlerecords())
242
242
243 def __getitem__(self, cat):
243 def __getitem__(self, cat):
244 return tuple(self._categories.get(cat, ()))
244 return tuple(self._categories.get(cat, ()))
245
245
246 def __iter__(self):
246 def __iter__(self):
247 return iter(self._sequences)
247 return iter(self._sequences)
248
248
249 def __len__(self):
249 def __len__(self):
250 return len(self._sequences)
250 return len(self._sequences)
251
251
252 def __nonzero__(self):
252 def __nonzero__(self):
253 return bool(self._sequences)
253 return bool(self._sequences)
254
254
255 class bundleoperation(object):
255 class bundleoperation(object):
256 """an object that represents a single bundling process
256 """an object that represents a single bundling process
257
257
258 Its purpose is to carry unbundle-related objects and states.
258 Its purpose is to carry unbundle-related objects and states.
259
259
260 A new object should be created at the beginning of each bundle processing.
260 A new object should be created at the beginning of each bundle processing.
261 The object is to be returned by the processing function.
261 The object is to be returned by the processing function.
262
262
263 The object has very little content now it will ultimately contain:
263 The object has very little content now it will ultimately contain:
264 * an access to the repo the bundle is applied to,
264 * an access to the repo the bundle is applied to,
265 * a ui object,
265 * a ui object,
266 * a way to retrieve a transaction to add changes to the repo,
266 * a way to retrieve a transaction to add changes to the repo,
267 * a way to record the result of processing each part,
267 * a way to record the result of processing each part,
268 * a way to construct a bundle response when applicable.
268 * a way to construct a bundle response when applicable.
269 """
269 """
270
270
271 def __init__(self, repo, transactiongetter):
271 def __init__(self, repo, transactiongetter):
272 self.repo = repo
272 self.repo = repo
273 self.ui = repo.ui
273 self.ui = repo.ui
274 self.records = unbundlerecords()
274 self.records = unbundlerecords()
275 self.gettransaction = transactiongetter
275 self.gettransaction = transactiongetter
276 self.reply = None
276 self.reply = None
277
277
278 class TransactionUnavailable(RuntimeError):
278 class TransactionUnavailable(RuntimeError):
279 pass
279 pass
280
280
281 def _notransaction():
281 def _notransaction():
282 """default method to get a transaction while processing a bundle
282 """default method to get a transaction while processing a bundle
283
283
284 Raise an exception to highlight the fact that no transaction was expected
284 Raise an exception to highlight the fact that no transaction was expected
285 to be created"""
285 to be created"""
286 raise TransactionUnavailable()
286 raise TransactionUnavailable()
287
287
288 def processbundle(repo, unbundler, transactiongetter=None):
288 def processbundle(repo, unbundler, transactiongetter=None):
289 """This function process a bundle, apply effect to/from a repo
289 """This function process a bundle, apply effect to/from a repo
290
290
291 It iterates over each part then searches for and uses the proper handling
291 It iterates over each part then searches for and uses the proper handling
292 code to process the part. Parts are processed in order.
292 code to process the part. Parts are processed in order.
293
293
294 This is very early version of this function that will be strongly reworked
294 This is very early version of this function that will be strongly reworked
295 before final usage.
295 before final usage.
296
296
297 Unknown Mandatory part will abort the process.
297 Unknown Mandatory part will abort the process.
298 """
298 """
299 if transactiongetter is None:
299 if transactiongetter is None:
300 transactiongetter = _notransaction
300 transactiongetter = _notransaction
301 op = bundleoperation(repo, transactiongetter)
301 op = bundleoperation(repo, transactiongetter)
302 # todo:
302 # todo:
303 # - replace this is a init function soon.
303 # - replace this is a init function soon.
304 # - exception catching
304 # - exception catching
305 unbundler.params
305 unbundler.params
306 iterparts = unbundler.iterparts()
306 iterparts = unbundler.iterparts()
307 part = None
307 part = None
308 try:
308 try:
309 for part in iterparts:
309 for part in iterparts:
310 _processpart(op, part)
310 _processpart(op, part)
311 except Exception, exc:
311 except Exception, exc:
312 for part in iterparts:
312 for part in iterparts:
313 # consume the bundle content
313 # consume the bundle content
314 part.seek(0, 2)
314 part.seek(0, 2)
315 # Small hack to let caller code distinguish exceptions from bundle2
315 # Small hack to let caller code distinguish exceptions from bundle2
316 # processing from processing the old format. This is mostly
316 # processing from processing the old format. This is mostly
317 # needed to handle different return codes to unbundle according to the
317 # needed to handle different return codes to unbundle according to the
318 # type of bundle. We should probably clean up or drop this return code
318 # type of bundle. We should probably clean up or drop this return code
319 # craziness in a future version.
319 # craziness in a future version.
320 exc.duringunbundle2 = True
320 exc.duringunbundle2 = True
321 raise
321 raise
322 return op
322 return op
323
323
324 def _processpart(op, part):
324 def _processpart(op, part):
325 """process a single part from a bundle
325 """process a single part from a bundle
326
326
327 The part is guaranteed to have been fully consumed when the function exits
327 The part is guaranteed to have been fully consumed when the function exits
328 (even if an exception is raised)."""
328 (even if an exception is raised)."""
329 try:
329 try:
330 try:
330 try:
331 handler = parthandlermapping.get(part.type)
331 handler = parthandlermapping.get(part.type)
332 if handler is None:
332 if handler is None:
333 raise error.UnsupportedPartError(parttype=part.type)
333 raise error.UnsupportedPartError(parttype=part.type)
334 op.ui.debug('found a handler for part %r\n' % part.type)
334 op.ui.debug('found a handler for part %r\n' % part.type)
335 unknownparams = part.mandatorykeys - handler.params
335 unknownparams = part.mandatorykeys - handler.params
336 if unknownparams:
336 if unknownparams:
337 unknownparams = list(unknownparams)
337 unknownparams = list(unknownparams)
338 unknownparams.sort()
338 unknownparams.sort()
339 raise error.UnsupportedPartError(parttype=part.type,
339 raise error.UnsupportedPartError(parttype=part.type,
340 params=unknownparams)
340 params=unknownparams)
341 except error.UnsupportedPartError, exc:
341 except error.UnsupportedPartError, exc:
342 if part.mandatory: # mandatory parts
342 if part.mandatory: # mandatory parts
343 raise
343 raise
344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 return # skip to part processing
345 return # skip to part processing
346
346
347 # handler is called outside the above try block so that we don't
347 # handler is called outside the above try block so that we don't
348 # risk catching KeyErrors from anything other than the
348 # risk catching KeyErrors from anything other than the
349 # parthandlermapping lookup (any KeyError raised by handler()
349 # parthandlermapping lookup (any KeyError raised by handler()
350 # itself represents a defect of a different variety).
350 # itself represents a defect of a different variety).
351 output = None
351 output = None
352 if op.reply is not None:
352 if op.reply is not None:
353 op.ui.pushbuffer(error=True)
353 op.ui.pushbuffer(error=True)
354 output = ''
354 output = ''
355 try:
355 try:
356 handler(op, part)
356 handler(op, part)
357 finally:
357 finally:
358 if output is not None:
358 if output is not None:
359 output = op.ui.popbuffer()
359 output = op.ui.popbuffer()
360 if output:
360 if output:
361 outpart = op.reply.newpart('output', data=output,
361 outpart = op.reply.newpart('output', data=output,
362 mandatory=False)
362 mandatory=False)
363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 finally:
364 finally:
365 # consume the part content to not corrupt the stream.
365 # consume the part content to not corrupt the stream.
366 part.seek(0, 2)
366 part.seek(0, 2)
367
367
368
368
369 def decodecaps(blob):
369 def decodecaps(blob):
370 """decode a bundle2 caps bytes blob into a dictionary
370 """decode a bundle2 caps bytes blob into a dictionary
371
371
372 The blob is a list of capabilities (one per line)
372 The blob is a list of capabilities (one per line)
373 Capabilities may have values using a line of the form::
373 Capabilities may have values using a line of the form::
374
374
375 capability=value1,value2,value3
375 capability=value1,value2,value3
376
376
377 The values are always a list."""
377 The values are always a list."""
378 caps = {}
378 caps = {}
379 for line in blob.splitlines():
379 for line in blob.splitlines():
380 if not line:
380 if not line:
381 continue
381 continue
382 if '=' not in line:
382 if '=' not in line:
383 key, vals = line, ()
383 key, vals = line, ()
384 else:
384 else:
385 key, vals = line.split('=', 1)
385 key, vals = line.split('=', 1)
386 vals = vals.split(',')
386 vals = vals.split(',')
387 key = urllib.unquote(key)
387 key = urllib.unquote(key)
388 vals = [urllib.unquote(v) for v in vals]
388 vals = [urllib.unquote(v) for v in vals]
389 caps[key] = vals
389 caps[key] = vals
390 return caps
390 return caps
391
391
392 def encodecaps(caps):
392 def encodecaps(caps):
393 """encode a bundle2 caps dictionary into a bytes blob"""
393 """encode a bundle2 caps dictionary into a bytes blob"""
394 chunks = []
394 chunks = []
395 for ca in sorted(caps):
395 for ca in sorted(caps):
396 vals = caps[ca]
396 vals = caps[ca]
397 ca = urllib.quote(ca)
397 ca = urllib.quote(ca)
398 vals = [urllib.quote(v) for v in vals]
398 vals = [urllib.quote(v) for v in vals]
399 if vals:
399 if vals:
400 ca = "%s=%s" % (ca, ','.join(vals))
400 ca = "%s=%s" % (ca, ','.join(vals))
401 chunks.append(ca)
401 chunks.append(ca)
402 return '\n'.join(chunks)
402 return '\n'.join(chunks)
403
403
404 class bundle20(object):
404 class bundle20(object):
405 """represent an outgoing bundle2 container
405 """represent an outgoing bundle2 container
406
406
407 Use the `addparam` method to add stream level parameter. and `newpart` to
407 Use the `addparam` method to add stream level parameter. and `newpart` to
408 populate it. Then call `getchunks` to retrieve all the binary chunks of
408 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 data that compose the bundle2 container."""
409 data that compose the bundle2 container."""
410
410
411 _magicstring = 'HG20'
411 _magicstring = 'HG20'
412
412
413 def __init__(self, ui, capabilities=()):
413 def __init__(self, ui, capabilities=()):
414 self.ui = ui
414 self.ui = ui
415 self._params = []
415 self._params = []
416 self._parts = []
416 self._parts = []
417 self.capabilities = dict(capabilities)
417 self.capabilities = dict(capabilities)
418
418
419 @property
419 @property
420 def nbparts(self):
420 def nbparts(self):
421 """total number of parts added to the bundler"""
421 """total number of parts added to the bundler"""
422 return len(self._parts)
422 return len(self._parts)
423
423
424 # methods used to defines the bundle2 content
424 # methods used to defines the bundle2 content
425 def addparam(self, name, value=None):
425 def addparam(self, name, value=None):
426 """add a stream level parameter"""
426 """add a stream level parameter"""
427 if not name:
427 if not name:
428 raise ValueError('empty parameter name')
428 raise ValueError('empty parameter name')
429 if name[0] not in string.letters:
429 if name[0] not in string.letters:
430 raise ValueError('non letter first character: %r' % name)
430 raise ValueError('non letter first character: %r' % name)
431 self._params.append((name, value))
431 self._params.append((name, value))
432
432
433 def addpart(self, part):
433 def addpart(self, part):
434 """add a new part to the bundle2 container
434 """add a new part to the bundle2 container
435
435
436 Parts contains the actual applicative payload."""
436 Parts contains the actual applicative payload."""
437 assert part.id is None
437 assert part.id is None
438 part.id = len(self._parts) # very cheap counter
438 part.id = len(self._parts) # very cheap counter
439 self._parts.append(part)
439 self._parts.append(part)
440
440
441 def newpart(self, typeid, *args, **kwargs):
441 def newpart(self, typeid, *args, **kwargs):
442 """create a new part and add it to the containers
442 """create a new part and add it to the containers
443
443
444 As the part is directly added to the containers. For now, this means
444 As the part is directly added to the containers. For now, this means
445 that any failure to properly initialize the part after calling
445 that any failure to properly initialize the part after calling
446 ``newpart`` should result in a failure of the whole bundling process.
446 ``newpart`` should result in a failure of the whole bundling process.
447
447
448 You can still fall back to manually create and add if you need better
448 You can still fall back to manually create and add if you need better
449 control."""
449 control."""
450 part = bundlepart(typeid, *args, **kwargs)
450 part = bundlepart(typeid, *args, **kwargs)
451 self.addpart(part)
451 self.addpart(part)
452 return part
452 return part
453
453
454 # methods used to generate the bundle2 stream
454 # methods used to generate the bundle2 stream
455 def getchunks(self):
455 def getchunks(self):
456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 yield self._magicstring
457 yield self._magicstring
458 param = self._paramchunk()
458 param = self._paramchunk()
459 self.ui.debug('bundle parameter: %s\n' % param)
459 self.ui.debug('bundle parameter: %s\n' % param)
460 yield _pack(_fstreamparamsize, len(param))
460 yield _pack(_fstreamparamsize, len(param))
461 if param:
461 if param:
462 yield param
462 yield param
463
463
464 self.ui.debug('start of parts\n')
464 self.ui.debug('start of parts\n')
465 for part in self._parts:
465 for part in self._parts:
466 self.ui.debug('bundle part: "%s"\n' % part.type)
466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 for chunk in part.getchunks():
467 for chunk in part.getchunks():
468 yield chunk
468 yield chunk
469 self.ui.debug('end of bundle\n')
469 self.ui.debug('end of bundle\n')
470 yield _pack(_fpartheadersize, 0)
470 yield _pack(_fpartheadersize, 0)
471
471
472 def _paramchunk(self):
472 def _paramchunk(self):
473 """return a encoded version of all stream parameters"""
473 """return a encoded version of all stream parameters"""
474 blocks = []
474 blocks = []
475 for par, value in self._params:
475 for par, value in self._params:
476 par = urllib.quote(par)
476 par = urllib.quote(par)
477 if value is not None:
477 if value is not None:
478 value = urllib.quote(value)
478 value = urllib.quote(value)
479 par = '%s=%s' % (par, value)
479 par = '%s=%s' % (par, value)
480 blocks.append(par)
480 blocks.append(par)
481 return ' '.join(blocks)
481 return ' '.join(blocks)
482
482
483 def salvageoutput(self):
484 """return a list with a copy of all output parts in the bundle
485
486 This is meant to be used during error handling to make sure we preserve
487 server output"""
488 salvaged = []
489 for part in self._parts:
490 if part.type.startswith('output'):
491 salvaged.append(part.copy())
492 return salvaged
493
494
483 class unpackermixin(object):
495 class unpackermixin(object):
484 """A mixin to extract bytes and struct data from a stream"""
496 """A mixin to extract bytes and struct data from a stream"""
485
497
486 def __init__(self, fp):
498 def __init__(self, fp):
487 self._fp = fp
499 self._fp = fp
488 self._seekable = (util.safehasattr(fp, 'seek') and
500 self._seekable = (util.safehasattr(fp, 'seek') and
489 util.safehasattr(fp, 'tell'))
501 util.safehasattr(fp, 'tell'))
490
502
491 def _unpack(self, format):
503 def _unpack(self, format):
492 """unpack this struct format from the stream"""
504 """unpack this struct format from the stream"""
493 data = self._readexact(struct.calcsize(format))
505 data = self._readexact(struct.calcsize(format))
494 return _unpack(format, data)
506 return _unpack(format, data)
495
507
496 def _readexact(self, size):
508 def _readexact(self, size):
497 """read exactly <size> bytes from the stream"""
509 """read exactly <size> bytes from the stream"""
498 return changegroup.readexactly(self._fp, size)
510 return changegroup.readexactly(self._fp, size)
499
511
500 def seek(self, offset, whence=0):
512 def seek(self, offset, whence=0):
501 """move the underlying file pointer"""
513 """move the underlying file pointer"""
502 if self._seekable:
514 if self._seekable:
503 return self._fp.seek(offset, whence)
515 return self._fp.seek(offset, whence)
504 else:
516 else:
505 raise NotImplementedError(_('File pointer is not seekable'))
517 raise NotImplementedError(_('File pointer is not seekable'))
506
518
507 def tell(self):
519 def tell(self):
508 """return the file offset, or None if file is not seekable"""
520 """return the file offset, or None if file is not seekable"""
509 if self._seekable:
521 if self._seekable:
510 try:
522 try:
511 return self._fp.tell()
523 return self._fp.tell()
512 except IOError, e:
524 except IOError, e:
513 if e.errno == errno.ESPIPE:
525 if e.errno == errno.ESPIPE:
514 self._seekable = False
526 self._seekable = False
515 else:
527 else:
516 raise
528 raise
517 return None
529 return None
518
530
519 def close(self):
531 def close(self):
520 """close underlying file"""
532 """close underlying file"""
521 if util.safehasattr(self._fp, 'close'):
533 if util.safehasattr(self._fp, 'close'):
522 return self._fp.close()
534 return self._fp.close()
523
535
524 def getunbundler(ui, fp, header=None):
536 def getunbundler(ui, fp, header=None):
525 """return a valid unbundler object for a given header"""
537 """return a valid unbundler object for a given header"""
526 if header is None:
538 if header is None:
527 header = changegroup.readexactly(fp, 4)
539 header = changegroup.readexactly(fp, 4)
528 magic, version = header[0:2], header[2:4]
540 magic, version = header[0:2], header[2:4]
529 if magic != 'HG':
541 if magic != 'HG':
530 raise util.Abort(_('not a Mercurial bundle'))
542 raise util.Abort(_('not a Mercurial bundle'))
531 unbundlerclass = formatmap.get(version)
543 unbundlerclass = formatmap.get(version)
532 if unbundlerclass is None:
544 if unbundlerclass is None:
533 raise util.Abort(_('unknown bundle version %s') % version)
545 raise util.Abort(_('unknown bundle version %s') % version)
534 unbundler = unbundlerclass(ui, fp)
546 unbundler = unbundlerclass(ui, fp)
535 ui.debug('start processing of %s stream\n' % header)
547 ui.debug('start processing of %s stream\n' % header)
536 return unbundler
548 return unbundler
537
549
538 class unbundle20(unpackermixin):
550 class unbundle20(unpackermixin):
539 """interpret a bundle2 stream
551 """interpret a bundle2 stream
540
552
541 This class is fed with a binary stream and yields parts through its
553 This class is fed with a binary stream and yields parts through its
542 `iterparts` methods."""
554 `iterparts` methods."""
543
555
544 def __init__(self, ui, fp):
556 def __init__(self, ui, fp):
545 """If header is specified, we do not read it out of the stream."""
557 """If header is specified, we do not read it out of the stream."""
546 self.ui = ui
558 self.ui = ui
547 super(unbundle20, self).__init__(fp)
559 super(unbundle20, self).__init__(fp)
548
560
549 @util.propertycache
561 @util.propertycache
550 def params(self):
562 def params(self):
551 """dictionary of stream level parameters"""
563 """dictionary of stream level parameters"""
552 self.ui.debug('reading bundle2 stream parameters\n')
564 self.ui.debug('reading bundle2 stream parameters\n')
553 params = {}
565 params = {}
554 paramssize = self._unpack(_fstreamparamsize)[0]
566 paramssize = self._unpack(_fstreamparamsize)[0]
555 if paramssize < 0:
567 if paramssize < 0:
556 raise error.BundleValueError('negative bundle param size: %i'
568 raise error.BundleValueError('negative bundle param size: %i'
557 % paramssize)
569 % paramssize)
558 if paramssize:
570 if paramssize:
559 for p in self._readexact(paramssize).split(' '):
571 for p in self._readexact(paramssize).split(' '):
560 p = p.split('=', 1)
572 p = p.split('=', 1)
561 p = [urllib.unquote(i) for i in p]
573 p = [urllib.unquote(i) for i in p]
562 if len(p) < 2:
574 if len(p) < 2:
563 p.append(None)
575 p.append(None)
564 self._processparam(*p)
576 self._processparam(*p)
565 params[p[0]] = p[1]
577 params[p[0]] = p[1]
566 return params
578 return params
567
579
568 def _processparam(self, name, value):
580 def _processparam(self, name, value):
569 """process a parameter, applying its effect if needed
581 """process a parameter, applying its effect if needed
570
582
571 Parameter starting with a lower case letter are advisory and will be
583 Parameter starting with a lower case letter are advisory and will be
572 ignored when unknown. Those starting with an upper case letter are
584 ignored when unknown. Those starting with an upper case letter are
573 mandatory and will this function will raise a KeyError when unknown.
585 mandatory and will this function will raise a KeyError when unknown.
574
586
575 Note: no option are currently supported. Any input will be either
587 Note: no option are currently supported. Any input will be either
576 ignored or failing.
588 ignored or failing.
577 """
589 """
578 if not name:
590 if not name:
579 raise ValueError('empty parameter name')
591 raise ValueError('empty parameter name')
580 if name[0] not in string.letters:
592 if name[0] not in string.letters:
581 raise ValueError('non letter first character: %r' % name)
593 raise ValueError('non letter first character: %r' % name)
582 # Some logic will be later added here to try to process the option for
594 # Some logic will be later added here to try to process the option for
583 # a dict of known parameter.
595 # a dict of known parameter.
584 if name[0].islower():
596 if name[0].islower():
585 self.ui.debug("ignoring unknown parameter %r\n" % name)
597 self.ui.debug("ignoring unknown parameter %r\n" % name)
586 else:
598 else:
587 raise error.UnsupportedPartError(params=(name,))
599 raise error.UnsupportedPartError(params=(name,))
588
600
589
601
590 def iterparts(self):
602 def iterparts(self):
591 """yield all parts contained in the stream"""
603 """yield all parts contained in the stream"""
592 # make sure param have been loaded
604 # make sure param have been loaded
593 self.params
605 self.params
594 self.ui.debug('start extraction of bundle2 parts\n')
606 self.ui.debug('start extraction of bundle2 parts\n')
595 headerblock = self._readpartheader()
607 headerblock = self._readpartheader()
596 while headerblock is not None:
608 while headerblock is not None:
597 part = unbundlepart(self.ui, headerblock, self._fp)
609 part = unbundlepart(self.ui, headerblock, self._fp)
598 yield part
610 yield part
599 part.seek(0, 2)
611 part.seek(0, 2)
600 headerblock = self._readpartheader()
612 headerblock = self._readpartheader()
601 self.ui.debug('end of bundle2 stream\n')
613 self.ui.debug('end of bundle2 stream\n')
602
614
603 def _readpartheader(self):
615 def _readpartheader(self):
604 """reads a part header size and return the bytes blob
616 """reads a part header size and return the bytes blob
605
617
606 returns None if empty"""
618 returns None if empty"""
607 headersize = self._unpack(_fpartheadersize)[0]
619 headersize = self._unpack(_fpartheadersize)[0]
608 if headersize < 0:
620 if headersize < 0:
609 raise error.BundleValueError('negative part header size: %i'
621 raise error.BundleValueError('negative part header size: %i'
610 % headersize)
622 % headersize)
611 self.ui.debug('part header size: %i\n' % headersize)
623 self.ui.debug('part header size: %i\n' % headersize)
612 if headersize:
624 if headersize:
613 return self._readexact(headersize)
625 return self._readexact(headersize)
614 return None
626 return None
615
627
616 def compressed(self):
628 def compressed(self):
617 return False
629 return False
618
630
619 formatmap = {'20': unbundle20}
631 formatmap = {'20': unbundle20}
620
632
621 class bundlepart(object):
633 class bundlepart(object):
622 """A bundle2 part contains application level payload
634 """A bundle2 part contains application level payload
623
635
624 The part `type` is used to route the part to the application level
636 The part `type` is used to route the part to the application level
625 handler.
637 handler.
626
638
627 The part payload is contained in ``part.data``. It could be raw bytes or a
639 The part payload is contained in ``part.data``. It could be raw bytes or a
628 generator of byte chunks.
640 generator of byte chunks.
629
641
630 You can add parameters to the part using the ``addparam`` method.
642 You can add parameters to the part using the ``addparam`` method.
631 Parameters can be either mandatory (default) or advisory. Remote side
643 Parameters can be either mandatory (default) or advisory. Remote side
632 should be able to safely ignore the advisory ones.
644 should be able to safely ignore the advisory ones.
633
645
634 Both data and parameters cannot be modified after the generation has begun.
646 Both data and parameters cannot be modified after the generation has begun.
635 """
647 """
636
648
637 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
649 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
638 data='', mandatory=True):
650 data='', mandatory=True):
639 validateparttype(parttype)
651 validateparttype(parttype)
640 self.id = None
652 self.id = None
641 self.type = parttype
653 self.type = parttype
642 self._data = data
654 self._data = data
643 self._mandatoryparams = list(mandatoryparams)
655 self._mandatoryparams = list(mandatoryparams)
644 self._advisoryparams = list(advisoryparams)
656 self._advisoryparams = list(advisoryparams)
645 # checking for duplicated entries
657 # checking for duplicated entries
646 self._seenparams = set()
658 self._seenparams = set()
647 for pname, __ in self._mandatoryparams + self._advisoryparams:
659 for pname, __ in self._mandatoryparams + self._advisoryparams:
648 if pname in self._seenparams:
660 if pname in self._seenparams:
649 raise RuntimeError('duplicated params: %s' % pname)
661 raise RuntimeError('duplicated params: %s' % pname)
650 self._seenparams.add(pname)
662 self._seenparams.add(pname)
651 # status of the part's generation:
663 # status of the part's generation:
652 # - None: not started,
664 # - None: not started,
653 # - False: currently generated,
665 # - False: currently generated,
654 # - True: generation done.
666 # - True: generation done.
655 self._generated = None
667 self._generated = None
656 self.mandatory = mandatory
668 self.mandatory = mandatory
657
669
658 def copy(self):
670 def copy(self):
659 """return a copy of the part
671 """return a copy of the part
660
672
661 The new part have the very same content but no partid assigned yet.
673 The new part have the very same content but no partid assigned yet.
662 Parts with generated data cannot be copied."""
674 Parts with generated data cannot be copied."""
663 assert not util.safehasattr(self.data, 'next')
675 assert not util.safehasattr(self.data, 'next')
664 return self.__class__(self.type, self._mandatoryparams,
676 return self.__class__(self.type, self._mandatoryparams,
665 self._advisoryparams, self._data, self.mandatory)
677 self._advisoryparams, self._data, self.mandatory)
666
678
667 # methods used to defines the part content
679 # methods used to defines the part content
668 def __setdata(self, data):
680 def __setdata(self, data):
669 if self._generated is not None:
681 if self._generated is not None:
670 raise error.ReadOnlyPartError('part is being generated')
682 raise error.ReadOnlyPartError('part is being generated')
671 self._data = data
683 self._data = data
672 def __getdata(self):
684 def __getdata(self):
673 return self._data
685 return self._data
674 data = property(__getdata, __setdata)
686 data = property(__getdata, __setdata)
675
687
676 @property
688 @property
677 def mandatoryparams(self):
689 def mandatoryparams(self):
678 # make it an immutable tuple to force people through ``addparam``
690 # make it an immutable tuple to force people through ``addparam``
679 return tuple(self._mandatoryparams)
691 return tuple(self._mandatoryparams)
680
692
681 @property
693 @property
682 def advisoryparams(self):
694 def advisoryparams(self):
683 # make it an immutable tuple to force people through ``addparam``
695 # make it an immutable tuple to force people through ``addparam``
684 return tuple(self._advisoryparams)
696 return tuple(self._advisoryparams)
685
697
686 def addparam(self, name, value='', mandatory=True):
698 def addparam(self, name, value='', mandatory=True):
687 if self._generated is not None:
699 if self._generated is not None:
688 raise error.ReadOnlyPartError('part is being generated')
700 raise error.ReadOnlyPartError('part is being generated')
689 if name in self._seenparams:
701 if name in self._seenparams:
690 raise ValueError('duplicated params: %s' % name)
702 raise ValueError('duplicated params: %s' % name)
691 self._seenparams.add(name)
703 self._seenparams.add(name)
692 params = self._advisoryparams
704 params = self._advisoryparams
693 if mandatory:
705 if mandatory:
694 params = self._mandatoryparams
706 params = self._mandatoryparams
695 params.append((name, value))
707 params.append((name, value))
696
708
697 # methods used to generates the bundle2 stream
709 # methods used to generates the bundle2 stream
698 def getchunks(self):
710 def getchunks(self):
699 if self._generated is not None:
711 if self._generated is not None:
700 raise RuntimeError('part can only be consumed once')
712 raise RuntimeError('part can only be consumed once')
701 self._generated = False
713 self._generated = False
702 #### header
714 #### header
703 if self.mandatory:
715 if self.mandatory:
704 parttype = self.type.upper()
716 parttype = self.type.upper()
705 else:
717 else:
706 parttype = self.type.lower()
718 parttype = self.type.lower()
707 ## parttype
719 ## parttype
708 header = [_pack(_fparttypesize, len(parttype)),
720 header = [_pack(_fparttypesize, len(parttype)),
709 parttype, _pack(_fpartid, self.id),
721 parttype, _pack(_fpartid, self.id),
710 ]
722 ]
711 ## parameters
723 ## parameters
712 # count
724 # count
713 manpar = self.mandatoryparams
725 manpar = self.mandatoryparams
714 advpar = self.advisoryparams
726 advpar = self.advisoryparams
715 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
727 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
716 # size
728 # size
717 parsizes = []
729 parsizes = []
718 for key, value in manpar:
730 for key, value in manpar:
719 parsizes.append(len(key))
731 parsizes.append(len(key))
720 parsizes.append(len(value))
732 parsizes.append(len(value))
721 for key, value in advpar:
733 for key, value in advpar:
722 parsizes.append(len(key))
734 parsizes.append(len(key))
723 parsizes.append(len(value))
735 parsizes.append(len(value))
724 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
736 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
725 header.append(paramsizes)
737 header.append(paramsizes)
726 # key, value
738 # key, value
727 for key, value in manpar:
739 for key, value in manpar:
728 header.append(key)
740 header.append(key)
729 header.append(value)
741 header.append(value)
730 for key, value in advpar:
742 for key, value in advpar:
731 header.append(key)
743 header.append(key)
732 header.append(value)
744 header.append(value)
733 ## finalize header
745 ## finalize header
734 headerchunk = ''.join(header)
746 headerchunk = ''.join(header)
735 yield _pack(_fpartheadersize, len(headerchunk))
747 yield _pack(_fpartheadersize, len(headerchunk))
736 yield headerchunk
748 yield headerchunk
737 ## payload
749 ## payload
738 try:
750 try:
739 for chunk in self._payloadchunks():
751 for chunk in self._payloadchunks():
740 yield _pack(_fpayloadsize, len(chunk))
752 yield _pack(_fpayloadsize, len(chunk))
741 yield chunk
753 yield chunk
742 except Exception, exc:
754 except Exception, exc:
743 # backup exception data for later
755 # backup exception data for later
744 exc_info = sys.exc_info()
756 exc_info = sys.exc_info()
745 msg = 'unexpected error: %s' % exc
757 msg = 'unexpected error: %s' % exc
746 interpart = bundlepart('error:abort', [('message', msg)],
758 interpart = bundlepart('error:abort', [('message', msg)],
747 mandatory=False)
759 mandatory=False)
748 interpart.id = 0
760 interpart.id = 0
749 yield _pack(_fpayloadsize, -1)
761 yield _pack(_fpayloadsize, -1)
750 for chunk in interpart.getchunks():
762 for chunk in interpart.getchunks():
751 yield chunk
763 yield chunk
752 # abort current part payload
764 # abort current part payload
753 yield _pack(_fpayloadsize, 0)
765 yield _pack(_fpayloadsize, 0)
754 raise exc_info[0], exc_info[1], exc_info[2]
766 raise exc_info[0], exc_info[1], exc_info[2]
755 # end of payload
767 # end of payload
756 yield _pack(_fpayloadsize, 0)
768 yield _pack(_fpayloadsize, 0)
757 self._generated = True
769 self._generated = True
758
770
759 def _payloadchunks(self):
771 def _payloadchunks(self):
760 """yield chunks of a the part payload
772 """yield chunks of a the part payload
761
773
762 Exists to handle the different methods to provide data to a part."""
774 Exists to handle the different methods to provide data to a part."""
763 # we only support fixed size data now.
775 # we only support fixed size data now.
764 # This will be improved in the future.
776 # This will be improved in the future.
765 if util.safehasattr(self.data, 'next'):
777 if util.safehasattr(self.data, 'next'):
766 buff = util.chunkbuffer(self.data)
778 buff = util.chunkbuffer(self.data)
767 chunk = buff.read(preferedchunksize)
779 chunk = buff.read(preferedchunksize)
768 while chunk:
780 while chunk:
769 yield chunk
781 yield chunk
770 chunk = buff.read(preferedchunksize)
782 chunk = buff.read(preferedchunksize)
771 elif len(self.data):
783 elif len(self.data):
772 yield self.data
784 yield self.data
773
785
774
786
775 flaginterrupt = -1
787 flaginterrupt = -1
776
788
777 class interrupthandler(unpackermixin):
789 class interrupthandler(unpackermixin):
778 """read one part and process it with restricted capability
790 """read one part and process it with restricted capability
779
791
780 This allows to transmit exception raised on the producer size during part
792 This allows to transmit exception raised on the producer size during part
781 iteration while the consumer is reading a part.
793 iteration while the consumer is reading a part.
782
794
783 Part processed in this manner only have access to a ui object,"""
795 Part processed in this manner only have access to a ui object,"""
784
796
785 def __init__(self, ui, fp):
797 def __init__(self, ui, fp):
786 super(interrupthandler, self).__init__(fp)
798 super(interrupthandler, self).__init__(fp)
787 self.ui = ui
799 self.ui = ui
788
800
789 def _readpartheader(self):
801 def _readpartheader(self):
790 """reads a part header size and return the bytes blob
802 """reads a part header size and return the bytes blob
791
803
792 returns None if empty"""
804 returns None if empty"""
793 headersize = self._unpack(_fpartheadersize)[0]
805 headersize = self._unpack(_fpartheadersize)[0]
794 if headersize < 0:
806 if headersize < 0:
795 raise error.BundleValueError('negative part header size: %i'
807 raise error.BundleValueError('negative part header size: %i'
796 % headersize)
808 % headersize)
797 self.ui.debug('part header size: %i\n' % headersize)
809 self.ui.debug('part header size: %i\n' % headersize)
798 if headersize:
810 if headersize:
799 return self._readexact(headersize)
811 return self._readexact(headersize)
800 return None
812 return None
801
813
802 def __call__(self):
814 def __call__(self):
803 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
815 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
804 headerblock = self._readpartheader()
816 headerblock = self._readpartheader()
805 if headerblock is None:
817 if headerblock is None:
806 self.ui.debug('no part found during interruption.\n')
818 self.ui.debug('no part found during interruption.\n')
807 return
819 return
808 part = unbundlepart(self.ui, headerblock, self._fp)
820 part = unbundlepart(self.ui, headerblock, self._fp)
809 op = interruptoperation(self.ui)
821 op = interruptoperation(self.ui)
810 _processpart(op, part)
822 _processpart(op, part)
811
823
812 class interruptoperation(object):
824 class interruptoperation(object):
813 """A limited operation to be use by part handler during interruption
825 """A limited operation to be use by part handler during interruption
814
826
815 It only have access to an ui object.
827 It only have access to an ui object.
816 """
828 """
817
829
818 def __init__(self, ui):
830 def __init__(self, ui):
819 self.ui = ui
831 self.ui = ui
820 self.reply = None
832 self.reply = None
821
833
822 @property
834 @property
823 def repo(self):
835 def repo(self):
824 raise RuntimeError('no repo access from stream interruption')
836 raise RuntimeError('no repo access from stream interruption')
825
837
826 def gettransaction(self):
838 def gettransaction(self):
827 raise TransactionUnavailable('no repo access from stream interruption')
839 raise TransactionUnavailable('no repo access from stream interruption')
828
840
829 class unbundlepart(unpackermixin):
841 class unbundlepart(unpackermixin):
830 """a bundle part read from a bundle"""
842 """a bundle part read from a bundle"""
831
843
832 def __init__(self, ui, header, fp):
844 def __init__(self, ui, header, fp):
833 super(unbundlepart, self).__init__(fp)
845 super(unbundlepart, self).__init__(fp)
834 self.ui = ui
846 self.ui = ui
835 # unbundle state attr
847 # unbundle state attr
836 self._headerdata = header
848 self._headerdata = header
837 self._headeroffset = 0
849 self._headeroffset = 0
838 self._initialized = False
850 self._initialized = False
839 self.consumed = False
851 self.consumed = False
840 # part data
852 # part data
841 self.id = None
853 self.id = None
842 self.type = None
854 self.type = None
843 self.mandatoryparams = None
855 self.mandatoryparams = None
844 self.advisoryparams = None
856 self.advisoryparams = None
845 self.params = None
857 self.params = None
846 self.mandatorykeys = ()
858 self.mandatorykeys = ()
847 self._payloadstream = None
859 self._payloadstream = None
848 self._readheader()
860 self._readheader()
849 self._mandatory = None
861 self._mandatory = None
850 self._chunkindex = [] #(payload, file) position tuples for chunk starts
862 self._chunkindex = [] #(payload, file) position tuples for chunk starts
851 self._pos = 0
863 self._pos = 0
852
864
853 def _fromheader(self, size):
865 def _fromheader(self, size):
854 """return the next <size> byte from the header"""
866 """return the next <size> byte from the header"""
855 offset = self._headeroffset
867 offset = self._headeroffset
856 data = self._headerdata[offset:(offset + size)]
868 data = self._headerdata[offset:(offset + size)]
857 self._headeroffset = offset + size
869 self._headeroffset = offset + size
858 return data
870 return data
859
871
860 def _unpackheader(self, format):
872 def _unpackheader(self, format):
861 """read given format from header
873 """read given format from header
862
874
863 This automatically compute the size of the format to read."""
875 This automatically compute the size of the format to read."""
864 data = self._fromheader(struct.calcsize(format))
876 data = self._fromheader(struct.calcsize(format))
865 return _unpack(format, data)
877 return _unpack(format, data)
866
878
867 def _initparams(self, mandatoryparams, advisoryparams):
879 def _initparams(self, mandatoryparams, advisoryparams):
868 """internal function to setup all logic related parameters"""
880 """internal function to setup all logic related parameters"""
869 # make it read only to prevent people touching it by mistake.
881 # make it read only to prevent people touching it by mistake.
870 self.mandatoryparams = tuple(mandatoryparams)
882 self.mandatoryparams = tuple(mandatoryparams)
871 self.advisoryparams = tuple(advisoryparams)
883 self.advisoryparams = tuple(advisoryparams)
872 # user friendly UI
884 # user friendly UI
873 self.params = dict(self.mandatoryparams)
885 self.params = dict(self.mandatoryparams)
874 self.params.update(dict(self.advisoryparams))
886 self.params.update(dict(self.advisoryparams))
875 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
887 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
876
888
877 def _payloadchunks(self, chunknum=0):
889 def _payloadchunks(self, chunknum=0):
878 '''seek to specified chunk and start yielding data'''
890 '''seek to specified chunk and start yielding data'''
879 if len(self._chunkindex) == 0:
891 if len(self._chunkindex) == 0:
880 assert chunknum == 0, 'Must start with chunk 0'
892 assert chunknum == 0, 'Must start with chunk 0'
881 self._chunkindex.append((0, super(unbundlepart, self).tell()))
893 self._chunkindex.append((0, super(unbundlepart, self).tell()))
882 else:
894 else:
883 assert chunknum < len(self._chunkindex), \
895 assert chunknum < len(self._chunkindex), \
884 'Unknown chunk %d' % chunknum
896 'Unknown chunk %d' % chunknum
885 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
897 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
886
898
887 pos = self._chunkindex[chunknum][0]
899 pos = self._chunkindex[chunknum][0]
888 payloadsize = self._unpack(_fpayloadsize)[0]
900 payloadsize = self._unpack(_fpayloadsize)[0]
889 self.ui.debug('payload chunk size: %i\n' % payloadsize)
901 self.ui.debug('payload chunk size: %i\n' % payloadsize)
890 while payloadsize:
902 while payloadsize:
891 if payloadsize == flaginterrupt:
903 if payloadsize == flaginterrupt:
892 # interruption detection, the handler will now read a
904 # interruption detection, the handler will now read a
893 # single part and process it.
905 # single part and process it.
894 interrupthandler(self.ui, self._fp)()
906 interrupthandler(self.ui, self._fp)()
895 elif payloadsize < 0:
907 elif payloadsize < 0:
896 msg = 'negative payload chunk size: %i' % payloadsize
908 msg = 'negative payload chunk size: %i' % payloadsize
897 raise error.BundleValueError(msg)
909 raise error.BundleValueError(msg)
898 else:
910 else:
899 result = self._readexact(payloadsize)
911 result = self._readexact(payloadsize)
900 chunknum += 1
912 chunknum += 1
901 pos += payloadsize
913 pos += payloadsize
902 if chunknum == len(self._chunkindex):
914 if chunknum == len(self._chunkindex):
903 self._chunkindex.append((pos,
915 self._chunkindex.append((pos,
904 super(unbundlepart, self).tell()))
916 super(unbundlepart, self).tell()))
905 yield result
917 yield result
906 payloadsize = self._unpack(_fpayloadsize)[0]
918 payloadsize = self._unpack(_fpayloadsize)[0]
907 self.ui.debug('payload chunk size: %i\n' % payloadsize)
919 self.ui.debug('payload chunk size: %i\n' % payloadsize)
908
920
909 def _findchunk(self, pos):
921 def _findchunk(self, pos):
910 '''for a given payload position, return a chunk number and offset'''
922 '''for a given payload position, return a chunk number and offset'''
911 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
923 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
912 if ppos == pos:
924 if ppos == pos:
913 return chunk, 0
925 return chunk, 0
914 elif ppos > pos:
926 elif ppos > pos:
915 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
927 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
916 raise ValueError('Unknown chunk')
928 raise ValueError('Unknown chunk')
917
929
918 def _readheader(self):
930 def _readheader(self):
919 """read the header and setup the object"""
931 """read the header and setup the object"""
920 typesize = self._unpackheader(_fparttypesize)[0]
932 typesize = self._unpackheader(_fparttypesize)[0]
921 self.type = self._fromheader(typesize)
933 self.type = self._fromheader(typesize)
922 self.ui.debug('part type: "%s"\n' % self.type)
934 self.ui.debug('part type: "%s"\n' % self.type)
923 self.id = self._unpackheader(_fpartid)[0]
935 self.id = self._unpackheader(_fpartid)[0]
924 self.ui.debug('part id: "%s"\n' % self.id)
936 self.ui.debug('part id: "%s"\n' % self.id)
925 # extract mandatory bit from type
937 # extract mandatory bit from type
926 self.mandatory = (self.type != self.type.lower())
938 self.mandatory = (self.type != self.type.lower())
927 self.type = self.type.lower()
939 self.type = self.type.lower()
928 ## reading parameters
940 ## reading parameters
929 # param count
941 # param count
930 mancount, advcount = self._unpackheader(_fpartparamcount)
942 mancount, advcount = self._unpackheader(_fpartparamcount)
931 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
943 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
932 # param size
944 # param size
933 fparamsizes = _makefpartparamsizes(mancount + advcount)
945 fparamsizes = _makefpartparamsizes(mancount + advcount)
934 paramsizes = self._unpackheader(fparamsizes)
946 paramsizes = self._unpackheader(fparamsizes)
935 # make it a list of couple again
947 # make it a list of couple again
936 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
948 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
937 # split mandatory from advisory
949 # split mandatory from advisory
938 mansizes = paramsizes[:mancount]
950 mansizes = paramsizes[:mancount]
939 advsizes = paramsizes[mancount:]
951 advsizes = paramsizes[mancount:]
940 # retrieve param value
952 # retrieve param value
941 manparams = []
953 manparams = []
942 for key, value in mansizes:
954 for key, value in mansizes:
943 manparams.append((self._fromheader(key), self._fromheader(value)))
955 manparams.append((self._fromheader(key), self._fromheader(value)))
944 advparams = []
956 advparams = []
945 for key, value in advsizes:
957 for key, value in advsizes:
946 advparams.append((self._fromheader(key), self._fromheader(value)))
958 advparams.append((self._fromheader(key), self._fromheader(value)))
947 self._initparams(manparams, advparams)
959 self._initparams(manparams, advparams)
948 ## part payload
960 ## part payload
949 self._payloadstream = util.chunkbuffer(self._payloadchunks())
961 self._payloadstream = util.chunkbuffer(self._payloadchunks())
950 # we read the data, tell it
962 # we read the data, tell it
951 self._initialized = True
963 self._initialized = True
952
964
953 def read(self, size=None):
965 def read(self, size=None):
954 """read payload data"""
966 """read payload data"""
955 if not self._initialized:
967 if not self._initialized:
956 self._readheader()
968 self._readheader()
957 if size is None:
969 if size is None:
958 data = self._payloadstream.read()
970 data = self._payloadstream.read()
959 else:
971 else:
960 data = self._payloadstream.read(size)
972 data = self._payloadstream.read(size)
961 if size is None or len(data) < size:
973 if size is None or len(data) < size:
962 self.consumed = True
974 self.consumed = True
963 self._pos += len(data)
975 self._pos += len(data)
964 return data
976 return data
965
977
966 def tell(self):
978 def tell(self):
967 return self._pos
979 return self._pos
968
980
969 def seek(self, offset, whence=0):
981 def seek(self, offset, whence=0):
970 if whence == 0:
982 if whence == 0:
971 newpos = offset
983 newpos = offset
972 elif whence == 1:
984 elif whence == 1:
973 newpos = self._pos + offset
985 newpos = self._pos + offset
974 elif whence == 2:
986 elif whence == 2:
975 if not self.consumed:
987 if not self.consumed:
976 self.read()
988 self.read()
977 newpos = self._chunkindex[-1][0] - offset
989 newpos = self._chunkindex[-1][0] - offset
978 else:
990 else:
979 raise ValueError('Unknown whence value: %r' % (whence,))
991 raise ValueError('Unknown whence value: %r' % (whence,))
980
992
981 if newpos > self._chunkindex[-1][0] and not self.consumed:
993 if newpos > self._chunkindex[-1][0] and not self.consumed:
982 self.read()
994 self.read()
983 if not 0 <= newpos <= self._chunkindex[-1][0]:
995 if not 0 <= newpos <= self._chunkindex[-1][0]:
984 raise ValueError('Offset out of range')
996 raise ValueError('Offset out of range')
985
997
986 if self._pos != newpos:
998 if self._pos != newpos:
987 chunk, internaloffset = self._findchunk(newpos)
999 chunk, internaloffset = self._findchunk(newpos)
988 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1000 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
989 adjust = self.read(internaloffset)
1001 adjust = self.read(internaloffset)
990 if len(adjust) != internaloffset:
1002 if len(adjust) != internaloffset:
991 raise util.Abort(_('Seek failed\n'))
1003 raise util.Abort(_('Seek failed\n'))
992 self._pos = newpos
1004 self._pos = newpos
993
1005
994 capabilities = {'HG20': (),
1006 capabilities = {'HG20': (),
995 'listkeys': (),
1007 'listkeys': (),
996 'pushkey': (),
1008 'pushkey': (),
997 'digests': tuple(sorted(util.DIGESTS.keys())),
1009 'digests': tuple(sorted(util.DIGESTS.keys())),
998 'remote-changegroup': ('http', 'https'),
1010 'remote-changegroup': ('http', 'https'),
999 }
1011 }
1000
1012
1001 def getrepocaps(repo, allowpushback=False):
1013 def getrepocaps(repo, allowpushback=False):
1002 """return the bundle2 capabilities for a given repo
1014 """return the bundle2 capabilities for a given repo
1003
1015
1004 Exists to allow extensions (like evolution) to mutate the capabilities.
1016 Exists to allow extensions (like evolution) to mutate the capabilities.
1005 """
1017 """
1006 caps = capabilities.copy()
1018 caps = capabilities.copy()
1007 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1019 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1008 if obsolete.isenabled(repo, obsolete.exchangeopt):
1020 if obsolete.isenabled(repo, obsolete.exchangeopt):
1009 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1021 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1010 caps['obsmarkers'] = supportedformat
1022 caps['obsmarkers'] = supportedformat
1011 if allowpushback:
1023 if allowpushback:
1012 caps['pushback'] = ()
1024 caps['pushback'] = ()
1013 return caps
1025 return caps
1014
1026
1015 def bundle2caps(remote):
1027 def bundle2caps(remote):
1016 """return the bundle capabilities of a peer as dict"""
1028 """return the bundle capabilities of a peer as dict"""
1017 raw = remote.capable('bundle2')
1029 raw = remote.capable('bundle2')
1018 if not raw and raw != '':
1030 if not raw and raw != '':
1019 return {}
1031 return {}
1020 capsblob = urllib.unquote(remote.capable('bundle2'))
1032 capsblob = urllib.unquote(remote.capable('bundle2'))
1021 return decodecaps(capsblob)
1033 return decodecaps(capsblob)
1022
1034
1023 def obsmarkersversion(caps):
1035 def obsmarkersversion(caps):
1024 """extract the list of supported obsmarkers versions from a bundle2caps dict
1036 """extract the list of supported obsmarkers versions from a bundle2caps dict
1025 """
1037 """
1026 obscaps = caps.get('obsmarkers', ())
1038 obscaps = caps.get('obsmarkers', ())
1027 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1039 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1028
1040
1029 @parthandler('changegroup', ('version',))
1041 @parthandler('changegroup', ('version',))
1030 def handlechangegroup(op, inpart):
1042 def handlechangegroup(op, inpart):
1031 """apply a changegroup part on the repo
1043 """apply a changegroup part on the repo
1032
1044
1033 This is a very early implementation that will massive rework before being
1045 This is a very early implementation that will massive rework before being
1034 inflicted to any end-user.
1046 inflicted to any end-user.
1035 """
1047 """
1036 # Make sure we trigger a transaction creation
1048 # Make sure we trigger a transaction creation
1037 #
1049 #
1038 # The addchangegroup function will get a transaction object by itself, but
1050 # The addchangegroup function will get a transaction object by itself, but
1039 # we need to make sure we trigger the creation of a transaction object used
1051 # we need to make sure we trigger the creation of a transaction object used
1040 # for the whole processing scope.
1052 # for the whole processing scope.
1041 op.gettransaction()
1053 op.gettransaction()
1042 unpackerversion = inpart.params.get('version', '01')
1054 unpackerversion = inpart.params.get('version', '01')
1043 # We should raise an appropriate exception here
1055 # We should raise an appropriate exception here
1044 unpacker = changegroup.packermap[unpackerversion][1]
1056 unpacker = changegroup.packermap[unpackerversion][1]
1045 cg = unpacker(inpart, 'UN')
1057 cg = unpacker(inpart, 'UN')
1046 # the source and url passed here are overwritten by the one contained in
1058 # the source and url passed here are overwritten by the one contained in
1047 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1059 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1048 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1060 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1049 op.records.add('changegroup', {'return': ret})
1061 op.records.add('changegroup', {'return': ret})
1050 if op.reply is not None:
1062 if op.reply is not None:
1051 # This is definitely not the final form of this
1063 # This is definitely not the final form of this
1052 # return. But one need to start somewhere.
1064 # return. But one need to start somewhere.
1053 part = op.reply.newpart('reply:changegroup', mandatory=False)
1065 part = op.reply.newpart('reply:changegroup', mandatory=False)
1054 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1066 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1055 part.addparam('return', '%i' % ret, mandatory=False)
1067 part.addparam('return', '%i' % ret, mandatory=False)
1056 assert not inpart.read()
1068 assert not inpart.read()
1057
1069
1058 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1070 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1059 ['digest:%s' % k for k in util.DIGESTS.keys()])
1071 ['digest:%s' % k for k in util.DIGESTS.keys()])
1060 @parthandler('remote-changegroup', _remotechangegroupparams)
1072 @parthandler('remote-changegroup', _remotechangegroupparams)
1061 def handleremotechangegroup(op, inpart):
1073 def handleremotechangegroup(op, inpart):
1062 """apply a bundle10 on the repo, given an url and validation information
1074 """apply a bundle10 on the repo, given an url and validation information
1063
1075
1064 All the information about the remote bundle to import are given as
1076 All the information about the remote bundle to import are given as
1065 parameters. The parameters include:
1077 parameters. The parameters include:
1066 - url: the url to the bundle10.
1078 - url: the url to the bundle10.
1067 - size: the bundle10 file size. It is used to validate what was
1079 - size: the bundle10 file size. It is used to validate what was
1068 retrieved by the client matches the server knowledge about the bundle.
1080 retrieved by the client matches the server knowledge about the bundle.
1069 - digests: a space separated list of the digest types provided as
1081 - digests: a space separated list of the digest types provided as
1070 parameters.
1082 parameters.
1071 - digest:<digest-type>: the hexadecimal representation of the digest with
1083 - digest:<digest-type>: the hexadecimal representation of the digest with
1072 that name. Like the size, it is used to validate what was retrieved by
1084 that name. Like the size, it is used to validate what was retrieved by
1073 the client matches what the server knows about the bundle.
1085 the client matches what the server knows about the bundle.
1074
1086
1075 When multiple digest types are given, all of them are checked.
1087 When multiple digest types are given, all of them are checked.
1076 """
1088 """
1077 try:
1089 try:
1078 raw_url = inpart.params['url']
1090 raw_url = inpart.params['url']
1079 except KeyError:
1091 except KeyError:
1080 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1092 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1081 parsed_url = util.url(raw_url)
1093 parsed_url = util.url(raw_url)
1082 if parsed_url.scheme not in capabilities['remote-changegroup']:
1094 if parsed_url.scheme not in capabilities['remote-changegroup']:
1083 raise util.Abort(_('remote-changegroup does not support %s urls') %
1095 raise util.Abort(_('remote-changegroup does not support %s urls') %
1084 parsed_url.scheme)
1096 parsed_url.scheme)
1085
1097
1086 try:
1098 try:
1087 size = int(inpart.params['size'])
1099 size = int(inpart.params['size'])
1088 except ValueError:
1100 except ValueError:
1089 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1101 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1090 % 'size')
1102 % 'size')
1091 except KeyError:
1103 except KeyError:
1092 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1104 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1093
1105
1094 digests = {}
1106 digests = {}
1095 for typ in inpart.params.get('digests', '').split():
1107 for typ in inpart.params.get('digests', '').split():
1096 param = 'digest:%s' % typ
1108 param = 'digest:%s' % typ
1097 try:
1109 try:
1098 value = inpart.params[param]
1110 value = inpart.params[param]
1099 except KeyError:
1111 except KeyError:
1100 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1112 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1101 param)
1113 param)
1102 digests[typ] = value
1114 digests[typ] = value
1103
1115
1104 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1116 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1105
1117
1106 # Make sure we trigger a transaction creation
1118 # Make sure we trigger a transaction creation
1107 #
1119 #
1108 # The addchangegroup function will get a transaction object by itself, but
1120 # The addchangegroup function will get a transaction object by itself, but
1109 # we need to make sure we trigger the creation of a transaction object used
1121 # we need to make sure we trigger the creation of a transaction object used
1110 # for the whole processing scope.
1122 # for the whole processing scope.
1111 op.gettransaction()
1123 op.gettransaction()
1112 import exchange
1124 import exchange
1113 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1125 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1114 if not isinstance(cg, changegroup.cg1unpacker):
1126 if not isinstance(cg, changegroup.cg1unpacker):
1115 raise util.Abort(_('%s: not a bundle version 1.0') %
1127 raise util.Abort(_('%s: not a bundle version 1.0') %
1116 util.hidepassword(raw_url))
1128 util.hidepassword(raw_url))
1117 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1129 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1118 op.records.add('changegroup', {'return': ret})
1130 op.records.add('changegroup', {'return': ret})
1119 if op.reply is not None:
1131 if op.reply is not None:
1120 # This is definitely not the final form of this
1132 # This is definitely not the final form of this
1121 # return. But one need to start somewhere.
1133 # return. But one need to start somewhere.
1122 part = op.reply.newpart('reply:changegroup')
1134 part = op.reply.newpart('reply:changegroup')
1123 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1135 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1124 part.addparam('return', '%i' % ret, mandatory=False)
1136 part.addparam('return', '%i' % ret, mandatory=False)
1125 try:
1137 try:
1126 real_part.validate()
1138 real_part.validate()
1127 except util.Abort, e:
1139 except util.Abort, e:
1128 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1140 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1129 (util.hidepassword(raw_url), str(e)))
1141 (util.hidepassword(raw_url), str(e)))
1130 assert not inpart.read()
1142 assert not inpart.read()
1131
1143
1132 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1144 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1133 def handlereplychangegroup(op, inpart):
1145 def handlereplychangegroup(op, inpart):
1134 ret = int(inpart.params['return'])
1146 ret = int(inpart.params['return'])
1135 replyto = int(inpart.params['in-reply-to'])
1147 replyto = int(inpart.params['in-reply-to'])
1136 op.records.add('changegroup', {'return': ret}, replyto)
1148 op.records.add('changegroup', {'return': ret}, replyto)
1137
1149
1138 @parthandler('check:heads')
1150 @parthandler('check:heads')
1139 def handlecheckheads(op, inpart):
1151 def handlecheckheads(op, inpart):
1140 """check that head of the repo did not change
1152 """check that head of the repo did not change
1141
1153
1142 This is used to detect a push race when using unbundle.
1154 This is used to detect a push race when using unbundle.
1143 This replaces the "heads" argument of unbundle."""
1155 This replaces the "heads" argument of unbundle."""
1144 h = inpart.read(20)
1156 h = inpart.read(20)
1145 heads = []
1157 heads = []
1146 while len(h) == 20:
1158 while len(h) == 20:
1147 heads.append(h)
1159 heads.append(h)
1148 h = inpart.read(20)
1160 h = inpart.read(20)
1149 assert not h
1161 assert not h
1150 if heads != op.repo.heads():
1162 if heads != op.repo.heads():
1151 raise error.PushRaced('repository changed while pushing - '
1163 raise error.PushRaced('repository changed while pushing - '
1152 'please try again')
1164 'please try again')
1153
1165
1154 @parthandler('output')
1166 @parthandler('output')
1155 def handleoutput(op, inpart):
1167 def handleoutput(op, inpart):
1156 """forward output captured on the server to the client"""
1168 """forward output captured on the server to the client"""
1157 for line in inpart.read().splitlines():
1169 for line in inpart.read().splitlines():
1158 op.ui.write(('remote: %s\n' % line))
1170 op.ui.write(('remote: %s\n' % line))
1159
1171
1160 @parthandler('replycaps')
1172 @parthandler('replycaps')
1161 def handlereplycaps(op, inpart):
1173 def handlereplycaps(op, inpart):
1162 """Notify that a reply bundle should be created
1174 """Notify that a reply bundle should be created
1163
1175
1164 The payload contains the capabilities information for the reply"""
1176 The payload contains the capabilities information for the reply"""
1165 caps = decodecaps(inpart.read())
1177 caps = decodecaps(inpart.read())
1166 if op.reply is None:
1178 if op.reply is None:
1167 op.reply = bundle20(op.ui, caps)
1179 op.reply = bundle20(op.ui, caps)
1168
1180
1169 @parthandler('error:abort', ('message', 'hint'))
1181 @parthandler('error:abort', ('message', 'hint'))
1170 def handleerrorabort(op, inpart):
1182 def handleerrorabort(op, inpart):
1171 """Used to transmit abort error over the wire"""
1183 """Used to transmit abort error over the wire"""
1172 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1184 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1173
1185
1174 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1186 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1175 def handleerrorunsupportedcontent(op, inpart):
1187 def handleerrorunsupportedcontent(op, inpart):
1176 """Used to transmit unknown content error over the wire"""
1188 """Used to transmit unknown content error over the wire"""
1177 kwargs = {}
1189 kwargs = {}
1178 parttype = inpart.params.get('parttype')
1190 parttype = inpart.params.get('parttype')
1179 if parttype is not None:
1191 if parttype is not None:
1180 kwargs['parttype'] = parttype
1192 kwargs['parttype'] = parttype
1181 params = inpart.params.get('params')
1193 params = inpart.params.get('params')
1182 if params is not None:
1194 if params is not None:
1183 kwargs['params'] = params.split('\0')
1195 kwargs['params'] = params.split('\0')
1184
1196
1185 raise error.UnsupportedPartError(**kwargs)
1197 raise error.UnsupportedPartError(**kwargs)
1186
1198
1187 @parthandler('error:pushraced', ('message',))
1199 @parthandler('error:pushraced', ('message',))
1188 def handleerrorpushraced(op, inpart):
1200 def handleerrorpushraced(op, inpart):
1189 """Used to transmit push race error over the wire"""
1201 """Used to transmit push race error over the wire"""
1190 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1202 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1191
1203
1192 @parthandler('listkeys', ('namespace',))
1204 @parthandler('listkeys', ('namespace',))
1193 def handlelistkeys(op, inpart):
1205 def handlelistkeys(op, inpart):
1194 """retrieve pushkey namespace content stored in a bundle2"""
1206 """retrieve pushkey namespace content stored in a bundle2"""
1195 namespace = inpart.params['namespace']
1207 namespace = inpart.params['namespace']
1196 r = pushkey.decodekeys(inpart.read())
1208 r = pushkey.decodekeys(inpart.read())
1197 op.records.add('listkeys', (namespace, r))
1209 op.records.add('listkeys', (namespace, r))
1198
1210
1199 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1211 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1200 def handlepushkey(op, inpart):
1212 def handlepushkey(op, inpart):
1201 """process a pushkey request"""
1213 """process a pushkey request"""
1202 dec = pushkey.decode
1214 dec = pushkey.decode
1203 namespace = dec(inpart.params['namespace'])
1215 namespace = dec(inpart.params['namespace'])
1204 key = dec(inpart.params['key'])
1216 key = dec(inpart.params['key'])
1205 old = dec(inpart.params['old'])
1217 old = dec(inpart.params['old'])
1206 new = dec(inpart.params['new'])
1218 new = dec(inpart.params['new'])
1207 ret = op.repo.pushkey(namespace, key, old, new)
1219 ret = op.repo.pushkey(namespace, key, old, new)
1208 record = {'namespace': namespace,
1220 record = {'namespace': namespace,
1209 'key': key,
1221 'key': key,
1210 'old': old,
1222 'old': old,
1211 'new': new}
1223 'new': new}
1212 op.records.add('pushkey', record)
1224 op.records.add('pushkey', record)
1213 if op.reply is not None:
1225 if op.reply is not None:
1214 rpart = op.reply.newpart('reply:pushkey')
1226 rpart = op.reply.newpart('reply:pushkey')
1215 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1227 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1216 rpart.addparam('return', '%i' % ret, mandatory=False)
1228 rpart.addparam('return', '%i' % ret, mandatory=False)
1217
1229
1218 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1230 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1219 def handlepushkeyreply(op, inpart):
1231 def handlepushkeyreply(op, inpart):
1220 """retrieve the result of a pushkey request"""
1232 """retrieve the result of a pushkey request"""
1221 ret = int(inpart.params['return'])
1233 ret = int(inpart.params['return'])
1222 partid = int(inpart.params['in-reply-to'])
1234 partid = int(inpart.params['in-reply-to'])
1223 op.records.add('pushkey', {'return': ret}, partid)
1235 op.records.add('pushkey', {'return': ret}, partid)
1224
1236
1225 @parthandler('obsmarkers')
1237 @parthandler('obsmarkers')
1226 def handleobsmarker(op, inpart):
1238 def handleobsmarker(op, inpart):
1227 """add a stream of obsmarkers to the repo"""
1239 """add a stream of obsmarkers to the repo"""
1228 tr = op.gettransaction()
1240 tr = op.gettransaction()
1229 markerdata = inpart.read()
1241 markerdata = inpart.read()
1230 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1242 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1231 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1243 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1232 % len(markerdata))
1244 % len(markerdata))
1233 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1245 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1234 if new:
1246 if new:
1235 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1247 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1236 op.records.add('obsmarkers', {'new': new})
1248 op.records.add('obsmarkers', {'new': new})
1237 if op.reply is not None:
1249 if op.reply is not None:
1238 rpart = op.reply.newpart('reply:obsmarkers')
1250 rpart = op.reply.newpart('reply:obsmarkers')
1239 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1251 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1240 rpart.addparam('new', '%i' % new, mandatory=False)
1252 rpart.addparam('new', '%i' % new, mandatory=False)
1241
1253
1242
1254
1243 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1255 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1244 def handlepushkeyreply(op, inpart):
1256 def handlepushkeyreply(op, inpart):
1245 """retrieve the result of a pushkey request"""
1257 """retrieve the result of a pushkey request"""
1246 ret = int(inpart.params['new'])
1258 ret = int(inpart.params['new'])
1247 partid = int(inpart.params['in-reply-to'])
1259 partid = int(inpart.params['in-reply-to'])
1248 op.records.add('obsmarkers', {'new': ret}, partid)
1260 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now