bundle2: issue remote output as "status" (issue4612)...
Pierre-Yves David
r24846:e79dd1c9 stable
@@ -1,1264 +1,1264 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
3 # Copyright 2013 Facebook, Inc.
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
8
9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 that will be handed to and processed by the application layer.
12
13
14 General format architecture
15 ===========================
16
17 The format is structured as follows
18
19 - magic string
20 - stream level parameters
21 - payload parts (any number)
22 - end of stream marker.
23
24 The binary format
25 ============================
26
27 All numbers are unsigned and big-endian.
28
29 stream level parameters
30 ------------------------
31
32 The binary format is as follows
33
34 :params size: int32
35
36 The total number of bytes used by the parameters
37
38 :params value: arbitrary number of bytes
39
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
42
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
46 Empty names are obviously forbidden.
47
48 Names MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However, when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if it is not able to process it.
52
53 Stream parameters use a simple textual format for two main reasons:
54
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
57 - Textual data allows easy human inspection of a bundle2 header in case of
58 trouble.
59
60 Any applicative level options MUST go into a bundle2 part instead.
61
62 Payload part
63 ------------------------
64
65 The binary format is as follows
66
67 :header size: int32
68
69 The total number of bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
71
72 :header:
73
74 The header defines how to interpret the part. It contains two pieces of
75 data: the part type, and the part parameters.
76
77 The part type is used to route to an application level handler that can
78 interpret the payload.
79
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
82 interpret the part payload.
83
84 The binary format of the header is as follows
85
86 :typesize: (one byte)
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 to this part.
92
93 :parameters:
94
95 A part's parameters may have arbitrary content; the binary structure is::
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
99 :mandatory-count: 1 byte, number of mandatory parameters
100
101 :advisory-count: 1 byte, number of advisory parameters
102
103 :param-sizes:
104
105 N couples of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107
108 :param-data:
109
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
112 field.
113
114 Mandatory parameters come first, then the advisory ones.
115
116 Each parameter's key MUST be unique within the part.
117
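As an illustration (values made up, not taken from a real bundle), a part
carrying one mandatory parameter ``('key', 'val')`` and no advisory
parameter would serialize its parameter block as::

<0x01><0x00><0x03><0x03>keyval

that is: the two counts (1 mandatory, 0 advisory), one (3, 3) size couple,
then the concatenated key and value data.
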
118 :payload:
119
120 The payload is a series of `<chunksize><chunkdata>`.
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 `chunksize` says). The payload part is concluded by a zero size chunk.
124
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
127
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
130
131 Bundle processing
132 ============================
133
134 Each part is processed in order using a "part handler". Handlers are registered
135 for a certain part type.
136
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the part type
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the parts read after an abort are processed. In the
144 future, dropping the stream may become an option for channels we do not care to
145 preserve.
146 """
147
148 import errno
149 import sys
150 import util
151 import struct
152 import urllib
153 import string
154 import obsolete
155 import pushkey
156 import url
157 import re
158
159 import changegroup, error
160 from i18n import _
161
162 _pack = struct.pack
163 _unpack = struct.unpack
164
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
171
172 preferedchunksize = 4096
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
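# Illustrative sketch only (not part of Mercurial's API; the helper name is
# made up): read the magic string and the stream level parameter blob
# described in the module docstring above. unbundle20 and getunbundler below
# do the same job for real callers.
def _examplereadstreamheader(fp):
    magic = changegroup.readexactly(fp, 4)  # 'HG20' for this format
    paramssize = _unpack(_fstreamparamsize, changegroup.readexactly(fp, 4))[0]
    params = {}
    if paramssize:
        blob = changegroup.readexactly(fp, paramssize)
        for entry in blob.split(' '):
            pieces = entry.split('=', 1)
            key = urllib.unquote(pieces[0])
            value = urllib.unquote(pieces[1]) if len(pieces) > 1 else None
            params[key] = value
    return magic, params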
176 def validateparttype(parttype):
177 """raise ValueError if a parttype contains an invalid character"""
178 if _parttypeforbidden.search(parttype):
179 raise ValueError(parttype)
180
181 def _makefpartparamsizes(nbparams):
182 """return a struct format to read part parameter sizes
183
184 The number of parameters is variable so we need to build that format
185 dynamically.
186 """
187 return '>'+('BB'*nbparams)
188
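# For instance, _makefpartparamsizes(2) returns '>BBBB', the struct format
# for the four size bytes of two (key, value) couples.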
189 parthandlermapping = {}
190
191 def parthandler(parttype, params=()):
192 """decorator that registers a function as a bundle2 part handler
193
194 eg::
195
196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 def myparttypehandler(...):
198 '''process a part of type "my part".'''
199 ...
200 """
201 validateparttype(parttype)
202 def _decorator(func):
203 lparttype = parttype.lower() # enforce lower case matching.
204 assert lparttype not in parthandlermapping
205 parthandlermapping[lparttype] = func
206 func.params = frozenset(params)
207 return func
208 return _decorator
209
210 class unbundlerecords(object):
211 """keep record of what happens during an unbundle
212
213 New records are added using `records.add('cat', obj)`, where 'cat' is a
214 category of record and obj is an arbitrary object.
215
216 `records['cat']` will return all entries of this category 'cat'.
217
218 Iterating on the object itself will yield `('category', obj)` tuples
219 for all entries.
220
221 All iterations happen in chronological order.
222 """
223
224 def __init__(self):
225 self._categories = {}
226 self._sequences = []
227 self._replies = {}
228
229 def add(self, category, entry, inreplyto=None):
230 """add a new record of a given category.
231
232 The entry can then be retrieved in the list returned by
233 self['category']."""
234 self._categories.setdefault(category, []).append(entry)
235 self._sequences.append((category, entry))
236 if inreplyto is not None:
237 self.getreplies(inreplyto).add(category, entry)
238
239 def getreplies(self, partid):
240 """get the records that are replies to a specific part"""
241 return self._replies.setdefault(partid, unbundlerecords())
242
243 def __getitem__(self, cat):
244 return tuple(self._categories.get(cat, ()))
245
246 def __iter__(self):
247 return iter(self._sequences)
248
249 def __len__(self):
250 return len(self._sequences)
251
252 def __nonzero__(self):
253 return bool(self._sequences)
254
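# Illustrative usage of unbundlerecords (category name and payload made up):
#
#   records = unbundlerecords()
#   records.add('changegroup', {'return': 1})
#   records['changegroup']   # -> ({'return': 1},)
#   list(records)            # -> [('changegroup', {'return': 1})]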
255 class bundleoperation(object):
256 """an object that represents a single bundling process
257
258 Its purpose is to carry unbundle-related objects and states.
259
260 A new object should be created at the beginning of each bundle processing.
261 The object is to be returned by the processing function.
262
263 The object has very little content now; it will ultimately contain:
264 * an access to the repo the bundle is applied to,
265 * a ui object,
266 * a way to retrieve a transaction to add changes to the repo,
267 * a way to record the result of processing each part,
268 * a way to construct a bundle response when applicable.
269 """
270
271 def __init__(self, repo, transactiongetter):
272 self.repo = repo
273 self.ui = repo.ui
274 self.records = unbundlerecords()
275 self.gettransaction = transactiongetter
276 self.reply = None
277
278 class TransactionUnavailable(RuntimeError):
279 pass
280
281 def _notransaction():
282 """default method to get a transaction while processing a bundle
283
284 Raise an exception to highlight the fact that no transaction was expected
285 to be created"""
286 raise TransactionUnavailable()
287
288 def processbundle(repo, unbundler, transactiongetter=None):
289 """This function processes a bundle, applying its effects to/from a repo
290
291 It iterates over each part then searches for and uses the proper handling
292 code to process the part. Parts are processed in order.
293
294 This is a very early version of this function that will be strongly reworked
295 before final usage.
296
297 An unknown mandatory part will abort the process.
298 """
299 if transactiongetter is None:
300 transactiongetter = _notransaction
301 op = bundleoperation(repo, transactiongetter)
302 # todo:
303 # - replace this with an init function soon.
304 # - exception catching
305 unbundler.params
306 iterparts = unbundler.iterparts()
307 part = None
308 try:
309 for part in iterparts:
310 _processpart(op, part)
311 except Exception, exc:
312 for part in iterparts:
313 # consume the bundle content
314 part.seek(0, 2)
315 # Small hack to let caller code distinguish between exceptions from
316 # bundle2 processing and exceptions from processing the old format.
317 # This is mostly needed to handle different return codes to unbundle
318 # according to the type of bundle. We should probably clean up or drop
319 # this return code craziness in a future version.
320 exc.duringunbundle2 = True
321 salvaged = []
322 if op.reply is not None:
323 salvaged = op.reply.salvageoutput()
324 exc._bundle2salvagedoutput = salvaged
325 raise
326 return op
327
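# Typical use (sketch only; real callers such as exchange.unbundle also
# provide a transactiongetter so parts can write to the repo):
#
#   unbundler = getunbundler(repo.ui, fp)
#   op = processbundle(repo, unbundler)
#   op.records['changegroup']   # results recorded by part handlers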
328 def _processpart(op, part):
329 """process a single part from a bundle
330
331 The part is guaranteed to have been fully consumed when the function exits
332 (even if an exception is raised)."""
333 try:
334 try:
335 handler = parthandlermapping.get(part.type)
336 if handler is None:
337 raise error.UnsupportedPartError(parttype=part.type)
338 op.ui.debug('found a handler for part %r\n' % part.type)
339 unknownparams = part.mandatorykeys - handler.params
340 if unknownparams:
341 unknownparams = list(unknownparams)
342 unknownparams.sort()
343 raise error.UnsupportedPartError(parttype=part.type,
344 params=unknownparams)
345 except error.UnsupportedPartError, exc:
346 if part.mandatory: # mandatory parts
347 raise
348 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
349 return # skip to part processing
350
351 # handler is called outside the above try block so that we don't
352 # risk catching KeyErrors from anything other than the
353 # parthandlermapping lookup (any KeyError raised by handler()
354 # itself represents a defect of a different variety).
355 output = None
356 if op.reply is not None:
357 op.ui.pushbuffer(error=True)
358 output = ''
359 try:
360 handler(op, part)
361 finally:
362 if output is not None:
363 output = op.ui.popbuffer()
364 if output:
365 outpart = op.reply.newpart('output', data=output,
366 mandatory=False)
367 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
368 finally:
369 # consume the part content to not corrupt the stream.
370 part.seek(0, 2)
371
372
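# Illustrative sketch only: a handler for a hypothetical advisory part type
# (the name 'x-example:ping' is made up). While a reply bundle is active
# (op.reply is not None), anything the handler writes through op.ui is
# buffered by _processpart above and sent back as an advisory 'output' part
# carrying an 'in-reply-to' parameter.
#
#   @parthandler('x-example:ping', ('message',))
#   def _handleping(op, inpart):
#       op.ui.status('ping: %s\n' % inpart.params['message'])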
373 def decodecaps(blob):
374 """decode a bundle2 caps bytes blob into a dictionary
375
376 The blob is a list of capabilities (one per line).
377 Capabilities may have values using a line of the form::
378
379 capability=value1,value2,value3
380
381 The values are always a list."""
382 caps = {}
383 for line in blob.splitlines():
384 if not line:
385 continue
386 if '=' not in line:
387 key, vals = line, ()
388 else:
389 key, vals = line.split('=', 1)
390 vals = vals.split(',')
391 key = urllib.unquote(key)
392 vals = [urllib.unquote(v) for v in vals]
393 caps[key] = vals
394 return caps
395
396 def encodecaps(caps):
397 """encode a bundle2 caps dictionary into a bytes blob"""
398 chunks = []
399 for ca in sorted(caps):
400 vals = caps[ca]
401 ca = urllib.quote(ca)
402 vals = [urllib.quote(v) for v in vals]
403 if vals:
404 ca = "%s=%s" % (ca, ','.join(vals))
405 chunks.append(ca)
406 return '\n'.join(chunks)
407
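# For example (capability names chosen for illustration only):
#
#   encodecaps({'HG20': [], 'listkeys': ['namespaces']})
#
# produces the two-line blob "HG20\nlistkeys=namespaces", and decodecaps()
# turns that blob back into {'HG20': [], 'listkeys': ['namespaces']}.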
408 class bundle20(object):
409 """represent an outgoing bundle2 container
410
411 Use the `addparam` method to add a stream level parameter and `newpart` to
412 populate it. Then call `getchunks` to retrieve all the binary chunks of
413 data that compose the bundle2 container."""
414
415 _magicstring = 'HG20'
416
417 def __init__(self, ui, capabilities=()):
418 self.ui = ui
419 self._params = []
420 self._parts = []
421 self.capabilities = dict(capabilities)
422
423 @property
424 def nbparts(self):
425 """total number of parts added to the bundler"""
426 return len(self._parts)
427
428 # methods used to define the bundle2 content
429 def addparam(self, name, value=None):
430 """add a stream level parameter"""
431 if not name:
432 raise ValueError('empty parameter name')
433 if name[0] not in string.letters:
434 raise ValueError('non letter first character: %r' % name)
435 self._params.append((name, value))
436
437 def addpart(self, part):
438 """add a new part to the bundle2 container
439
440 Parts contain the actual applicative payload."""
441 assert part.id is None
442 part.id = len(self._parts) # very cheap counter
443 self._parts.append(part)
444
445 def newpart(self, typeid, *args, **kwargs):
446 """create a new part and add it to the container
447
448 The part is directly added to the container. For now, this means
449 that any failure to properly initialize the part after calling
450 ``newpart`` should result in a failure of the whole bundling process.
451
452 You can still fall back to manually creating and adding the part if you
453 need better control."""
454 part = bundlepart(typeid, *args, **kwargs)
455 self.addpart(part)
456 return part
457
458 # methods used to generate the bundle2 stream
459 def getchunks(self):
460 self.ui.debug('start emission of %s stream\n' % self._magicstring)
461 yield self._magicstring
462 param = self._paramchunk()
463 self.ui.debug('bundle parameter: %s\n' % param)
464 yield _pack(_fstreamparamsize, len(param))
465 if param:
466 yield param
467
468 self.ui.debug('start of parts\n')
469 for part in self._parts:
470 self.ui.debug('bundle part: "%s"\n' % part.type)
471 for chunk in part.getchunks():
472 yield chunk
473 self.ui.debug('end of bundle\n')
474 yield _pack(_fpartheadersize, 0)
475
476 def _paramchunk(self):
477 """return an encoded version of all stream parameters"""
478 blocks = []
479 for par, value in self._params:
480 par = urllib.quote(par)
481 if value is not None:
482 value = urllib.quote(value)
483 par = '%s=%s' % (par, value)
484 blocks.append(par)
485 return ' '.join(blocks)
486
487 def salvageoutput(self):
488 """return a list with a copy of all output parts in the bundle
489
490 This is meant to be used during error handling to make sure we preserve
491 server output"""
492 salvaged = []
493 for part in self._parts:
494 if part.type.startswith('output'):
495 salvaged.append(part.copy())
496 return salvaged
497
498
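# Illustrative sketch only (not part of Mercurial's API; the helper name is
# made up): assemble a minimal bundle2 container with a single advisory
# 'output' part and return the raw bytes.
def _examplebuildbundle(ui):
    bundler = bundle20(ui)
    part = bundler.newpart('output', data='hello from the server\n',
                           mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    return ''.join(bundler.getchunks())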
499 class unpackermixin(object):
500 """A mixin to extract bytes and struct data from a stream"""
501
502 def __init__(self, fp):
503 self._fp = fp
504 self._seekable = (util.safehasattr(fp, 'seek') and
505 util.safehasattr(fp, 'tell'))
506
507 def _unpack(self, format):
508 """unpack this struct format from the stream"""
509 data = self._readexact(struct.calcsize(format))
510 return _unpack(format, data)
511
512 def _readexact(self, size):
513 """read exactly <size> bytes from the stream"""
514 return changegroup.readexactly(self._fp, size)
515
516 def seek(self, offset, whence=0):
517 """move the underlying file pointer"""
518 if self._seekable:
519 return self._fp.seek(offset, whence)
520 else:
521 raise NotImplementedError(_('File pointer is not seekable'))
522
523 def tell(self):
524 """return the file offset, or None if file is not seekable"""
525 if self._seekable:
526 try:
527 return self._fp.tell()
528 except IOError, e:
529 if e.errno == errno.ESPIPE:
530 self._seekable = False
531 else:
532 raise
533 return None
534
535 def close(self):
536 """close underlying file"""
537 if util.safehasattr(self._fp, 'close'):
538 return self._fp.close()
539
540 def getunbundler(ui, fp, header=None):
541 """return a valid unbundler object for a given header"""
542 if header is None:
543 header = changegroup.readexactly(fp, 4)
544 magic, version = header[0:2], header[2:4]
545 if magic != 'HG':
546 raise util.Abort(_('not a Mercurial bundle'))
547 unbundlerclass = formatmap.get(version)
548 if unbundlerclass is None:
549 raise util.Abort(_('unknown bundle version %s') % version)
550 unbundler = unbundlerclass(ui, fp)
551 ui.debug('start processing of %s stream\n' % header)
552 return unbundler
553
554 class unbundle20(unpackermixin):
555 """interpret a bundle2 stream
556
557 This class is fed with a binary stream and yields parts through its
558 `iterparts` method."""
559
560 def __init__(self, ui, fp):
561 """If header is specified, we do not read it out of the stream."""
562 self.ui = ui
563 super(unbundle20, self).__init__(fp)
564
565 @util.propertycache
566 def params(self):
567 """dictionary of stream level parameters"""
568 self.ui.debug('reading bundle2 stream parameters\n')
569 params = {}
570 paramssize = self._unpack(_fstreamparamsize)[0]
571 if paramssize < 0:
572 raise error.BundleValueError('negative bundle param size: %i'
573 % paramssize)
574 if paramssize:
575 for p in self._readexact(paramssize).split(' '):
576 p = p.split('=', 1)
577 p = [urllib.unquote(i) for i in p]
578 if len(p) < 2:
579 p.append(None)
580 self._processparam(*p)
581 params[p[0]] = p[1]
582 return params
583
584 def _processparam(self, name, value):
585 """process a parameter, applying its effect if needed
586
587 Parameters starting with a lower case letter are advisory and will be
588 ignored when unknown. Those starting with an upper case letter are
589 mandatory, and this function will raise a KeyError when they are unknown.
590
591 Note: no options are currently supported. Any input will be either
592 ignored or failing.
593 """
594 if not name:
595 raise ValueError('empty parameter name')
596 if name[0] not in string.letters:
597 raise ValueError('non letter first character: %r' % name)
598 # Some logic will be added here later to try to process the option
599 # against a dict of known parameters.
600 if name[0].islower():
601 self.ui.debug("ignoring unknown parameter %r\n" % name)
602 else:
603 raise error.UnsupportedPartError(params=(name,))
604
605
606 def iterparts(self):
607 """yield all parts contained in the stream"""
608 # make sure params have been loaded
609 self.params
610 self.ui.debug('start extraction of bundle2 parts\n')
611 headerblock = self._readpartheader()
612 while headerblock is not None:
613 part = unbundlepart(self.ui, headerblock, self._fp)
614 yield part
615 part.seek(0, 2)
616 headerblock = self._readpartheader()
617 self.ui.debug('end of bundle2 stream\n')
618
619 def _readpartheader(self):
620 """read a part header size and return the bytes blob
621
622 returns None if empty"""
623 headersize = self._unpack(_fpartheadersize)[0]
624 if headersize < 0:
625 raise error.BundleValueError('negative part header size: %i'
626 % headersize)
627 self.ui.debug('part header size: %i\n' % headersize)
628 if headersize:
629 return self._readexact(headersize)
630 return None
631
632 def compressed(self):
633 return False
634
635 formatmap = {'20': unbundle20}
636
637 class bundlepart(object):
638 """A bundle2 part contains application level payload
639
640 The part `type` is used to route the part to the application level
641 handler.
642
643 The part payload is contained in ``part.data``. It could be raw bytes or a
644 generator of byte chunks.
645
646 You can add parameters to the part using the ``addparam`` method.
647 Parameters can be either mandatory (default) or advisory. The remote side
648 should be able to safely ignore the advisory ones.
649
650 Neither data nor parameters can be modified after generation has begun.
651 """
652
653 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
654 data='', mandatory=True):
655 validateparttype(parttype)
656 self.id = None
657 self.type = parttype
658 self._data = data
659 self._mandatoryparams = list(mandatoryparams)
660 self._advisoryparams = list(advisoryparams)
661 # checking for duplicated entries
662 self._seenparams = set()
663 for pname, __ in self._mandatoryparams + self._advisoryparams:
664 if pname in self._seenparams:
665 raise RuntimeError('duplicated params: %s' % pname)
666 self._seenparams.add(pname)
667 # status of the part's generation:
668 # - None: not started,
669 # - False: currently generated,
670 # - True: generation done.
671 self._generated = None
672 self.mandatory = mandatory
673
674 def copy(self):
675 """return a copy of the part
676
677 The new part has the very same content but no partid assigned yet.
678 Parts with generated data cannot be copied."""
679 assert not util.safehasattr(self.data, 'next')
680 return self.__class__(self.type, self._mandatoryparams,
681 self._advisoryparams, self._data, self.mandatory)
682
683 # methods used to define the part content
684 def __setdata(self, data):
685 if self._generated is not None:
686 raise error.ReadOnlyPartError('part is being generated')
687 self._data = data
688 def __getdata(self):
689 return self._data
690 data = property(__getdata, __setdata)
691
692 @property
693 def mandatoryparams(self):
694 # make it an immutable tuple to force people through ``addparam``
695 return tuple(self._mandatoryparams)
696
697 @property
698 def advisoryparams(self):
699 # make it an immutable tuple to force people through ``addparam``
700 return tuple(self._advisoryparams)
701
702 def addparam(self, name, value='', mandatory=True):
703 if self._generated is not None:
704 raise error.ReadOnlyPartError('part is being generated')
705 if name in self._seenparams:
706 raise ValueError('duplicated params: %s' % name)
707 self._seenparams.add(name)
708 params = self._advisoryparams
709 if mandatory:
710 params = self._mandatoryparams
711 params.append((name, value))
712
713 # methods used to generate the bundle2 stream
714 def getchunks(self):
715 if self._generated is not None:
716 raise RuntimeError('part can only be consumed once')
717 self._generated = False
718 #### header
719 if self.mandatory:
720 parttype = self.type.upper()
721 else:
722 parttype = self.type.lower()
723 ## parttype
724 header = [_pack(_fparttypesize, len(parttype)),
725 parttype, _pack(_fpartid, self.id),
726 ]
727 ## parameters
728 # count
729 manpar = self.mandatoryparams
730 advpar = self.advisoryparams
731 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
732 # size
733 parsizes = []
734 for key, value in manpar:
735 parsizes.append(len(key))
736 parsizes.append(len(value))
737 for key, value in advpar:
738 parsizes.append(len(key))
739 parsizes.append(len(value))
740 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
741 header.append(paramsizes)
742 # key, value
743 for key, value in manpar:
744 header.append(key)
745 header.append(value)
746 for key, value in advpar:
747 header.append(key)
748 header.append(value)
749 ## finalize header
750 headerchunk = ''.join(header)
751 yield _pack(_fpartheadersize, len(headerchunk))
752 yield headerchunk
753 ## payload
754 try:
755 for chunk in self._payloadchunks():
756 yield _pack(_fpayloadsize, len(chunk))
757 yield chunk
758 except Exception, exc:
759 # backup exception data for later
760 exc_info = sys.exc_info()
761 msg = 'unexpected error: %s' % exc
762 interpart = bundlepart('error:abort', [('message', msg)],
763 mandatory=False)
764 interpart.id = 0
765 yield _pack(_fpayloadsize, -1)
766 for chunk in interpart.getchunks():
767 yield chunk
768 # abort current part payload
769 yield _pack(_fpayloadsize, 0)
770 raise exc_info[0], exc_info[1], exc_info[2]
771 # end of payload
772 yield _pack(_fpayloadsize, 0)
773 self._generated = True
774
775 def _payloadchunks(self):
776 """yield chunks of the part payload
777
778 Exists to handle the different methods to provide data to a part."""
779 # we only support fixed size data now.
780 # This will be improved in the future.
781 if util.safehasattr(self.data, 'next'):
782 buff = util.chunkbuffer(self.data)
783 chunk = buff.read(preferedchunksize)
784 while chunk:
785 yield chunk
786 chunk = buff.read(preferedchunksize)
787 elif len(self.data):
788 yield self.data
789
790
790
791 flaginterrupt = -1
791 flaginterrupt = -1
792
792
793 class interrupthandler(unpackermixin):
793 class interrupthandler(unpackermixin):
794 """read one part and process it with restricted capability
794 """read one part and process it with restricted capability
795
795
796 This allows to transmit exception raised on the producer size during part
796 This allows to transmit exception raised on the producer size during part
797 iteration while the consumer is reading a part.
797 iteration while the consumer is reading a part.
798
798
799 Part processed in this manner only have access to a ui object,"""
799 Part processed in this manner only have access to a ui object,"""
800
800
801 def __init__(self, ui, fp):
801 def __init__(self, ui, fp):
802 super(interrupthandler, self).__init__(fp)
802 super(interrupthandler, self).__init__(fp)
803 self.ui = ui
803 self.ui = ui
804
804
805 def _readpartheader(self):
805 def _readpartheader(self):
806 """reads a part header size and return the bytes blob
806 """reads a part header size and return the bytes blob
807
807
808 returns None if empty"""
808 returns None if empty"""
809 headersize = self._unpack(_fpartheadersize)[0]
809 headersize = self._unpack(_fpartheadersize)[0]
810 if headersize < 0:
810 if headersize < 0:
811 raise error.BundleValueError('negative part header size: %i'
811 raise error.BundleValueError('negative part header size: %i'
812 % headersize)
812 % headersize)
813 self.ui.debug('part header size: %i\n' % headersize)
813 self.ui.debug('part header size: %i\n' % headersize)
814 if headersize:
814 if headersize:
815 return self._readexact(headersize)
815 return self._readexact(headersize)
816 return None
816 return None
817
817
818 def __call__(self):
818 def __call__(self):
819 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
819 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
820 headerblock = self._readpartheader()
820 headerblock = self._readpartheader()
821 if headerblock is None:
821 if headerblock is None:
822 self.ui.debug('no part found during interruption.\n')
822 self.ui.debug('no part found during interruption.\n')
823 return
823 return
824 part = unbundlepart(self.ui, headerblock, self._fp)
824 part = unbundlepart(self.ui, headerblock, self._fp)
825 op = interruptoperation(self.ui)
825 op = interruptoperation(self.ui)
826 _processpart(op, part)
826 _processpart(op, part)
827
827
828 class interruptoperation(object):
828 class interruptoperation(object):
829 """A limited operation to be use by part handler during interruption
829 """A limited operation to be use by part handler during interruption
830
830
831 It only have access to an ui object.
831 It only have access to an ui object.
832 """
832 """
833
833
834 def __init__(self, ui):
834 def __init__(self, ui):
835 self.ui = ui
835 self.ui = ui
836 self.reply = None
836 self.reply = None
837
837
838 @property
838 @property
839 def repo(self):
839 def repo(self):
840 raise RuntimeError('no repo access from stream interruption')
840 raise RuntimeError('no repo access from stream interruption')
841
841
842 def gettransaction(self):
842 def gettransaction(self):
843 raise TransactionUnavailable('no repo access from stream interruption')
843 raise TransactionUnavailable('no repo access from stream interruption')
844
844
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read the given format from the header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
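        # The payload arrives as a sequence of framed chunks: each chunk is
        # preceded by an int32 size. A size of zero ends the payload, and the
        # special negative value `flaginterrupt` signals that an interruption
        # part follows and must be processed before the payload resumes.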
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        self.ui.debug('payload chunk size: %i\n' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            self.ui.debug('payload chunk size: %i\n' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
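        # Illustrative example: with a chunk index of
        # [(0, f0), (4096, f1), (8192, f2)], _findchunk(5000) returns (1, 904),
        # i.e. payload position 5000 lies 904 bytes into chunk 1.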
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        self.ui.debug('part type: "%s"\n' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        self.ui.debug('part id: "%s"\n' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        self.ui.debug('part parameters: %i\n' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # the header has been fully read, mark the part as initialized
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        if size is None or len(data) < size:
            self.consumed = True
        self._pos += len(data)
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise util.Abort(_('Seek failed\n'))
            self._pos = newpos

capabilities = {'HG20': (),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
               }
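# This base capabilities dict is what a server advertises to clients: it is
# serialized, urlquoted, and exposed through the 'bundle2' wireprotocol
# capability, and bundle2caps() below performs the reverse transformation on
# the client side. An illustrative (not exhaustive) decoded form:
#
#   {'HG20': (), 'digests': ('md5', 'sha1', 'sha512'),
#    'remote-changegroup': ('http', 'https'), ...}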

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle2 capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

@parthandler('changegroup', ('version',))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, 'UN')
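    # 'UN' marks the stream as uncompressed: the changegroup is carried inside
    # the bundle2 payload, which is not separately compressed at this point.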
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest
        with that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the
        bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise util.Abort(_('remote-changegroup does not support %s urls') %
                         parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
                         % 'size')
    except KeyError:
        raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise util.Abort(_('remote-changegroup: missing "%s" param') %
                             param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise util.Abort(_('%s: not a bundle version 1.0') %
                         util.hidepassword(raw_url))
    ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except util.Abort, e:
        raise util.Abort(_('bundle at %s is corrupted:\n%s') %
                         (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
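    # The part payload is simply the concatenation of the expected heads as
    # 20-byte binary node hashes, which is why the loop below reads the
    # payload 20 bytes at a time.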
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown-content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.UnsupportedPartError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)

@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of a pushed obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)