##// END OF EJS Templates
bundle2: handle new line in 'indebug' function...
Pierre-Yves David -
r25320:697d8953 default
parent child Browse files
Show More
@@ -1,1282 +1,1282 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
175
176 def outdebug(ui, message):
176 def outdebug(ui, message):
177 """debug regarding output stream (bundling)"""
177 """debug regarding output stream (bundling)"""
178 ui.debug('bundle2-output: %s\n' % message)
178 ui.debug('bundle2-output: %s\n' % message)
179
179
180 def indebug(ui, message):
180 def indebug(ui, message):
181 """debug on input stream (unbundling)"""
181 """debug on input stream (unbundling)"""
182 ui.debug('bundle2-input: %s' % message)
182 ui.debug('bundle2-input: %s\n' % message)
183
183
184 def validateparttype(parttype):
184 def validateparttype(parttype):
185 """raise ValueError if a parttype contains invalid character"""
185 """raise ValueError if a parttype contains invalid character"""
186 if _parttypeforbidden.search(parttype):
186 if _parttypeforbidden.search(parttype):
187 raise ValueError(parttype)
187 raise ValueError(parttype)
188
188
189 def _makefpartparamsizes(nbparams):
189 def _makefpartparamsizes(nbparams):
190 """return a struct format to read part parameter sizes
190 """return a struct format to read part parameter sizes
191
191
192 The number parameters is variable so we need to build that format
192 The number parameters is variable so we need to build that format
193 dynamically.
193 dynamically.
194 """
194 """
195 return '>'+('BB'*nbparams)
195 return '>'+('BB'*nbparams)
196
196
197 parthandlermapping = {}
197 parthandlermapping = {}
198
198
199 def parthandler(parttype, params=()):
199 def parthandler(parttype, params=()):
200 """decorator that register a function as a bundle2 part handler
200 """decorator that register a function as a bundle2 part handler
201
201
202 eg::
202 eg::
203
203
204 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
204 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
205 def myparttypehandler(...):
205 def myparttypehandler(...):
206 '''process a part of type "my part".'''
206 '''process a part of type "my part".'''
207 ...
207 ...
208 """
208 """
209 validateparttype(parttype)
209 validateparttype(parttype)
210 def _decorator(func):
210 def _decorator(func):
211 lparttype = parttype.lower() # enforce lower case matching.
211 lparttype = parttype.lower() # enforce lower case matching.
212 assert lparttype not in parthandlermapping
212 assert lparttype not in parthandlermapping
213 parthandlermapping[lparttype] = func
213 parthandlermapping[lparttype] = func
214 func.params = frozenset(params)
214 func.params = frozenset(params)
215 return func
215 return func
216 return _decorator
216 return _decorator
217
217
218 class unbundlerecords(object):
218 class unbundlerecords(object):
219 """keep record of what happens during and unbundle
219 """keep record of what happens during and unbundle
220
220
221 New records are added using `records.add('cat', obj)`. Where 'cat' is a
221 New records are added using `records.add('cat', obj)`. Where 'cat' is a
222 category of record and obj is an arbitrary object.
222 category of record and obj is an arbitrary object.
223
223
224 `records['cat']` will return all entries of this category 'cat'.
224 `records['cat']` will return all entries of this category 'cat'.
225
225
226 Iterating on the object itself will yield `('category', obj)` tuples
226 Iterating on the object itself will yield `('category', obj)` tuples
227 for all entries.
227 for all entries.
228
228
229 All iterations happens in chronological order.
229 All iterations happens in chronological order.
230 """
230 """
231
231
232 def __init__(self):
232 def __init__(self):
233 self._categories = {}
233 self._categories = {}
234 self._sequences = []
234 self._sequences = []
235 self._replies = {}
235 self._replies = {}
236
236
237 def add(self, category, entry, inreplyto=None):
237 def add(self, category, entry, inreplyto=None):
238 """add a new record of a given category.
238 """add a new record of a given category.
239
239
240 The entry can then be retrieved in the list returned by
240 The entry can then be retrieved in the list returned by
241 self['category']."""
241 self['category']."""
242 self._categories.setdefault(category, []).append(entry)
242 self._categories.setdefault(category, []).append(entry)
243 self._sequences.append((category, entry))
243 self._sequences.append((category, entry))
244 if inreplyto is not None:
244 if inreplyto is not None:
245 self.getreplies(inreplyto).add(category, entry)
245 self.getreplies(inreplyto).add(category, entry)
246
246
247 def getreplies(self, partid):
247 def getreplies(self, partid):
248 """get the records that are replies to a specific part"""
248 """get the records that are replies to a specific part"""
249 return self._replies.setdefault(partid, unbundlerecords())
249 return self._replies.setdefault(partid, unbundlerecords())
250
250
251 def __getitem__(self, cat):
251 def __getitem__(self, cat):
252 return tuple(self._categories.get(cat, ()))
252 return tuple(self._categories.get(cat, ()))
253
253
254 def __iter__(self):
254 def __iter__(self):
255 return iter(self._sequences)
255 return iter(self._sequences)
256
256
257 def __len__(self):
257 def __len__(self):
258 return len(self._sequences)
258 return len(self._sequences)
259
259
260 def __nonzero__(self):
260 def __nonzero__(self):
261 return bool(self._sequences)
261 return bool(self._sequences)
262
262
263 class bundleoperation(object):
263 class bundleoperation(object):
264 """an object that represents a single bundling process
264 """an object that represents a single bundling process
265
265
266 Its purpose is to carry unbundle-related objects and states.
266 Its purpose is to carry unbundle-related objects and states.
267
267
268 A new object should be created at the beginning of each bundle processing.
268 A new object should be created at the beginning of each bundle processing.
269 The object is to be returned by the processing function.
269 The object is to be returned by the processing function.
270
270
271 The object has very little content now it will ultimately contain:
271 The object has very little content now it will ultimately contain:
272 * an access to the repo the bundle is applied to,
272 * an access to the repo the bundle is applied to,
273 * a ui object,
273 * a ui object,
274 * a way to retrieve a transaction to add changes to the repo,
274 * a way to retrieve a transaction to add changes to the repo,
275 * a way to record the result of processing each part,
275 * a way to record the result of processing each part,
276 * a way to construct a bundle response when applicable.
276 * a way to construct a bundle response when applicable.
277 """
277 """
278
278
279 def __init__(self, repo, transactiongetter, captureoutput=True):
279 def __init__(self, repo, transactiongetter, captureoutput=True):
280 self.repo = repo
280 self.repo = repo
281 self.ui = repo.ui
281 self.ui = repo.ui
282 self.records = unbundlerecords()
282 self.records = unbundlerecords()
283 self.gettransaction = transactiongetter
283 self.gettransaction = transactiongetter
284 self.reply = None
284 self.reply = None
285 self.captureoutput = captureoutput
285 self.captureoutput = captureoutput
286
286
287 class TransactionUnavailable(RuntimeError):
287 class TransactionUnavailable(RuntimeError):
288 pass
288 pass
289
289
290 def _notransaction():
290 def _notransaction():
291 """default method to get a transaction while processing a bundle
291 """default method to get a transaction while processing a bundle
292
292
293 Raise an exception to highlight the fact that no transaction was expected
293 Raise an exception to highlight the fact that no transaction was expected
294 to be created"""
294 to be created"""
295 raise TransactionUnavailable()
295 raise TransactionUnavailable()
296
296
297 def processbundle(repo, unbundler, transactiongetter=None, op=None):
297 def processbundle(repo, unbundler, transactiongetter=None, op=None):
298 """This function process a bundle, apply effect to/from a repo
298 """This function process a bundle, apply effect to/from a repo
299
299
300 It iterates over each part then searches for and uses the proper handling
300 It iterates over each part then searches for and uses the proper handling
301 code to process the part. Parts are processed in order.
301 code to process the part. Parts are processed in order.
302
302
303 This is very early version of this function that will be strongly reworked
303 This is very early version of this function that will be strongly reworked
304 before final usage.
304 before final usage.
305
305
306 Unknown Mandatory part will abort the process.
306 Unknown Mandatory part will abort the process.
307
307
308 It is temporarily possible to provide a prebuilt bundleoperation to the
308 It is temporarily possible to provide a prebuilt bundleoperation to the
309 function. This is used to ensure output is properly propagated in case of
309 function. This is used to ensure output is properly propagated in case of
310 an error during the unbundling. This output capturing part will likely be
310 an error during the unbundling. This output capturing part will likely be
311 reworked and this ability will probably go away in the process.
311 reworked and this ability will probably go away in the process.
312 """
312 """
313 if op is None:
313 if op is None:
314 if transactiongetter is None:
314 if transactiongetter is None:
315 transactiongetter = _notransaction
315 transactiongetter = _notransaction
316 op = bundleoperation(repo, transactiongetter)
316 op = bundleoperation(repo, transactiongetter)
317 # todo:
317 # todo:
318 # - replace this is a init function soon.
318 # - replace this is a init function soon.
319 # - exception catching
319 # - exception catching
320 unbundler.params
320 unbundler.params
321 iterparts = unbundler.iterparts()
321 iterparts = unbundler.iterparts()
322 part = None
322 part = None
323 try:
323 try:
324 for part in iterparts:
324 for part in iterparts:
325 _processpart(op, part)
325 _processpart(op, part)
326 except BaseException, exc:
326 except BaseException, exc:
327 for part in iterparts:
327 for part in iterparts:
328 # consume the bundle content
328 # consume the bundle content
329 part.seek(0, 2)
329 part.seek(0, 2)
330 # Small hack to let caller code distinguish exceptions from bundle2
330 # Small hack to let caller code distinguish exceptions from bundle2
331 # processing from processing the old format. This is mostly
331 # processing from processing the old format. This is mostly
332 # needed to handle different return codes to unbundle according to the
332 # needed to handle different return codes to unbundle according to the
333 # type of bundle. We should probably clean up or drop this return code
333 # type of bundle. We should probably clean up or drop this return code
334 # craziness in a future version.
334 # craziness in a future version.
335 exc.duringunbundle2 = True
335 exc.duringunbundle2 = True
336 salvaged = []
336 salvaged = []
337 if op.reply is not None:
337 if op.reply is not None:
338 salvaged = op.reply.salvageoutput()
338 salvaged = op.reply.salvageoutput()
339 exc._bundle2salvagedoutput = salvaged
339 exc._bundle2salvagedoutput = salvaged
340 raise
340 raise
341 return op
341 return op
342
342
343 def _processpart(op, part):
343 def _processpart(op, part):
344 """process a single part from a bundle
344 """process a single part from a bundle
345
345
346 The part is guaranteed to have been fully consumed when the function exits
346 The part is guaranteed to have been fully consumed when the function exits
347 (even if an exception is raised)."""
347 (even if an exception is raised)."""
348 try:
348 try:
349 try:
349 try:
350 handler = parthandlermapping.get(part.type)
350 handler = parthandlermapping.get(part.type)
351 if handler is None:
351 if handler is None:
352 raise error.UnsupportedPartError(parttype=part.type)
352 raise error.UnsupportedPartError(parttype=part.type)
353 indebug(op.ui, 'found a handler for part %r\n' % part.type)
353 indebug(op.ui, 'found a handler for part %r' % part.type)
354 unknownparams = part.mandatorykeys - handler.params
354 unknownparams = part.mandatorykeys - handler.params
355 if unknownparams:
355 if unknownparams:
356 unknownparams = list(unknownparams)
356 unknownparams = list(unknownparams)
357 unknownparams.sort()
357 unknownparams.sort()
358 raise error.UnsupportedPartError(parttype=part.type,
358 raise error.UnsupportedPartError(parttype=part.type,
359 params=unknownparams)
359 params=unknownparams)
360 except error.UnsupportedPartError, exc:
360 except error.UnsupportedPartError, exc:
361 if part.mandatory: # mandatory parts
361 if part.mandatory: # mandatory parts
362 raise
362 raise
363 indebug(op.ui, 'ignoring unsupported advisory part %s\n' % exc)
363 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
364 return # skip to part processing
364 return # skip to part processing
365
365
366 # handler is called outside the above try block so that we don't
366 # handler is called outside the above try block so that we don't
367 # risk catching KeyErrors from anything other than the
367 # risk catching KeyErrors from anything other than the
368 # parthandlermapping lookup (any KeyError raised by handler()
368 # parthandlermapping lookup (any KeyError raised by handler()
369 # itself represents a defect of a different variety).
369 # itself represents a defect of a different variety).
370 output = None
370 output = None
371 if op.captureoutput and op.reply is not None:
371 if op.captureoutput and op.reply is not None:
372 op.ui.pushbuffer(error=True, subproc=True)
372 op.ui.pushbuffer(error=True, subproc=True)
373 output = ''
373 output = ''
374 try:
374 try:
375 handler(op, part)
375 handler(op, part)
376 finally:
376 finally:
377 if output is not None:
377 if output is not None:
378 output = op.ui.popbuffer()
378 output = op.ui.popbuffer()
379 if output:
379 if output:
380 outpart = op.reply.newpart('output', data=output,
380 outpart = op.reply.newpart('output', data=output,
381 mandatory=False)
381 mandatory=False)
382 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
382 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
383 finally:
383 finally:
384 # consume the part content to not corrupt the stream.
384 # consume the part content to not corrupt the stream.
385 part.seek(0, 2)
385 part.seek(0, 2)
386
386
387
387
388 def decodecaps(blob):
388 def decodecaps(blob):
389 """decode a bundle2 caps bytes blob into a dictionary
389 """decode a bundle2 caps bytes blob into a dictionary
390
390
391 The blob is a list of capabilities (one per line)
391 The blob is a list of capabilities (one per line)
392 Capabilities may have values using a line of the form::
392 Capabilities may have values using a line of the form::
393
393
394 capability=value1,value2,value3
394 capability=value1,value2,value3
395
395
396 The values are always a list."""
396 The values are always a list."""
397 caps = {}
397 caps = {}
398 for line in blob.splitlines():
398 for line in blob.splitlines():
399 if not line:
399 if not line:
400 continue
400 continue
401 if '=' not in line:
401 if '=' not in line:
402 key, vals = line, ()
402 key, vals = line, ()
403 else:
403 else:
404 key, vals = line.split('=', 1)
404 key, vals = line.split('=', 1)
405 vals = vals.split(',')
405 vals = vals.split(',')
406 key = urllib.unquote(key)
406 key = urllib.unquote(key)
407 vals = [urllib.unquote(v) for v in vals]
407 vals = [urllib.unquote(v) for v in vals]
408 caps[key] = vals
408 caps[key] = vals
409 return caps
409 return caps
410
410
411 def encodecaps(caps):
411 def encodecaps(caps):
412 """encode a bundle2 caps dictionary into a bytes blob"""
412 """encode a bundle2 caps dictionary into a bytes blob"""
413 chunks = []
413 chunks = []
414 for ca in sorted(caps):
414 for ca in sorted(caps):
415 vals = caps[ca]
415 vals = caps[ca]
416 ca = urllib.quote(ca)
416 ca = urllib.quote(ca)
417 vals = [urllib.quote(v) for v in vals]
417 vals = [urllib.quote(v) for v in vals]
418 if vals:
418 if vals:
419 ca = "%s=%s" % (ca, ','.join(vals))
419 ca = "%s=%s" % (ca, ','.join(vals))
420 chunks.append(ca)
420 chunks.append(ca)
421 return '\n'.join(chunks)
421 return '\n'.join(chunks)
422
422
423 class bundle20(object):
423 class bundle20(object):
424 """represent an outgoing bundle2 container
424 """represent an outgoing bundle2 container
425
425
426 Use the `addparam` method to add stream level parameter. and `newpart` to
426 Use the `addparam` method to add stream level parameter. and `newpart` to
427 populate it. Then call `getchunks` to retrieve all the binary chunks of
427 populate it. Then call `getchunks` to retrieve all the binary chunks of
428 data that compose the bundle2 container."""
428 data that compose the bundle2 container."""
429
429
430 _magicstring = 'HG20'
430 _magicstring = 'HG20'
431
431
432 def __init__(self, ui, capabilities=()):
432 def __init__(self, ui, capabilities=()):
433 self.ui = ui
433 self.ui = ui
434 self._params = []
434 self._params = []
435 self._parts = []
435 self._parts = []
436 self.capabilities = dict(capabilities)
436 self.capabilities = dict(capabilities)
437
437
438 @property
438 @property
439 def nbparts(self):
439 def nbparts(self):
440 """total number of parts added to the bundler"""
440 """total number of parts added to the bundler"""
441 return len(self._parts)
441 return len(self._parts)
442
442
443 # methods used to defines the bundle2 content
443 # methods used to defines the bundle2 content
444 def addparam(self, name, value=None):
444 def addparam(self, name, value=None):
445 """add a stream level parameter"""
445 """add a stream level parameter"""
446 if not name:
446 if not name:
447 raise ValueError('empty parameter name')
447 raise ValueError('empty parameter name')
448 if name[0] not in string.letters:
448 if name[0] not in string.letters:
449 raise ValueError('non letter first character: %r' % name)
449 raise ValueError('non letter first character: %r' % name)
450 self._params.append((name, value))
450 self._params.append((name, value))
451
451
452 def addpart(self, part):
452 def addpart(self, part):
453 """add a new part to the bundle2 container
453 """add a new part to the bundle2 container
454
454
455 Parts contains the actual applicative payload."""
455 Parts contains the actual applicative payload."""
456 assert part.id is None
456 assert part.id is None
457 part.id = len(self._parts) # very cheap counter
457 part.id = len(self._parts) # very cheap counter
458 self._parts.append(part)
458 self._parts.append(part)
459
459
460 def newpart(self, typeid, *args, **kwargs):
460 def newpart(self, typeid, *args, **kwargs):
461 """create a new part and add it to the containers
461 """create a new part and add it to the containers
462
462
463 As the part is directly added to the containers. For now, this means
463 As the part is directly added to the containers. For now, this means
464 that any failure to properly initialize the part after calling
464 that any failure to properly initialize the part after calling
465 ``newpart`` should result in a failure of the whole bundling process.
465 ``newpart`` should result in a failure of the whole bundling process.
466
466
467 You can still fall back to manually create and add if you need better
467 You can still fall back to manually create and add if you need better
468 control."""
468 control."""
469 part = bundlepart(typeid, *args, **kwargs)
469 part = bundlepart(typeid, *args, **kwargs)
470 self.addpart(part)
470 self.addpart(part)
471 return part
471 return part
472
472
473 # methods used to generate the bundle2 stream
473 # methods used to generate the bundle2 stream
474 def getchunks(self):
474 def getchunks(self):
475 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
475 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
476 yield self._magicstring
476 yield self._magicstring
477 param = self._paramchunk()
477 param = self._paramchunk()
478 outdebug(self.ui, 'bundle parameter: %s' % param)
478 outdebug(self.ui, 'bundle parameter: %s' % param)
479 yield _pack(_fstreamparamsize, len(param))
479 yield _pack(_fstreamparamsize, len(param))
480 if param:
480 if param:
481 yield param
481 yield param
482
482
483 outdebug(self.ui, 'start of parts')
483 outdebug(self.ui, 'start of parts')
484 for part in self._parts:
484 for part in self._parts:
485 outdebug(self.ui, 'bundle part: "%s"' % part.type)
485 outdebug(self.ui, 'bundle part: "%s"' % part.type)
486 for chunk in part.getchunks():
486 for chunk in part.getchunks():
487 yield chunk
487 yield chunk
488 outdebug(self.ui, 'end of bundle')
488 outdebug(self.ui, 'end of bundle')
489 yield _pack(_fpartheadersize, 0)
489 yield _pack(_fpartheadersize, 0)
490
490
491 def _paramchunk(self):
491 def _paramchunk(self):
492 """return a encoded version of all stream parameters"""
492 """return a encoded version of all stream parameters"""
493 blocks = []
493 blocks = []
494 for par, value in self._params:
494 for par, value in self._params:
495 par = urllib.quote(par)
495 par = urllib.quote(par)
496 if value is not None:
496 if value is not None:
497 value = urllib.quote(value)
497 value = urllib.quote(value)
498 par = '%s=%s' % (par, value)
498 par = '%s=%s' % (par, value)
499 blocks.append(par)
499 blocks.append(par)
500 return ' '.join(blocks)
500 return ' '.join(blocks)
501
501
502 def salvageoutput(self):
502 def salvageoutput(self):
503 """return a list with a copy of all output parts in the bundle
503 """return a list with a copy of all output parts in the bundle
504
504
505 This is meant to be used during error handling to make sure we preserve
505 This is meant to be used during error handling to make sure we preserve
506 server output"""
506 server output"""
507 salvaged = []
507 salvaged = []
508 for part in self._parts:
508 for part in self._parts:
509 if part.type.startswith('output'):
509 if part.type.startswith('output'):
510 salvaged.append(part.copy())
510 salvaged.append(part.copy())
511 return salvaged
511 return salvaged
512
512
513
513
514 class unpackermixin(object):
514 class unpackermixin(object):
515 """A mixin to extract bytes and struct data from a stream"""
515 """A mixin to extract bytes and struct data from a stream"""
516
516
517 def __init__(self, fp):
517 def __init__(self, fp):
518 self._fp = fp
518 self._fp = fp
519 self._seekable = (util.safehasattr(fp, 'seek') and
519 self._seekable = (util.safehasattr(fp, 'seek') and
520 util.safehasattr(fp, 'tell'))
520 util.safehasattr(fp, 'tell'))
521
521
522 def _unpack(self, format):
522 def _unpack(self, format):
523 """unpack this struct format from the stream"""
523 """unpack this struct format from the stream"""
524 data = self._readexact(struct.calcsize(format))
524 data = self._readexact(struct.calcsize(format))
525 return _unpack(format, data)
525 return _unpack(format, data)
526
526
527 def _readexact(self, size):
527 def _readexact(self, size):
528 """read exactly <size> bytes from the stream"""
528 """read exactly <size> bytes from the stream"""
529 return changegroup.readexactly(self._fp, size)
529 return changegroup.readexactly(self._fp, size)
530
530
531 def seek(self, offset, whence=0):
531 def seek(self, offset, whence=0):
532 """move the underlying file pointer"""
532 """move the underlying file pointer"""
533 if self._seekable:
533 if self._seekable:
534 return self._fp.seek(offset, whence)
534 return self._fp.seek(offset, whence)
535 else:
535 else:
536 raise NotImplementedError(_('File pointer is not seekable'))
536 raise NotImplementedError(_('File pointer is not seekable'))
537
537
538 def tell(self):
538 def tell(self):
539 """return the file offset, or None if file is not seekable"""
539 """return the file offset, or None if file is not seekable"""
540 if self._seekable:
540 if self._seekable:
541 try:
541 try:
542 return self._fp.tell()
542 return self._fp.tell()
543 except IOError, e:
543 except IOError, e:
544 if e.errno == errno.ESPIPE:
544 if e.errno == errno.ESPIPE:
545 self._seekable = False
545 self._seekable = False
546 else:
546 else:
547 raise
547 raise
548 return None
548 return None
549
549
550 def close(self):
550 def close(self):
551 """close underlying file"""
551 """close underlying file"""
552 if util.safehasattr(self._fp, 'close'):
552 if util.safehasattr(self._fp, 'close'):
553 return self._fp.close()
553 return self._fp.close()
554
554
555 def getunbundler(ui, fp, header=None):
555 def getunbundler(ui, fp, header=None):
556 """return a valid unbundler object for a given header"""
556 """return a valid unbundler object for a given header"""
557 if header is None:
557 if header is None:
558 header = changegroup.readexactly(fp, 4)
558 header = changegroup.readexactly(fp, 4)
559 magic, version = header[0:2], header[2:4]
559 magic, version = header[0:2], header[2:4]
560 if magic != 'HG':
560 if magic != 'HG':
561 raise util.Abort(_('not a Mercurial bundle'))
561 raise util.Abort(_('not a Mercurial bundle'))
562 unbundlerclass = formatmap.get(version)
562 unbundlerclass = formatmap.get(version)
563 if unbundlerclass is None:
563 if unbundlerclass is None:
564 raise util.Abort(_('unknown bundle version %s') % version)
564 raise util.Abort(_('unknown bundle version %s') % version)
565 unbundler = unbundlerclass(ui, fp)
565 unbundler = unbundlerclass(ui, fp)
566 indebug(ui, 'start processing of %s stream\n' % header)
566 indebug(ui, 'start processing of %s stream' % header)
567 return unbundler
567 return unbundler
568
568
569 class unbundle20(unpackermixin):
569 class unbundle20(unpackermixin):
570 """interpret a bundle2 stream
570 """interpret a bundle2 stream
571
571
572 This class is fed with a binary stream and yields parts through its
572 This class is fed with a binary stream and yields parts through its
573 `iterparts` methods."""
573 `iterparts` methods."""
574
574
575 def __init__(self, ui, fp):
575 def __init__(self, ui, fp):
576 """If header is specified, we do not read it out of the stream."""
576 """If header is specified, we do not read it out of the stream."""
577 self.ui = ui
577 self.ui = ui
578 super(unbundle20, self).__init__(fp)
578 super(unbundle20, self).__init__(fp)
579
579
580 @util.propertycache
580 @util.propertycache
581 def params(self):
581 def params(self):
582 """dictionary of stream level parameters"""
582 """dictionary of stream level parameters"""
583 indebug(self.ui, 'reading bundle2 stream parameters\n')
583 indebug(self.ui, 'reading bundle2 stream parameters')
584 params = {}
584 params = {}
585 paramssize = self._unpack(_fstreamparamsize)[0]
585 paramssize = self._unpack(_fstreamparamsize)[0]
586 if paramssize < 0:
586 if paramssize < 0:
587 raise error.BundleValueError('negative bundle param size: %i'
587 raise error.BundleValueError('negative bundle param size: %i'
588 % paramssize)
588 % paramssize)
589 if paramssize:
589 if paramssize:
590 for p in self._readexact(paramssize).split(' '):
590 for p in self._readexact(paramssize).split(' '):
591 p = p.split('=', 1)
591 p = p.split('=', 1)
592 p = [urllib.unquote(i) for i in p]
592 p = [urllib.unquote(i) for i in p]
593 if len(p) < 2:
593 if len(p) < 2:
594 p.append(None)
594 p.append(None)
595 self._processparam(*p)
595 self._processparam(*p)
596 params[p[0]] = p[1]
596 params[p[0]] = p[1]
597 return params
597 return params
598
598
599 def _processparam(self, name, value):
599 def _processparam(self, name, value):
600 """process a parameter, applying its effect if needed
600 """process a parameter, applying its effect if needed
601
601
602 Parameter starting with a lower case letter are advisory and will be
602 Parameter starting with a lower case letter are advisory and will be
603 ignored when unknown. Those starting with an upper case letter are
603 ignored when unknown. Those starting with an upper case letter are
604 mandatory and will this function will raise a KeyError when unknown.
604 mandatory and will this function will raise a KeyError when unknown.
605
605
606 Note: no option are currently supported. Any input will be either
606 Note: no option are currently supported. Any input will be either
607 ignored or failing.
607 ignored or failing.
608 """
608 """
609 if not name:
609 if not name:
610 raise ValueError('empty parameter name')
610 raise ValueError('empty parameter name')
611 if name[0] not in string.letters:
611 if name[0] not in string.letters:
612 raise ValueError('non letter first character: %r' % name)
612 raise ValueError('non letter first character: %r' % name)
613 # Some logic will be later added here to try to process the option for
613 # Some logic will be later added here to try to process the option for
614 # a dict of known parameter.
614 # a dict of known parameter.
615 if name[0].islower():
615 if name[0].islower():
616 indebug(self.ui, "ignoring unknown parameter %r\n" % name)
616 indebug(self.ui, "ignoring unknown parameter %r" % name)
617 else:
617 else:
618 raise error.UnsupportedPartError(params=(name,))
618 raise error.UnsupportedPartError(params=(name,))
619
619
620
620
621 def iterparts(self):
621 def iterparts(self):
622 """yield all parts contained in the stream"""
622 """yield all parts contained in the stream"""
623 # make sure param have been loaded
623 # make sure param have been loaded
624 self.params
624 self.params
625 indebug(self.ui, 'start extraction of bundle2 parts\n')
625 indebug(self.ui, 'start extraction of bundle2 parts')
626 headerblock = self._readpartheader()
626 headerblock = self._readpartheader()
627 while headerblock is not None:
627 while headerblock is not None:
628 part = unbundlepart(self.ui, headerblock, self._fp)
628 part = unbundlepart(self.ui, headerblock, self._fp)
629 yield part
629 yield part
630 part.seek(0, 2)
630 part.seek(0, 2)
631 headerblock = self._readpartheader()
631 headerblock = self._readpartheader()
632 indebug(self.ui, 'end of bundle2 stream\n')
632 indebug(self.ui, 'end of bundle2 stream')
633
633
634 def _readpartheader(self):
634 def _readpartheader(self):
635 """reads a part header size and return the bytes blob
635 """reads a part header size and return the bytes blob
636
636
637 returns None if empty"""
637 returns None if empty"""
638 headersize = self._unpack(_fpartheadersize)[0]
638 headersize = self._unpack(_fpartheadersize)[0]
639 if headersize < 0:
639 if headersize < 0:
640 raise error.BundleValueError('negative part header size: %i'
640 raise error.BundleValueError('negative part header size: %i'
641 % headersize)
641 % headersize)
642 indebug(self.ui, 'part header size: %i\n' % headersize)
642 indebug(self.ui, 'part header size: %i' % headersize)
643 if headersize:
643 if headersize:
644 return self._readexact(headersize)
644 return self._readexact(headersize)
645 return None
645 return None
646
646
647 def compressed(self):
647 def compressed(self):
648 return False
648 return False
649
649
650 formatmap = {'20': unbundle20}
650 formatmap = {'20': unbundle20}
651
651
652 class bundlepart(object):
652 class bundlepart(object):
653 """A bundle2 part contains application level payload
653 """A bundle2 part contains application level payload
654
654
655 The part `type` is used to route the part to the application level
655 The part `type` is used to route the part to the application level
656 handler.
656 handler.
657
657
658 The part payload is contained in ``part.data``. It could be raw bytes or a
658 The part payload is contained in ``part.data``. It could be raw bytes or a
659 generator of byte chunks.
659 generator of byte chunks.
660
660
661 You can add parameters to the part using the ``addparam`` method.
661 You can add parameters to the part using the ``addparam`` method.
662 Parameters can be either mandatory (default) or advisory. Remote side
662 Parameters can be either mandatory (default) or advisory. Remote side
663 should be able to safely ignore the advisory ones.
663 should be able to safely ignore the advisory ones.
664
664
665 Both data and parameters cannot be modified after the generation has begun.
665 Both data and parameters cannot be modified after the generation has begun.
666 """
666 """
667
667
668 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
668 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
669 data='', mandatory=True):
669 data='', mandatory=True):
670 validateparttype(parttype)
670 validateparttype(parttype)
671 self.id = None
671 self.id = None
672 self.type = parttype
672 self.type = parttype
673 self._data = data
673 self._data = data
674 self._mandatoryparams = list(mandatoryparams)
674 self._mandatoryparams = list(mandatoryparams)
675 self._advisoryparams = list(advisoryparams)
675 self._advisoryparams = list(advisoryparams)
676 # checking for duplicated entries
676 # checking for duplicated entries
677 self._seenparams = set()
677 self._seenparams = set()
678 for pname, __ in self._mandatoryparams + self._advisoryparams:
678 for pname, __ in self._mandatoryparams + self._advisoryparams:
679 if pname in self._seenparams:
679 if pname in self._seenparams:
680 raise RuntimeError('duplicated params: %s' % pname)
680 raise RuntimeError('duplicated params: %s' % pname)
681 self._seenparams.add(pname)
681 self._seenparams.add(pname)
682 # status of the part's generation:
682 # status of the part's generation:
683 # - None: not started,
683 # - None: not started,
684 # - False: currently generated,
684 # - False: currently generated,
685 # - True: generation done.
685 # - True: generation done.
686 self._generated = None
686 self._generated = None
687 self.mandatory = mandatory
687 self.mandatory = mandatory
688
688
689 def copy(self):
689 def copy(self):
690 """return a copy of the part
690 """return a copy of the part
691
691
692 The new part have the very same content but no partid assigned yet.
692 The new part have the very same content but no partid assigned yet.
693 Parts with generated data cannot be copied."""
693 Parts with generated data cannot be copied."""
694 assert not util.safehasattr(self.data, 'next')
694 assert not util.safehasattr(self.data, 'next')
695 return self.__class__(self.type, self._mandatoryparams,
695 return self.__class__(self.type, self._mandatoryparams,
696 self._advisoryparams, self._data, self.mandatory)
696 self._advisoryparams, self._data, self.mandatory)
697
697
698 # methods used to defines the part content
698 # methods used to defines the part content
699 def __setdata(self, data):
699 def __setdata(self, data):
700 if self._generated is not None:
700 if self._generated is not None:
701 raise error.ReadOnlyPartError('part is being generated')
701 raise error.ReadOnlyPartError('part is being generated')
702 self._data = data
702 self._data = data
703 def __getdata(self):
703 def __getdata(self):
704 return self._data
704 return self._data
705 data = property(__getdata, __setdata)
705 data = property(__getdata, __setdata)
706
706
707 @property
707 @property
708 def mandatoryparams(self):
708 def mandatoryparams(self):
709 # make it an immutable tuple to force people through ``addparam``
709 # make it an immutable tuple to force people through ``addparam``
710 return tuple(self._mandatoryparams)
710 return tuple(self._mandatoryparams)
711
711
712 @property
712 @property
713 def advisoryparams(self):
713 def advisoryparams(self):
714 # make it an immutable tuple to force people through ``addparam``
714 # make it an immutable tuple to force people through ``addparam``
715 return tuple(self._advisoryparams)
715 return tuple(self._advisoryparams)
716
716
717 def addparam(self, name, value='', mandatory=True):
717 def addparam(self, name, value='', mandatory=True):
718 if self._generated is not None:
718 if self._generated is not None:
719 raise error.ReadOnlyPartError('part is being generated')
719 raise error.ReadOnlyPartError('part is being generated')
720 if name in self._seenparams:
720 if name in self._seenparams:
721 raise ValueError('duplicated params: %s' % name)
721 raise ValueError('duplicated params: %s' % name)
722 self._seenparams.add(name)
722 self._seenparams.add(name)
723 params = self._advisoryparams
723 params = self._advisoryparams
724 if mandatory:
724 if mandatory:
725 params = self._mandatoryparams
725 params = self._mandatoryparams
726 params.append((name, value))
726 params.append((name, value))
727
727
728 # methods used to generates the bundle2 stream
728 # methods used to generates the bundle2 stream
729 def getchunks(self):
729 def getchunks(self):
730 if self._generated is not None:
730 if self._generated is not None:
731 raise RuntimeError('part can only be consumed once')
731 raise RuntimeError('part can only be consumed once')
732 self._generated = False
732 self._generated = False
733 #### header
733 #### header
734 if self.mandatory:
734 if self.mandatory:
735 parttype = self.type.upper()
735 parttype = self.type.upper()
736 else:
736 else:
737 parttype = self.type.lower()
737 parttype = self.type.lower()
738 ## parttype
738 ## parttype
739 header = [_pack(_fparttypesize, len(parttype)),
739 header = [_pack(_fparttypesize, len(parttype)),
740 parttype, _pack(_fpartid, self.id),
740 parttype, _pack(_fpartid, self.id),
741 ]
741 ]
742 ## parameters
742 ## parameters
743 # count
743 # count
744 manpar = self.mandatoryparams
744 manpar = self.mandatoryparams
745 advpar = self.advisoryparams
745 advpar = self.advisoryparams
746 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
746 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
747 # size
747 # size
748 parsizes = []
748 parsizes = []
749 for key, value in manpar:
749 for key, value in manpar:
750 parsizes.append(len(key))
750 parsizes.append(len(key))
751 parsizes.append(len(value))
751 parsizes.append(len(value))
752 for key, value in advpar:
752 for key, value in advpar:
753 parsizes.append(len(key))
753 parsizes.append(len(key))
754 parsizes.append(len(value))
754 parsizes.append(len(value))
755 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
755 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
756 header.append(paramsizes)
756 header.append(paramsizes)
757 # key, value
757 # key, value
758 for key, value in manpar:
758 for key, value in manpar:
759 header.append(key)
759 header.append(key)
760 header.append(value)
760 header.append(value)
761 for key, value in advpar:
761 for key, value in advpar:
762 header.append(key)
762 header.append(key)
763 header.append(value)
763 header.append(value)
764 ## finalize header
764 ## finalize header
765 headerchunk = ''.join(header)
765 headerchunk = ''.join(header)
766 yield _pack(_fpartheadersize, len(headerchunk))
766 yield _pack(_fpartheadersize, len(headerchunk))
767 yield headerchunk
767 yield headerchunk
768 ## payload
768 ## payload
769 try:
769 try:
770 for chunk in self._payloadchunks():
770 for chunk in self._payloadchunks():
771 yield _pack(_fpayloadsize, len(chunk))
771 yield _pack(_fpayloadsize, len(chunk))
772 yield chunk
772 yield chunk
773 except BaseException, exc:
773 except BaseException, exc:
774 # backup exception data for later
774 # backup exception data for later
775 exc_info = sys.exc_info()
775 exc_info = sys.exc_info()
776 msg = 'unexpected error: %s' % exc
776 msg = 'unexpected error: %s' % exc
777 interpart = bundlepart('error:abort', [('message', msg)],
777 interpart = bundlepart('error:abort', [('message', msg)],
778 mandatory=False)
778 mandatory=False)
779 interpart.id = 0
779 interpart.id = 0
780 yield _pack(_fpayloadsize, -1)
780 yield _pack(_fpayloadsize, -1)
781 for chunk in interpart.getchunks():
781 for chunk in interpart.getchunks():
782 yield chunk
782 yield chunk
783 # abort current part payload
783 # abort current part payload
784 yield _pack(_fpayloadsize, 0)
784 yield _pack(_fpayloadsize, 0)
785 raise exc_info[0], exc_info[1], exc_info[2]
785 raise exc_info[0], exc_info[1], exc_info[2]
786 # end of payload
786 # end of payload
787 yield _pack(_fpayloadsize, 0)
787 yield _pack(_fpayloadsize, 0)
788 self._generated = True
788 self._generated = True
789
789
790 def _payloadchunks(self):
790 def _payloadchunks(self):
791 """yield chunks of a the part payload
791 """yield chunks of a the part payload
792
792
793 Exists to handle the different methods to provide data to a part."""
793 Exists to handle the different methods to provide data to a part."""
794 # we only support fixed size data now.
794 # we only support fixed size data now.
795 # This will be improved in the future.
795 # This will be improved in the future.
796 if util.safehasattr(self.data, 'next'):
796 if util.safehasattr(self.data, 'next'):
797 buff = util.chunkbuffer(self.data)
797 buff = util.chunkbuffer(self.data)
798 chunk = buff.read(preferedchunksize)
798 chunk = buff.read(preferedchunksize)
799 while chunk:
799 while chunk:
800 yield chunk
800 yield chunk
801 chunk = buff.read(preferedchunksize)
801 chunk = buff.read(preferedchunksize)
802 elif len(self.data):
802 elif len(self.data):
803 yield self.data
803 yield self.data
804
804
805
805
806 flaginterrupt = -1
806 flaginterrupt = -1
807
807
808 class interrupthandler(unpackermixin):
808 class interrupthandler(unpackermixin):
809 """read one part and process it with restricted capability
809 """read one part and process it with restricted capability
810
810
811 This allows to transmit exception raised on the producer size during part
811 This allows to transmit exception raised on the producer size during part
812 iteration while the consumer is reading a part.
812 iteration while the consumer is reading a part.
813
813
814 Part processed in this manner only have access to a ui object,"""
814 Part processed in this manner only have access to a ui object,"""
815
815
816 def __init__(self, ui, fp):
816 def __init__(self, ui, fp):
817 super(interrupthandler, self).__init__(fp)
817 super(interrupthandler, self).__init__(fp)
818 self.ui = ui
818 self.ui = ui
819
819
820 def _readpartheader(self):
820 def _readpartheader(self):
821 """reads a part header size and return the bytes blob
821 """reads a part header size and return the bytes blob
822
822
823 returns None if empty"""
823 returns None if empty"""
824 headersize = self._unpack(_fpartheadersize)[0]
824 headersize = self._unpack(_fpartheadersize)[0]
825 if headersize < 0:
825 if headersize < 0:
826 raise error.BundleValueError('negative part header size: %i'
826 raise error.BundleValueError('negative part header size: %i'
827 % headersize)
827 % headersize)
828 indebug(self.ui, 'part header size: %i\n' % headersize)
828 indebug(self.ui, 'part header size: %i\n' % headersize)
829 if headersize:
829 if headersize:
830 return self._readexact(headersize)
830 return self._readexact(headersize)
831 return None
831 return None
832
832
833 def __call__(self):
833 def __call__(self):
834 indebug(self.ui, 'bundle2 stream interruption, looking for a part.\n')
834 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
835 headerblock = self._readpartheader()
835 headerblock = self._readpartheader()
836 if headerblock is None:
836 if headerblock is None:
837 indebug(self.ui, 'no part found during interruption.\n')
837 indebug(self.ui, 'no part found during interruption.')
838 return
838 return
839 part = unbundlepart(self.ui, headerblock, self._fp)
839 part = unbundlepart(self.ui, headerblock, self._fp)
840 op = interruptoperation(self.ui)
840 op = interruptoperation(self.ui)
841 _processpart(op, part)
841 _processpart(op, part)
842
842
843 class interruptoperation(object):
843 class interruptoperation(object):
844 """A limited operation to be use by part handler during interruption
844 """A limited operation to be use by part handler during interruption
845
845
846 It only have access to an ui object.
846 It only have access to an ui object.
847 """
847 """
848
848
849 def __init__(self, ui):
849 def __init__(self, ui):
850 self.ui = ui
850 self.ui = ui
851 self.reply = None
851 self.reply = None
852 self.captureoutput = False
852 self.captureoutput = False
853
853
854 @property
854 @property
855 def repo(self):
855 def repo(self):
856 raise RuntimeError('no repo access from stream interruption')
856 raise RuntimeError('no repo access from stream interruption')
857
857
858 def gettransaction(self):
858 def gettransaction(self):
859 raise TransactionUnavailable('no repo access from stream interruption')
859 raise TransactionUnavailable('no repo access from stream interruption')
860
860
861 class unbundlepart(unpackermixin):
861 class unbundlepart(unpackermixin):
862 """a bundle part read from a bundle"""
862 """a bundle part read from a bundle"""
863
863
864 def __init__(self, ui, header, fp):
864 def __init__(self, ui, header, fp):
865 super(unbundlepart, self).__init__(fp)
865 super(unbundlepart, self).__init__(fp)
866 self.ui = ui
866 self.ui = ui
867 # unbundle state attr
867 # unbundle state attr
868 self._headerdata = header
868 self._headerdata = header
869 self._headeroffset = 0
869 self._headeroffset = 0
870 self._initialized = False
870 self._initialized = False
871 self.consumed = False
871 self.consumed = False
872 # part data
872 # part data
873 self.id = None
873 self.id = None
874 self.type = None
874 self.type = None
875 self.mandatoryparams = None
875 self.mandatoryparams = None
876 self.advisoryparams = None
876 self.advisoryparams = None
877 self.params = None
877 self.params = None
878 self.mandatorykeys = ()
878 self.mandatorykeys = ()
879 self._payloadstream = None
879 self._payloadstream = None
880 self._readheader()
880 self._readheader()
881 self._mandatory = None
881 self._mandatory = None
882 self._chunkindex = [] #(payload, file) position tuples for chunk starts
882 self._chunkindex = [] #(payload, file) position tuples for chunk starts
883 self._pos = 0
883 self._pos = 0
884
884
885 def _fromheader(self, size):
885 def _fromheader(self, size):
886 """return the next <size> byte from the header"""
886 """return the next <size> byte from the header"""
887 offset = self._headeroffset
887 offset = self._headeroffset
888 data = self._headerdata[offset:(offset + size)]
888 data = self._headerdata[offset:(offset + size)]
889 self._headeroffset = offset + size
889 self._headeroffset = offset + size
890 return data
890 return data
891
891
892 def _unpackheader(self, format):
892 def _unpackheader(self, format):
893 """read given format from header
893 """read given format from header
894
894
895 This automatically compute the size of the format to read."""
895 This automatically compute the size of the format to read."""
896 data = self._fromheader(struct.calcsize(format))
896 data = self._fromheader(struct.calcsize(format))
897 return _unpack(format, data)
897 return _unpack(format, data)
898
898
899 def _initparams(self, mandatoryparams, advisoryparams):
899 def _initparams(self, mandatoryparams, advisoryparams):
900 """internal function to setup all logic related parameters"""
900 """internal function to setup all logic related parameters"""
901 # make it read only to prevent people touching it by mistake.
901 # make it read only to prevent people touching it by mistake.
902 self.mandatoryparams = tuple(mandatoryparams)
902 self.mandatoryparams = tuple(mandatoryparams)
903 self.advisoryparams = tuple(advisoryparams)
903 self.advisoryparams = tuple(advisoryparams)
904 # user friendly UI
904 # user friendly UI
905 self.params = dict(self.mandatoryparams)
905 self.params = dict(self.mandatoryparams)
906 self.params.update(dict(self.advisoryparams))
906 self.params.update(dict(self.advisoryparams))
907 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
907 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
908
908
909 def _payloadchunks(self, chunknum=0):
909 def _payloadchunks(self, chunknum=0):
910 '''seek to specified chunk and start yielding data'''
910 '''seek to specified chunk and start yielding data'''
911 if len(self._chunkindex) == 0:
911 if len(self._chunkindex) == 0:
912 assert chunknum == 0, 'Must start with chunk 0'
912 assert chunknum == 0, 'Must start with chunk 0'
913 self._chunkindex.append((0, super(unbundlepart, self).tell()))
913 self._chunkindex.append((0, super(unbundlepart, self).tell()))
914 else:
914 else:
915 assert chunknum < len(self._chunkindex), \
915 assert chunknum < len(self._chunkindex), \
916 'Unknown chunk %d' % chunknum
916 'Unknown chunk %d' % chunknum
917 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
917 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
918
918
919 pos = self._chunkindex[chunknum][0]
919 pos = self._chunkindex[chunknum][0]
920 payloadsize = self._unpack(_fpayloadsize)[0]
920 payloadsize = self._unpack(_fpayloadsize)[0]
921 indebug(self.ui, 'payload chunk size: %i\n' % payloadsize)
921 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
922 while payloadsize:
922 while payloadsize:
923 if payloadsize == flaginterrupt:
923 if payloadsize == flaginterrupt:
924 # interruption detection, the handler will now read a
924 # interruption detection, the handler will now read a
925 # single part and process it.
925 # single part and process it.
926 interrupthandler(self.ui, self._fp)()
926 interrupthandler(self.ui, self._fp)()
927 elif payloadsize < 0:
927 elif payloadsize < 0:
928 msg = 'negative payload chunk size: %i' % payloadsize
928 msg = 'negative payload chunk size: %i' % payloadsize
929 raise error.BundleValueError(msg)
929 raise error.BundleValueError(msg)
930 else:
930 else:
931 result = self._readexact(payloadsize)
931 result = self._readexact(payloadsize)
932 chunknum += 1
932 chunknum += 1
933 pos += payloadsize
933 pos += payloadsize
934 if chunknum == len(self._chunkindex):
934 if chunknum == len(self._chunkindex):
935 self._chunkindex.append((pos,
935 self._chunkindex.append((pos,
936 super(unbundlepart, self).tell()))
936 super(unbundlepart, self).tell()))
937 yield result
937 yield result
938 payloadsize = self._unpack(_fpayloadsize)[0]
938 payloadsize = self._unpack(_fpayloadsize)[0]
939 indebug(self.ui, 'payload chunk size: %i\n' % payloadsize)
939 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
940
940
941 def _findchunk(self, pos):
941 def _findchunk(self, pos):
942 '''for a given payload position, return a chunk number and offset'''
942 '''for a given payload position, return a chunk number and offset'''
943 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
943 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
944 if ppos == pos:
944 if ppos == pos:
945 return chunk, 0
945 return chunk, 0
946 elif ppos > pos:
946 elif ppos > pos:
947 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
947 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
948 raise ValueError('Unknown chunk')
948 raise ValueError('Unknown chunk')
949
949
950 def _readheader(self):
950 def _readheader(self):
951 """read the header and setup the object"""
951 """read the header and setup the object"""
952 typesize = self._unpackheader(_fparttypesize)[0]
952 typesize = self._unpackheader(_fparttypesize)[0]
953 self.type = self._fromheader(typesize)
953 self.type = self._fromheader(typesize)
954 indebug(self.ui, 'part type: "%s"\n' % self.type)
954 indebug(self.ui, 'part type: "%s"' % self.type)
955 self.id = self._unpackheader(_fpartid)[0]
955 self.id = self._unpackheader(_fpartid)[0]
956 indebug(self.ui, 'part id: "%s"\n' % self.id)
956 indebug(self.ui, 'part id: "%s"' % self.id)
957 # extract mandatory bit from type
957 # extract mandatory bit from type
958 self.mandatory = (self.type != self.type.lower())
958 self.mandatory = (self.type != self.type.lower())
959 self.type = self.type.lower()
959 self.type = self.type.lower()
960 ## reading parameters
960 ## reading parameters
961 # param count
961 # param count
962 mancount, advcount = self._unpackheader(_fpartparamcount)
962 mancount, advcount = self._unpackheader(_fpartparamcount)
963 indebug(self.ui, 'part parameters: %i\n' % (mancount + advcount))
963 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
964 # param size
964 # param size
965 fparamsizes = _makefpartparamsizes(mancount + advcount)
965 fparamsizes = _makefpartparamsizes(mancount + advcount)
966 paramsizes = self._unpackheader(fparamsizes)
966 paramsizes = self._unpackheader(fparamsizes)
967 # make it a list of couple again
967 # make it a list of couple again
968 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
968 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
969 # split mandatory from advisory
969 # split mandatory from advisory
970 mansizes = paramsizes[:mancount]
970 mansizes = paramsizes[:mancount]
971 advsizes = paramsizes[mancount:]
971 advsizes = paramsizes[mancount:]
972 # retrieve param value
972 # retrieve param value
973 manparams = []
973 manparams = []
974 for key, value in mansizes:
974 for key, value in mansizes:
975 manparams.append((self._fromheader(key), self._fromheader(value)))
975 manparams.append((self._fromheader(key), self._fromheader(value)))
976 advparams = []
976 advparams = []
977 for key, value in advsizes:
977 for key, value in advsizes:
978 advparams.append((self._fromheader(key), self._fromheader(value)))
978 advparams.append((self._fromheader(key), self._fromheader(value)))
979 self._initparams(manparams, advparams)
979 self._initparams(manparams, advparams)
980 ## part payload
980 ## part payload
981 self._payloadstream = util.chunkbuffer(self._payloadchunks())
981 self._payloadstream = util.chunkbuffer(self._payloadchunks())
982 # we read the data, tell it
982 # we read the data, tell it
983 self._initialized = True
983 self._initialized = True
984
984
985 def read(self, size=None):
985 def read(self, size=None):
986 """read payload data"""
986 """read payload data"""
987 if not self._initialized:
987 if not self._initialized:
988 self._readheader()
988 self._readheader()
989 if size is None:
989 if size is None:
990 data = self._payloadstream.read()
990 data = self._payloadstream.read()
991 else:
991 else:
992 data = self._payloadstream.read(size)
992 data = self._payloadstream.read(size)
993 if size is None or len(data) < size:
993 if size is None or len(data) < size:
994 self.consumed = True
994 self.consumed = True
995 self._pos += len(data)
995 self._pos += len(data)
996 return data
996 return data
997
997
998 def tell(self):
998 def tell(self):
999 return self._pos
999 return self._pos
1000
1000
1001 def seek(self, offset, whence=0):
1001 def seek(self, offset, whence=0):
1002 if whence == 0:
1002 if whence == 0:
1003 newpos = offset
1003 newpos = offset
1004 elif whence == 1:
1004 elif whence == 1:
1005 newpos = self._pos + offset
1005 newpos = self._pos + offset
1006 elif whence == 2:
1006 elif whence == 2:
1007 if not self.consumed:
1007 if not self.consumed:
1008 self.read()
1008 self.read()
1009 newpos = self._chunkindex[-1][0] - offset
1009 newpos = self._chunkindex[-1][0] - offset
1010 else:
1010 else:
1011 raise ValueError('Unknown whence value: %r' % (whence,))
1011 raise ValueError('Unknown whence value: %r' % (whence,))
1012
1012
1013 if newpos > self._chunkindex[-1][0] and not self.consumed:
1013 if newpos > self._chunkindex[-1][0] and not self.consumed:
1014 self.read()
1014 self.read()
1015 if not 0 <= newpos <= self._chunkindex[-1][0]:
1015 if not 0 <= newpos <= self._chunkindex[-1][0]:
1016 raise ValueError('Offset out of range')
1016 raise ValueError('Offset out of range')
1017
1017
1018 if self._pos != newpos:
1018 if self._pos != newpos:
1019 chunk, internaloffset = self._findchunk(newpos)
1019 chunk, internaloffset = self._findchunk(newpos)
1020 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1020 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1021 adjust = self.read(internaloffset)
1021 adjust = self.read(internaloffset)
1022 if len(adjust) != internaloffset:
1022 if len(adjust) != internaloffset:
1023 raise util.Abort(_('Seek failed\n'))
1023 raise util.Abort(_('Seek failed\n'))
1024 self._pos = newpos
1024 self._pos = newpos
1025
1025
1026 # These are only the static capabilities.
1026 # These are only the static capabilities.
1027 # Check the 'getrepocaps' function for the rest.
1027 # Check the 'getrepocaps' function for the rest.
1028 capabilities = {'HG20': (),
1028 capabilities = {'HG20': (),
1029 'listkeys': (),
1029 'listkeys': (),
1030 'pushkey': (),
1030 'pushkey': (),
1031 'digests': tuple(sorted(util.DIGESTS.keys())),
1031 'digests': tuple(sorted(util.DIGESTS.keys())),
1032 'remote-changegroup': ('http', 'https'),
1032 'remote-changegroup': ('http', 'https'),
1033 }
1033 }
1034
1034
1035 def getrepocaps(repo, allowpushback=False):
1035 def getrepocaps(repo, allowpushback=False):
1036 """return the bundle2 capabilities for a given repo
1036 """return the bundle2 capabilities for a given repo
1037
1037
1038 Exists to allow extensions (like evolution) to mutate the capabilities.
1038 Exists to allow extensions (like evolution) to mutate the capabilities.
1039 """
1039 """
1040 caps = capabilities.copy()
1040 caps = capabilities.copy()
1041 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1041 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1042 if obsolete.isenabled(repo, obsolete.exchangeopt):
1042 if obsolete.isenabled(repo, obsolete.exchangeopt):
1043 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1043 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1044 caps['obsmarkers'] = supportedformat
1044 caps['obsmarkers'] = supportedformat
1045 if allowpushback:
1045 if allowpushback:
1046 caps['pushback'] = ()
1046 caps['pushback'] = ()
1047 return caps
1047 return caps
1048
1048
1049 def bundle2caps(remote):
1049 def bundle2caps(remote):
1050 """return the bundle capabilities of a peer as dict"""
1050 """return the bundle capabilities of a peer as dict"""
1051 raw = remote.capable('bundle2')
1051 raw = remote.capable('bundle2')
1052 if not raw and raw != '':
1052 if not raw and raw != '':
1053 return {}
1053 return {}
1054 capsblob = urllib.unquote(remote.capable('bundle2'))
1054 capsblob = urllib.unquote(remote.capable('bundle2'))
1055 return decodecaps(capsblob)
1055 return decodecaps(capsblob)
1056
1056
1057 def obsmarkersversion(caps):
1057 def obsmarkersversion(caps):
1058 """extract the list of supported obsmarkers versions from a bundle2caps dict
1058 """extract the list of supported obsmarkers versions from a bundle2caps dict
1059 """
1059 """
1060 obscaps = caps.get('obsmarkers', ())
1060 obscaps = caps.get('obsmarkers', ())
1061 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1061 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1062
1062
1063 @parthandler('changegroup', ('version',))
1063 @parthandler('changegroup', ('version',))
1064 def handlechangegroup(op, inpart):
1064 def handlechangegroup(op, inpart):
1065 """apply a changegroup part on the repo
1065 """apply a changegroup part on the repo
1066
1066
1067 This is a very early implementation that will massive rework before being
1067 This is a very early implementation that will massive rework before being
1068 inflicted to any end-user.
1068 inflicted to any end-user.
1069 """
1069 """
1070 # Make sure we trigger a transaction creation
1070 # Make sure we trigger a transaction creation
1071 #
1071 #
1072 # The addchangegroup function will get a transaction object by itself, but
1072 # The addchangegroup function will get a transaction object by itself, but
1073 # we need to make sure we trigger the creation of a transaction object used
1073 # we need to make sure we trigger the creation of a transaction object used
1074 # for the whole processing scope.
1074 # for the whole processing scope.
1075 op.gettransaction()
1075 op.gettransaction()
1076 unpackerversion = inpart.params.get('version', '01')
1076 unpackerversion = inpart.params.get('version', '01')
1077 # We should raise an appropriate exception here
1077 # We should raise an appropriate exception here
1078 unpacker = changegroup.packermap[unpackerversion][1]
1078 unpacker = changegroup.packermap[unpackerversion][1]
1079 cg = unpacker(inpart, 'UN')
1079 cg = unpacker(inpart, 'UN')
1080 # the source and url passed here are overwritten by the one contained in
1080 # the source and url passed here are overwritten by the one contained in
1081 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1081 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1082 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1082 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1083 op.records.add('changegroup', {'return': ret})
1083 op.records.add('changegroup', {'return': ret})
1084 if op.reply is not None:
1084 if op.reply is not None:
1085 # This is definitely not the final form of this
1085 # This is definitely not the final form of this
1086 # return. But one need to start somewhere.
1086 # return. But one need to start somewhere.
1087 part = op.reply.newpart('reply:changegroup', mandatory=False)
1087 part = op.reply.newpart('reply:changegroup', mandatory=False)
1088 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1088 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1089 part.addparam('return', '%i' % ret, mandatory=False)
1089 part.addparam('return', '%i' % ret, mandatory=False)
1090 assert not inpart.read()
1090 assert not inpart.read()
1091
1091
1092 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1092 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1093 ['digest:%s' % k for k in util.DIGESTS.keys()])
1093 ['digest:%s' % k for k in util.DIGESTS.keys()])
1094 @parthandler('remote-changegroup', _remotechangegroupparams)
1094 @parthandler('remote-changegroup', _remotechangegroupparams)
1095 def handleremotechangegroup(op, inpart):
1095 def handleremotechangegroup(op, inpart):
1096 """apply a bundle10 on the repo, given an url and validation information
1096 """apply a bundle10 on the repo, given an url and validation information
1097
1097
1098 All the information about the remote bundle to import are given as
1098 All the information about the remote bundle to import are given as
1099 parameters. The parameters include:
1099 parameters. The parameters include:
1100 - url: the url to the bundle10.
1100 - url: the url to the bundle10.
1101 - size: the bundle10 file size. It is used to validate what was
1101 - size: the bundle10 file size. It is used to validate what was
1102 retrieved by the client matches the server knowledge about the bundle.
1102 retrieved by the client matches the server knowledge about the bundle.
1103 - digests: a space separated list of the digest types provided as
1103 - digests: a space separated list of the digest types provided as
1104 parameters.
1104 parameters.
1105 - digest:<digest-type>: the hexadecimal representation of the digest with
1105 - digest:<digest-type>: the hexadecimal representation of the digest with
1106 that name. Like the size, it is used to validate what was retrieved by
1106 that name. Like the size, it is used to validate what was retrieved by
1107 the client matches what the server knows about the bundle.
1107 the client matches what the server knows about the bundle.
1108
1108
1109 When multiple digest types are given, all of them are checked.
1109 When multiple digest types are given, all of them are checked.
1110 """
1110 """
1111 try:
1111 try:
1112 raw_url = inpart.params['url']
1112 raw_url = inpart.params['url']
1113 except KeyError:
1113 except KeyError:
1114 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1114 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1115 parsed_url = util.url(raw_url)
1115 parsed_url = util.url(raw_url)
1116 if parsed_url.scheme not in capabilities['remote-changegroup']:
1116 if parsed_url.scheme not in capabilities['remote-changegroup']:
1117 raise util.Abort(_('remote-changegroup does not support %s urls') %
1117 raise util.Abort(_('remote-changegroup does not support %s urls') %
1118 parsed_url.scheme)
1118 parsed_url.scheme)
1119
1119
1120 try:
1120 try:
1121 size = int(inpart.params['size'])
1121 size = int(inpart.params['size'])
1122 except ValueError:
1122 except ValueError:
1123 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1123 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1124 % 'size')
1124 % 'size')
1125 except KeyError:
1125 except KeyError:
1126 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1126 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1127
1127
1128 digests = {}
1128 digests = {}
1129 for typ in inpart.params.get('digests', '').split():
1129 for typ in inpart.params.get('digests', '').split():
1130 param = 'digest:%s' % typ
1130 param = 'digest:%s' % typ
1131 try:
1131 try:
1132 value = inpart.params[param]
1132 value = inpart.params[param]
1133 except KeyError:
1133 except KeyError:
1134 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1134 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1135 param)
1135 param)
1136 digests[typ] = value
1136 digests[typ] = value
1137
1137
1138 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1138 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1139
1139
1140 # Make sure we trigger a transaction creation
1140 # Make sure we trigger a transaction creation
1141 #
1141 #
1142 # The addchangegroup function will get a transaction object by itself, but
1142 # The addchangegroup function will get a transaction object by itself, but
1143 # we need to make sure we trigger the creation of a transaction object used
1143 # we need to make sure we trigger the creation of a transaction object used
1144 # for the whole processing scope.
1144 # for the whole processing scope.
1145 op.gettransaction()
1145 op.gettransaction()
1146 import exchange
1146 import exchange
1147 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1147 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1148 if not isinstance(cg, changegroup.cg1unpacker):
1148 if not isinstance(cg, changegroup.cg1unpacker):
1149 raise util.Abort(_('%s: not a bundle version 1.0') %
1149 raise util.Abort(_('%s: not a bundle version 1.0') %
1150 util.hidepassword(raw_url))
1150 util.hidepassword(raw_url))
1151 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1151 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1152 op.records.add('changegroup', {'return': ret})
1152 op.records.add('changegroup', {'return': ret})
1153 if op.reply is not None:
1153 if op.reply is not None:
1154 # This is definitely not the final form of this
1154 # This is definitely not the final form of this
1155 # return. But one need to start somewhere.
1155 # return. But one need to start somewhere.
1156 part = op.reply.newpart('reply:changegroup')
1156 part = op.reply.newpart('reply:changegroup')
1157 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1157 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1158 part.addparam('return', '%i' % ret, mandatory=False)
1158 part.addparam('return', '%i' % ret, mandatory=False)
1159 try:
1159 try:
1160 real_part.validate()
1160 real_part.validate()
1161 except util.Abort, e:
1161 except util.Abort, e:
1162 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1162 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1163 (util.hidepassword(raw_url), str(e)))
1163 (util.hidepassword(raw_url), str(e)))
1164 assert not inpart.read()
1164 assert not inpart.read()
1165
1165
1166 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1166 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1167 def handlereplychangegroup(op, inpart):
1167 def handlereplychangegroup(op, inpart):
1168 ret = int(inpart.params['return'])
1168 ret = int(inpart.params['return'])
1169 replyto = int(inpart.params['in-reply-to'])
1169 replyto = int(inpart.params['in-reply-to'])
1170 op.records.add('changegroup', {'return': ret}, replyto)
1170 op.records.add('changegroup', {'return': ret}, replyto)
1171
1171
1172 @parthandler('check:heads')
1172 @parthandler('check:heads')
1173 def handlecheckheads(op, inpart):
1173 def handlecheckheads(op, inpart):
1174 """check that head of the repo did not change
1174 """check that head of the repo did not change
1175
1175
1176 This is used to detect a push race when using unbundle.
1176 This is used to detect a push race when using unbundle.
1177 This replaces the "heads" argument of unbundle."""
1177 This replaces the "heads" argument of unbundle."""
1178 h = inpart.read(20)
1178 h = inpart.read(20)
1179 heads = []
1179 heads = []
1180 while len(h) == 20:
1180 while len(h) == 20:
1181 heads.append(h)
1181 heads.append(h)
1182 h = inpart.read(20)
1182 h = inpart.read(20)
1183 assert not h
1183 assert not h
1184 if heads != op.repo.heads():
1184 if heads != op.repo.heads():
1185 raise error.PushRaced('repository changed while pushing - '
1185 raise error.PushRaced('repository changed while pushing - '
1186 'please try again')
1186 'please try again')
1187
1187
1188 @parthandler('output')
1188 @parthandler('output')
1189 def handleoutput(op, inpart):
1189 def handleoutput(op, inpart):
1190 """forward output captured on the server to the client"""
1190 """forward output captured on the server to the client"""
1191 for line in inpart.read().splitlines():
1191 for line in inpart.read().splitlines():
1192 op.ui.status(('remote: %s\n' % line))
1192 op.ui.status(('remote: %s\n' % line))
1193
1193
1194 @parthandler('replycaps')
1194 @parthandler('replycaps')
1195 def handlereplycaps(op, inpart):
1195 def handlereplycaps(op, inpart):
1196 """Notify that a reply bundle should be created
1196 """Notify that a reply bundle should be created
1197
1197
1198 The payload contains the capabilities information for the reply"""
1198 The payload contains the capabilities information for the reply"""
1199 caps = decodecaps(inpart.read())
1199 caps = decodecaps(inpart.read())
1200 if op.reply is None:
1200 if op.reply is None:
1201 op.reply = bundle20(op.ui, caps)
1201 op.reply = bundle20(op.ui, caps)
1202
1202
1203 @parthandler('error:abort', ('message', 'hint'))
1203 @parthandler('error:abort', ('message', 'hint'))
1204 def handleerrorabort(op, inpart):
1204 def handleerrorabort(op, inpart):
1205 """Used to transmit abort error over the wire"""
1205 """Used to transmit abort error over the wire"""
1206 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1206 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1207
1207
1208 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1208 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1209 def handleerrorunsupportedcontent(op, inpart):
1209 def handleerrorunsupportedcontent(op, inpart):
1210 """Used to transmit unknown content error over the wire"""
1210 """Used to transmit unknown content error over the wire"""
1211 kwargs = {}
1211 kwargs = {}
1212 parttype = inpart.params.get('parttype')
1212 parttype = inpart.params.get('parttype')
1213 if parttype is not None:
1213 if parttype is not None:
1214 kwargs['parttype'] = parttype
1214 kwargs['parttype'] = parttype
1215 params = inpart.params.get('params')
1215 params = inpart.params.get('params')
1216 if params is not None:
1216 if params is not None:
1217 kwargs['params'] = params.split('\0')
1217 kwargs['params'] = params.split('\0')
1218
1218
1219 raise error.UnsupportedPartError(**kwargs)
1219 raise error.UnsupportedPartError(**kwargs)
1220
1220
1221 @parthandler('error:pushraced', ('message',))
1221 @parthandler('error:pushraced', ('message',))
1222 def handleerrorpushraced(op, inpart):
1222 def handleerrorpushraced(op, inpart):
1223 """Used to transmit push race error over the wire"""
1223 """Used to transmit push race error over the wire"""
1224 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1224 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1225
1225
1226 @parthandler('listkeys', ('namespace',))
1226 @parthandler('listkeys', ('namespace',))
1227 def handlelistkeys(op, inpart):
1227 def handlelistkeys(op, inpart):
1228 """retrieve pushkey namespace content stored in a bundle2"""
1228 """retrieve pushkey namespace content stored in a bundle2"""
1229 namespace = inpart.params['namespace']
1229 namespace = inpart.params['namespace']
1230 r = pushkey.decodekeys(inpart.read())
1230 r = pushkey.decodekeys(inpart.read())
1231 op.records.add('listkeys', (namespace, r))
1231 op.records.add('listkeys', (namespace, r))
1232
1232
1233 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1233 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1234 def handlepushkey(op, inpart):
1234 def handlepushkey(op, inpart):
1235 """process a pushkey request"""
1235 """process a pushkey request"""
1236 dec = pushkey.decode
1236 dec = pushkey.decode
1237 namespace = dec(inpart.params['namespace'])
1237 namespace = dec(inpart.params['namespace'])
1238 key = dec(inpart.params['key'])
1238 key = dec(inpart.params['key'])
1239 old = dec(inpart.params['old'])
1239 old = dec(inpart.params['old'])
1240 new = dec(inpart.params['new'])
1240 new = dec(inpart.params['new'])
1241 ret = op.repo.pushkey(namespace, key, old, new)
1241 ret = op.repo.pushkey(namespace, key, old, new)
1242 record = {'namespace': namespace,
1242 record = {'namespace': namespace,
1243 'key': key,
1243 'key': key,
1244 'old': old,
1244 'old': old,
1245 'new': new}
1245 'new': new}
1246 op.records.add('pushkey', record)
1246 op.records.add('pushkey', record)
1247 if op.reply is not None:
1247 if op.reply is not None:
1248 rpart = op.reply.newpart('reply:pushkey')
1248 rpart = op.reply.newpart('reply:pushkey')
1249 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1249 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1250 rpart.addparam('return', '%i' % ret, mandatory=False)
1250 rpart.addparam('return', '%i' % ret, mandatory=False)
1251
1251
1252 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1252 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1253 def handlepushkeyreply(op, inpart):
1253 def handlepushkeyreply(op, inpart):
1254 """retrieve the result of a pushkey request"""
1254 """retrieve the result of a pushkey request"""
1255 ret = int(inpart.params['return'])
1255 ret = int(inpart.params['return'])
1256 partid = int(inpart.params['in-reply-to'])
1256 partid = int(inpart.params['in-reply-to'])
1257 op.records.add('pushkey', {'return': ret}, partid)
1257 op.records.add('pushkey', {'return': ret}, partid)
1258
1258
1259 @parthandler('obsmarkers')
1259 @parthandler('obsmarkers')
1260 def handleobsmarker(op, inpart):
1260 def handleobsmarker(op, inpart):
1261 """add a stream of obsmarkers to the repo"""
1261 """add a stream of obsmarkers to the repo"""
1262 tr = op.gettransaction()
1262 tr = op.gettransaction()
1263 markerdata = inpart.read()
1263 markerdata = inpart.read()
1264 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1264 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1265 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1265 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1266 % len(markerdata))
1266 % len(markerdata))
1267 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1267 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1268 if new:
1268 if new:
1269 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1269 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1270 op.records.add('obsmarkers', {'new': new})
1270 op.records.add('obsmarkers', {'new': new})
1271 if op.reply is not None:
1271 if op.reply is not None:
1272 rpart = op.reply.newpart('reply:obsmarkers')
1272 rpart = op.reply.newpart('reply:obsmarkers')
1273 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1273 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1274 rpart.addparam('new', '%i' % new, mandatory=False)
1274 rpart.addparam('new', '%i' % new, mandatory=False)
1275
1275
1276
1276
1277 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1277 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1278 def handlepushkeyreply(op, inpart):
1278 def handlepushkeyreply(op, inpart):
1279 """retrieve the result of a pushkey request"""
1279 """retrieve the result of a pushkey request"""
1280 ret = int(inpart.params['new'])
1280 ret = int(inpart.params['new'])
1281 partid = int(inpart.params['in-reply-to'])
1281 partid = int(inpart.params['in-reply-to'])
1282 op.records.add('obsmarkers', {'new': ret}, partid)
1282 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now