##// END OF EJS Templates
bundle2: introduce a specific function for bundling debug message...
Pierre-Yves David -
r25313:8f2c362b default
parent child Browse files
Show More
@@ -1,1272 +1,1276
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
175
176 def outdebug(ui, message):
177 """debug regarding output stream (bundling)"""
178 ui.debug(message)
179
176 def validateparttype(parttype):
180 def validateparttype(parttype):
177 """raise ValueError if a parttype contains invalid character"""
181 """raise ValueError if a parttype contains invalid character"""
178 if _parttypeforbidden.search(parttype):
182 if _parttypeforbidden.search(parttype):
179 raise ValueError(parttype)
183 raise ValueError(parttype)
180
184
181 def _makefpartparamsizes(nbparams):
185 def _makefpartparamsizes(nbparams):
182 """return a struct format to read part parameter sizes
186 """return a struct format to read part parameter sizes
183
187
184 The number parameters is variable so we need to build that format
188 The number parameters is variable so we need to build that format
185 dynamically.
189 dynamically.
186 """
190 """
187 return '>'+('BB'*nbparams)
191 return '>'+('BB'*nbparams)
188
192
189 parthandlermapping = {}
193 parthandlermapping = {}
190
194
191 def parthandler(parttype, params=()):
195 def parthandler(parttype, params=()):
192 """decorator that register a function as a bundle2 part handler
196 """decorator that register a function as a bundle2 part handler
193
197
194 eg::
198 eg::
195
199
196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
200 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 def myparttypehandler(...):
201 def myparttypehandler(...):
198 '''process a part of type "my part".'''
202 '''process a part of type "my part".'''
199 ...
203 ...
200 """
204 """
201 validateparttype(parttype)
205 validateparttype(parttype)
202 def _decorator(func):
206 def _decorator(func):
203 lparttype = parttype.lower() # enforce lower case matching.
207 lparttype = parttype.lower() # enforce lower case matching.
204 assert lparttype not in parthandlermapping
208 assert lparttype not in parthandlermapping
205 parthandlermapping[lparttype] = func
209 parthandlermapping[lparttype] = func
206 func.params = frozenset(params)
210 func.params = frozenset(params)
207 return func
211 return func
208 return _decorator
212 return _decorator
209
213
210 class unbundlerecords(object):
214 class unbundlerecords(object):
211 """keep record of what happens during and unbundle
215 """keep record of what happens during and unbundle
212
216
213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
217 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 category of record and obj is an arbitrary object.
218 category of record and obj is an arbitrary object.
215
219
216 `records['cat']` will return all entries of this category 'cat'.
220 `records['cat']` will return all entries of this category 'cat'.
217
221
218 Iterating on the object itself will yield `('category', obj)` tuples
222 Iterating on the object itself will yield `('category', obj)` tuples
219 for all entries.
223 for all entries.
220
224
221 All iterations happens in chronological order.
225 All iterations happens in chronological order.
222 """
226 """
223
227
224 def __init__(self):
228 def __init__(self):
225 self._categories = {}
229 self._categories = {}
226 self._sequences = []
230 self._sequences = []
227 self._replies = {}
231 self._replies = {}
228
232
229 def add(self, category, entry, inreplyto=None):
233 def add(self, category, entry, inreplyto=None):
230 """add a new record of a given category.
234 """add a new record of a given category.
231
235
232 The entry can then be retrieved in the list returned by
236 The entry can then be retrieved in the list returned by
233 self['category']."""
237 self['category']."""
234 self._categories.setdefault(category, []).append(entry)
238 self._categories.setdefault(category, []).append(entry)
235 self._sequences.append((category, entry))
239 self._sequences.append((category, entry))
236 if inreplyto is not None:
240 if inreplyto is not None:
237 self.getreplies(inreplyto).add(category, entry)
241 self.getreplies(inreplyto).add(category, entry)
238
242
239 def getreplies(self, partid):
243 def getreplies(self, partid):
240 """get the records that are replies to a specific part"""
244 """get the records that are replies to a specific part"""
241 return self._replies.setdefault(partid, unbundlerecords())
245 return self._replies.setdefault(partid, unbundlerecords())
242
246
243 def __getitem__(self, cat):
247 def __getitem__(self, cat):
244 return tuple(self._categories.get(cat, ()))
248 return tuple(self._categories.get(cat, ()))
245
249
246 def __iter__(self):
250 def __iter__(self):
247 return iter(self._sequences)
251 return iter(self._sequences)
248
252
249 def __len__(self):
253 def __len__(self):
250 return len(self._sequences)
254 return len(self._sequences)
251
255
252 def __nonzero__(self):
256 def __nonzero__(self):
253 return bool(self._sequences)
257 return bool(self._sequences)
254
258
255 class bundleoperation(object):
259 class bundleoperation(object):
256 """an object that represents a single bundling process
260 """an object that represents a single bundling process
257
261
258 Its purpose is to carry unbundle-related objects and states.
262 Its purpose is to carry unbundle-related objects and states.
259
263
260 A new object should be created at the beginning of each bundle processing.
264 A new object should be created at the beginning of each bundle processing.
261 The object is to be returned by the processing function.
265 The object is to be returned by the processing function.
262
266
263 The object has very little content now it will ultimately contain:
267 The object has very little content now it will ultimately contain:
264 * an access to the repo the bundle is applied to,
268 * an access to the repo the bundle is applied to,
265 * a ui object,
269 * a ui object,
266 * a way to retrieve a transaction to add changes to the repo,
270 * a way to retrieve a transaction to add changes to the repo,
267 * a way to record the result of processing each part,
271 * a way to record the result of processing each part,
268 * a way to construct a bundle response when applicable.
272 * a way to construct a bundle response when applicable.
269 """
273 """
270
274
271 def __init__(self, repo, transactiongetter, captureoutput=True):
275 def __init__(self, repo, transactiongetter, captureoutput=True):
272 self.repo = repo
276 self.repo = repo
273 self.ui = repo.ui
277 self.ui = repo.ui
274 self.records = unbundlerecords()
278 self.records = unbundlerecords()
275 self.gettransaction = transactiongetter
279 self.gettransaction = transactiongetter
276 self.reply = None
280 self.reply = None
277 self.captureoutput = captureoutput
281 self.captureoutput = captureoutput
278
282
279 class TransactionUnavailable(RuntimeError):
283 class TransactionUnavailable(RuntimeError):
280 pass
284 pass
281
285
282 def _notransaction():
286 def _notransaction():
283 """default method to get a transaction while processing a bundle
287 """default method to get a transaction while processing a bundle
284
288
285 Raise an exception to highlight the fact that no transaction was expected
289 Raise an exception to highlight the fact that no transaction was expected
286 to be created"""
290 to be created"""
287 raise TransactionUnavailable()
291 raise TransactionUnavailable()
288
292
289 def processbundle(repo, unbundler, transactiongetter=None, op=None):
293 def processbundle(repo, unbundler, transactiongetter=None, op=None):
290 """This function process a bundle, apply effect to/from a repo
294 """This function process a bundle, apply effect to/from a repo
291
295
292 It iterates over each part then searches for and uses the proper handling
296 It iterates over each part then searches for and uses the proper handling
293 code to process the part. Parts are processed in order.
297 code to process the part. Parts are processed in order.
294
298
295 This is very early version of this function that will be strongly reworked
299 This is very early version of this function that will be strongly reworked
296 before final usage.
300 before final usage.
297
301
298 Unknown Mandatory part will abort the process.
302 Unknown Mandatory part will abort the process.
299
303
300 It is temporarily possible to provide a prebuilt bundleoperation to the
304 It is temporarily possible to provide a prebuilt bundleoperation to the
301 function. This is used to ensure output is properly propagated in case of
305 function. This is used to ensure output is properly propagated in case of
302 an error during the unbundling. This output capturing part will likely be
306 an error during the unbundling. This output capturing part will likely be
303 reworked and this ability will probably go away in the process.
307 reworked and this ability will probably go away in the process.
304 """
308 """
305 if op is None:
309 if op is None:
306 if transactiongetter is None:
310 if transactiongetter is None:
307 transactiongetter = _notransaction
311 transactiongetter = _notransaction
308 op = bundleoperation(repo, transactiongetter)
312 op = bundleoperation(repo, transactiongetter)
309 # todo:
313 # todo:
310 # - replace this is a init function soon.
314 # - replace this is a init function soon.
311 # - exception catching
315 # - exception catching
312 unbundler.params
316 unbundler.params
313 iterparts = unbundler.iterparts()
317 iterparts = unbundler.iterparts()
314 part = None
318 part = None
315 try:
319 try:
316 for part in iterparts:
320 for part in iterparts:
317 _processpart(op, part)
321 _processpart(op, part)
318 except BaseException, exc:
322 except BaseException, exc:
319 for part in iterparts:
323 for part in iterparts:
320 # consume the bundle content
324 # consume the bundle content
321 part.seek(0, 2)
325 part.seek(0, 2)
322 # Small hack to let caller code distinguish exceptions from bundle2
326 # Small hack to let caller code distinguish exceptions from bundle2
323 # processing from processing the old format. This is mostly
327 # processing from processing the old format. This is mostly
324 # needed to handle different return codes to unbundle according to the
328 # needed to handle different return codes to unbundle according to the
325 # type of bundle. We should probably clean up or drop this return code
329 # type of bundle. We should probably clean up or drop this return code
326 # craziness in a future version.
330 # craziness in a future version.
327 exc.duringunbundle2 = True
331 exc.duringunbundle2 = True
328 salvaged = []
332 salvaged = []
329 if op.reply is not None:
333 if op.reply is not None:
330 salvaged = op.reply.salvageoutput()
334 salvaged = op.reply.salvageoutput()
331 exc._bundle2salvagedoutput = salvaged
335 exc._bundle2salvagedoutput = salvaged
332 raise
336 raise
333 return op
337 return op
334
338
335 def _processpart(op, part):
339 def _processpart(op, part):
336 """process a single part from a bundle
340 """process a single part from a bundle
337
341
338 The part is guaranteed to have been fully consumed when the function exits
342 The part is guaranteed to have been fully consumed when the function exits
339 (even if an exception is raised)."""
343 (even if an exception is raised)."""
340 try:
344 try:
341 try:
345 try:
342 handler = parthandlermapping.get(part.type)
346 handler = parthandlermapping.get(part.type)
343 if handler is None:
347 if handler is None:
344 raise error.UnsupportedPartError(parttype=part.type)
348 raise error.UnsupportedPartError(parttype=part.type)
345 op.ui.debug('found a handler for part %r\n' % part.type)
349 op.ui.debug('found a handler for part %r\n' % part.type)
346 unknownparams = part.mandatorykeys - handler.params
350 unknownparams = part.mandatorykeys - handler.params
347 if unknownparams:
351 if unknownparams:
348 unknownparams = list(unknownparams)
352 unknownparams = list(unknownparams)
349 unknownparams.sort()
353 unknownparams.sort()
350 raise error.UnsupportedPartError(parttype=part.type,
354 raise error.UnsupportedPartError(parttype=part.type,
351 params=unknownparams)
355 params=unknownparams)
352 except error.UnsupportedPartError, exc:
356 except error.UnsupportedPartError, exc:
353 if part.mandatory: # mandatory parts
357 if part.mandatory: # mandatory parts
354 raise
358 raise
355 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
359 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
356 return # skip to part processing
360 return # skip to part processing
357
361
358 # handler is called outside the above try block so that we don't
362 # handler is called outside the above try block so that we don't
359 # risk catching KeyErrors from anything other than the
363 # risk catching KeyErrors from anything other than the
360 # parthandlermapping lookup (any KeyError raised by handler()
364 # parthandlermapping lookup (any KeyError raised by handler()
361 # itself represents a defect of a different variety).
365 # itself represents a defect of a different variety).
362 output = None
366 output = None
363 if op.captureoutput and op.reply is not None:
367 if op.captureoutput and op.reply is not None:
364 op.ui.pushbuffer(error=True, subproc=True)
368 op.ui.pushbuffer(error=True, subproc=True)
365 output = ''
369 output = ''
366 try:
370 try:
367 handler(op, part)
371 handler(op, part)
368 finally:
372 finally:
369 if output is not None:
373 if output is not None:
370 output = op.ui.popbuffer()
374 output = op.ui.popbuffer()
371 if output:
375 if output:
372 outpart = op.reply.newpart('output', data=output,
376 outpart = op.reply.newpart('output', data=output,
373 mandatory=False)
377 mandatory=False)
374 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
378 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
375 finally:
379 finally:
376 # consume the part content to not corrupt the stream.
380 # consume the part content to not corrupt the stream.
377 part.seek(0, 2)
381 part.seek(0, 2)
378
382
379
383
380 def decodecaps(blob):
384 def decodecaps(blob):
381 """decode a bundle2 caps bytes blob into a dictionary
385 """decode a bundle2 caps bytes blob into a dictionary
382
386
383 The blob is a list of capabilities (one per line)
387 The blob is a list of capabilities (one per line)
384 Capabilities may have values using a line of the form::
388 Capabilities may have values using a line of the form::
385
389
386 capability=value1,value2,value3
390 capability=value1,value2,value3
387
391
388 The values are always a list."""
392 The values are always a list."""
389 caps = {}
393 caps = {}
390 for line in blob.splitlines():
394 for line in blob.splitlines():
391 if not line:
395 if not line:
392 continue
396 continue
393 if '=' not in line:
397 if '=' not in line:
394 key, vals = line, ()
398 key, vals = line, ()
395 else:
399 else:
396 key, vals = line.split('=', 1)
400 key, vals = line.split('=', 1)
397 vals = vals.split(',')
401 vals = vals.split(',')
398 key = urllib.unquote(key)
402 key = urllib.unquote(key)
399 vals = [urllib.unquote(v) for v in vals]
403 vals = [urllib.unquote(v) for v in vals]
400 caps[key] = vals
404 caps[key] = vals
401 return caps
405 return caps
402
406
403 def encodecaps(caps):
407 def encodecaps(caps):
404 """encode a bundle2 caps dictionary into a bytes blob"""
408 """encode a bundle2 caps dictionary into a bytes blob"""
405 chunks = []
409 chunks = []
406 for ca in sorted(caps):
410 for ca in sorted(caps):
407 vals = caps[ca]
411 vals = caps[ca]
408 ca = urllib.quote(ca)
412 ca = urllib.quote(ca)
409 vals = [urllib.quote(v) for v in vals]
413 vals = [urllib.quote(v) for v in vals]
410 if vals:
414 if vals:
411 ca = "%s=%s" % (ca, ','.join(vals))
415 ca = "%s=%s" % (ca, ','.join(vals))
412 chunks.append(ca)
416 chunks.append(ca)
413 return '\n'.join(chunks)
417 return '\n'.join(chunks)
414
418
415 class bundle20(object):
419 class bundle20(object):
416 """represent an outgoing bundle2 container
420 """represent an outgoing bundle2 container
417
421
418 Use the `addparam` method to add stream level parameter. and `newpart` to
422 Use the `addparam` method to add stream level parameter. and `newpart` to
419 populate it. Then call `getchunks` to retrieve all the binary chunks of
423 populate it. Then call `getchunks` to retrieve all the binary chunks of
420 data that compose the bundle2 container."""
424 data that compose the bundle2 container."""
421
425
422 _magicstring = 'HG20'
426 _magicstring = 'HG20'
423
427
424 def __init__(self, ui, capabilities=()):
428 def __init__(self, ui, capabilities=()):
425 self.ui = ui
429 self.ui = ui
426 self._params = []
430 self._params = []
427 self._parts = []
431 self._parts = []
428 self.capabilities = dict(capabilities)
432 self.capabilities = dict(capabilities)
429
433
430 @property
434 @property
431 def nbparts(self):
435 def nbparts(self):
432 """total number of parts added to the bundler"""
436 """total number of parts added to the bundler"""
433 return len(self._parts)
437 return len(self._parts)
434
438
435 # methods used to defines the bundle2 content
439 # methods used to defines the bundle2 content
436 def addparam(self, name, value=None):
440 def addparam(self, name, value=None):
437 """add a stream level parameter"""
441 """add a stream level parameter"""
438 if not name:
442 if not name:
439 raise ValueError('empty parameter name')
443 raise ValueError('empty parameter name')
440 if name[0] not in string.letters:
444 if name[0] not in string.letters:
441 raise ValueError('non letter first character: %r' % name)
445 raise ValueError('non letter first character: %r' % name)
442 self._params.append((name, value))
446 self._params.append((name, value))
443
447
444 def addpart(self, part):
448 def addpart(self, part):
445 """add a new part to the bundle2 container
449 """add a new part to the bundle2 container
446
450
447 Parts contains the actual applicative payload."""
451 Parts contains the actual applicative payload."""
448 assert part.id is None
452 assert part.id is None
449 part.id = len(self._parts) # very cheap counter
453 part.id = len(self._parts) # very cheap counter
450 self._parts.append(part)
454 self._parts.append(part)
451
455
452 def newpart(self, typeid, *args, **kwargs):
456 def newpart(self, typeid, *args, **kwargs):
453 """create a new part and add it to the containers
457 """create a new part and add it to the containers
454
458
455 As the part is directly added to the containers. For now, this means
459 As the part is directly added to the containers. For now, this means
456 that any failure to properly initialize the part after calling
460 that any failure to properly initialize the part after calling
457 ``newpart`` should result in a failure of the whole bundling process.
461 ``newpart`` should result in a failure of the whole bundling process.
458
462
459 You can still fall back to manually create and add if you need better
463 You can still fall back to manually create and add if you need better
460 control."""
464 control."""
461 part = bundlepart(typeid, *args, **kwargs)
465 part = bundlepart(typeid, *args, **kwargs)
462 self.addpart(part)
466 self.addpart(part)
463 return part
467 return part
464
468
465 # methods used to generate the bundle2 stream
469 # methods used to generate the bundle2 stream
466 def getchunks(self):
470 def getchunks(self):
467 self.ui.debug('start emission of %s stream\n' % self._magicstring)
471 outdebug(self.ui, 'start emission of %s stream\n' % self._magicstring)
468 yield self._magicstring
472 yield self._magicstring
469 param = self._paramchunk()
473 param = self._paramchunk()
470 self.ui.debug('bundle parameter: %s\n' % param)
474 outdebug(self.ui, 'bundle parameter: %s\n' % param)
471 yield _pack(_fstreamparamsize, len(param))
475 yield _pack(_fstreamparamsize, len(param))
472 if param:
476 if param:
473 yield param
477 yield param
474
478
475 self.ui.debug('start of parts\n')
479 outdebug(self.ui, 'start of parts\n')
476 for part in self._parts:
480 for part in self._parts:
477 self.ui.debug('bundle part: "%s"\n' % part.type)
481 outdebug(self.ui, 'bundle part: "%s"\n' % part.type)
478 for chunk in part.getchunks():
482 for chunk in part.getchunks():
479 yield chunk
483 yield chunk
480 self.ui.debug('end of bundle\n')
484 outdebug(self.ui, 'end of bundle\n')
481 yield _pack(_fpartheadersize, 0)
485 yield _pack(_fpartheadersize, 0)
482
486
483 def _paramchunk(self):
487 def _paramchunk(self):
484 """return a encoded version of all stream parameters"""
488 """return a encoded version of all stream parameters"""
485 blocks = []
489 blocks = []
486 for par, value in self._params:
490 for par, value in self._params:
487 par = urllib.quote(par)
491 par = urllib.quote(par)
488 if value is not None:
492 if value is not None:
489 value = urllib.quote(value)
493 value = urllib.quote(value)
490 par = '%s=%s' % (par, value)
494 par = '%s=%s' % (par, value)
491 blocks.append(par)
495 blocks.append(par)
492 return ' '.join(blocks)
496 return ' '.join(blocks)
493
497
494 def salvageoutput(self):
498 def salvageoutput(self):
495 """return a list with a copy of all output parts in the bundle
499 """return a list with a copy of all output parts in the bundle
496
500
497 This is meant to be used during error handling to make sure we preserve
501 This is meant to be used during error handling to make sure we preserve
498 server output"""
502 server output"""
499 salvaged = []
503 salvaged = []
500 for part in self._parts:
504 for part in self._parts:
501 if part.type.startswith('output'):
505 if part.type.startswith('output'):
502 salvaged.append(part.copy())
506 salvaged.append(part.copy())
503 return salvaged
507 return salvaged
504
508
505
509
506 class unpackermixin(object):
510 class unpackermixin(object):
507 """A mixin to extract bytes and struct data from a stream"""
511 """A mixin to extract bytes and struct data from a stream"""
508
512
509 def __init__(self, fp):
513 def __init__(self, fp):
510 self._fp = fp
514 self._fp = fp
511 self._seekable = (util.safehasattr(fp, 'seek') and
515 self._seekable = (util.safehasattr(fp, 'seek') and
512 util.safehasattr(fp, 'tell'))
516 util.safehasattr(fp, 'tell'))
513
517
514 def _unpack(self, format):
518 def _unpack(self, format):
515 """unpack this struct format from the stream"""
519 """unpack this struct format from the stream"""
516 data = self._readexact(struct.calcsize(format))
520 data = self._readexact(struct.calcsize(format))
517 return _unpack(format, data)
521 return _unpack(format, data)
518
522
519 def _readexact(self, size):
523 def _readexact(self, size):
520 """read exactly <size> bytes from the stream"""
524 """read exactly <size> bytes from the stream"""
521 return changegroup.readexactly(self._fp, size)
525 return changegroup.readexactly(self._fp, size)
522
526
523 def seek(self, offset, whence=0):
527 def seek(self, offset, whence=0):
524 """move the underlying file pointer"""
528 """move the underlying file pointer"""
525 if self._seekable:
529 if self._seekable:
526 return self._fp.seek(offset, whence)
530 return self._fp.seek(offset, whence)
527 else:
531 else:
528 raise NotImplementedError(_('File pointer is not seekable'))
532 raise NotImplementedError(_('File pointer is not seekable'))
529
533
530 def tell(self):
534 def tell(self):
531 """return the file offset, or None if file is not seekable"""
535 """return the file offset, or None if file is not seekable"""
532 if self._seekable:
536 if self._seekable:
533 try:
537 try:
534 return self._fp.tell()
538 return self._fp.tell()
535 except IOError, e:
539 except IOError, e:
536 if e.errno == errno.ESPIPE:
540 if e.errno == errno.ESPIPE:
537 self._seekable = False
541 self._seekable = False
538 else:
542 else:
539 raise
543 raise
540 return None
544 return None
541
545
542 def close(self):
546 def close(self):
543 """close underlying file"""
547 """close underlying file"""
544 if util.safehasattr(self._fp, 'close'):
548 if util.safehasattr(self._fp, 'close'):
545 return self._fp.close()
549 return self._fp.close()
546
550
547 def getunbundler(ui, fp, header=None):
551 def getunbundler(ui, fp, header=None):
548 """return a valid unbundler object for a given header"""
552 """return a valid unbundler object for a given header"""
549 if header is None:
553 if header is None:
550 header = changegroup.readexactly(fp, 4)
554 header = changegroup.readexactly(fp, 4)
551 magic, version = header[0:2], header[2:4]
555 magic, version = header[0:2], header[2:4]
552 if magic != 'HG':
556 if magic != 'HG':
553 raise util.Abort(_('not a Mercurial bundle'))
557 raise util.Abort(_('not a Mercurial bundle'))
554 unbundlerclass = formatmap.get(version)
558 unbundlerclass = formatmap.get(version)
555 if unbundlerclass is None:
559 if unbundlerclass is None:
556 raise util.Abort(_('unknown bundle version %s') % version)
560 raise util.Abort(_('unknown bundle version %s') % version)
557 unbundler = unbundlerclass(ui, fp)
561 unbundler = unbundlerclass(ui, fp)
558 ui.debug('start processing of %s stream\n' % header)
562 ui.debug('start processing of %s stream\n' % header)
559 return unbundler
563 return unbundler
560
564
561 class unbundle20(unpackermixin):
565 class unbundle20(unpackermixin):
562 """interpret a bundle2 stream
566 """interpret a bundle2 stream
563
567
564 This class is fed with a binary stream and yields parts through its
568 This class is fed with a binary stream and yields parts through its
565 `iterparts` methods."""
569 `iterparts` methods."""
566
570
567 def __init__(self, ui, fp):
571 def __init__(self, ui, fp):
568 """If header is specified, we do not read it out of the stream."""
572 """If header is specified, we do not read it out of the stream."""
569 self.ui = ui
573 self.ui = ui
570 super(unbundle20, self).__init__(fp)
574 super(unbundle20, self).__init__(fp)
571
575
572 @util.propertycache
576 @util.propertycache
573 def params(self):
577 def params(self):
574 """dictionary of stream level parameters"""
578 """dictionary of stream level parameters"""
575 self.ui.debug('reading bundle2 stream parameters\n')
579 self.ui.debug('reading bundle2 stream parameters\n')
576 params = {}
580 params = {}
577 paramssize = self._unpack(_fstreamparamsize)[0]
581 paramssize = self._unpack(_fstreamparamsize)[0]
578 if paramssize < 0:
582 if paramssize < 0:
579 raise error.BundleValueError('negative bundle param size: %i'
583 raise error.BundleValueError('negative bundle param size: %i'
580 % paramssize)
584 % paramssize)
581 if paramssize:
585 if paramssize:
582 for p in self._readexact(paramssize).split(' '):
586 for p in self._readexact(paramssize).split(' '):
583 p = p.split('=', 1)
587 p = p.split('=', 1)
584 p = [urllib.unquote(i) for i in p]
588 p = [urllib.unquote(i) for i in p]
585 if len(p) < 2:
589 if len(p) < 2:
586 p.append(None)
590 p.append(None)
587 self._processparam(*p)
591 self._processparam(*p)
588 params[p[0]] = p[1]
592 params[p[0]] = p[1]
589 return params
593 return params
590
594
591 def _processparam(self, name, value):
595 def _processparam(self, name, value):
592 """process a parameter, applying its effect if needed
596 """process a parameter, applying its effect if needed
593
597
594 Parameter starting with a lower case letter are advisory and will be
598 Parameter starting with a lower case letter are advisory and will be
595 ignored when unknown. Those starting with an upper case letter are
599 ignored when unknown. Those starting with an upper case letter are
596 mandatory and will this function will raise a KeyError when unknown.
600 mandatory and will this function will raise a KeyError when unknown.
597
601
598 Note: no option are currently supported. Any input will be either
602 Note: no option are currently supported. Any input will be either
599 ignored or failing.
603 ignored or failing.
600 """
604 """
601 if not name:
605 if not name:
602 raise ValueError('empty parameter name')
606 raise ValueError('empty parameter name')
603 if name[0] not in string.letters:
607 if name[0] not in string.letters:
604 raise ValueError('non letter first character: %r' % name)
608 raise ValueError('non letter first character: %r' % name)
605 # Some logic will be later added here to try to process the option for
609 # Some logic will be later added here to try to process the option for
606 # a dict of known parameter.
610 # a dict of known parameter.
607 if name[0].islower():
611 if name[0].islower():
608 self.ui.debug("ignoring unknown parameter %r\n" % name)
612 self.ui.debug("ignoring unknown parameter %r\n" % name)
609 else:
613 else:
610 raise error.UnsupportedPartError(params=(name,))
614 raise error.UnsupportedPartError(params=(name,))
611
615
612
616
613 def iterparts(self):
617 def iterparts(self):
614 """yield all parts contained in the stream"""
618 """yield all parts contained in the stream"""
615 # make sure param have been loaded
619 # make sure param have been loaded
616 self.params
620 self.params
617 self.ui.debug('start extraction of bundle2 parts\n')
621 self.ui.debug('start extraction of bundle2 parts\n')
618 headerblock = self._readpartheader()
622 headerblock = self._readpartheader()
619 while headerblock is not None:
623 while headerblock is not None:
620 part = unbundlepart(self.ui, headerblock, self._fp)
624 part = unbundlepart(self.ui, headerblock, self._fp)
621 yield part
625 yield part
622 part.seek(0, 2)
626 part.seek(0, 2)
623 headerblock = self._readpartheader()
627 headerblock = self._readpartheader()
624 self.ui.debug('end of bundle2 stream\n')
628 self.ui.debug('end of bundle2 stream\n')
625
629
626 def _readpartheader(self):
630 def _readpartheader(self):
627 """reads a part header size and return the bytes blob
631 """reads a part header size and return the bytes blob
628
632
629 returns None if empty"""
633 returns None if empty"""
630 headersize = self._unpack(_fpartheadersize)[0]
634 headersize = self._unpack(_fpartheadersize)[0]
631 if headersize < 0:
635 if headersize < 0:
632 raise error.BundleValueError('negative part header size: %i'
636 raise error.BundleValueError('negative part header size: %i'
633 % headersize)
637 % headersize)
634 self.ui.debug('part header size: %i\n' % headersize)
638 self.ui.debug('part header size: %i\n' % headersize)
635 if headersize:
639 if headersize:
636 return self._readexact(headersize)
640 return self._readexact(headersize)
637 return None
641 return None
638
642
639 def compressed(self):
643 def compressed(self):
640 return False
644 return False
641
645
642 formatmap = {'20': unbundle20}
646 formatmap = {'20': unbundle20}
643
647
644 class bundlepart(object):
648 class bundlepart(object):
645 """A bundle2 part contains application level payload
649 """A bundle2 part contains application level payload
646
650
647 The part `type` is used to route the part to the application level
651 The part `type` is used to route the part to the application level
648 handler.
652 handler.
649
653
650 The part payload is contained in ``part.data``. It could be raw bytes or a
654 The part payload is contained in ``part.data``. It could be raw bytes or a
651 generator of byte chunks.
655 generator of byte chunks.
652
656
653 You can add parameters to the part using the ``addparam`` method.
657 You can add parameters to the part using the ``addparam`` method.
654 Parameters can be either mandatory (default) or advisory. Remote side
658 Parameters can be either mandatory (default) or advisory. Remote side
655 should be able to safely ignore the advisory ones.
659 should be able to safely ignore the advisory ones.
656
660
657 Both data and parameters cannot be modified after the generation has begun.
661 Both data and parameters cannot be modified after the generation has begun.
658 """
662 """
659
663
660 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
664 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
661 data='', mandatory=True):
665 data='', mandatory=True):
662 validateparttype(parttype)
666 validateparttype(parttype)
663 self.id = None
667 self.id = None
664 self.type = parttype
668 self.type = parttype
665 self._data = data
669 self._data = data
666 self._mandatoryparams = list(mandatoryparams)
670 self._mandatoryparams = list(mandatoryparams)
667 self._advisoryparams = list(advisoryparams)
671 self._advisoryparams = list(advisoryparams)
668 # checking for duplicated entries
672 # checking for duplicated entries
669 self._seenparams = set()
673 self._seenparams = set()
670 for pname, __ in self._mandatoryparams + self._advisoryparams:
674 for pname, __ in self._mandatoryparams + self._advisoryparams:
671 if pname in self._seenparams:
675 if pname in self._seenparams:
672 raise RuntimeError('duplicated params: %s' % pname)
676 raise RuntimeError('duplicated params: %s' % pname)
673 self._seenparams.add(pname)
677 self._seenparams.add(pname)
674 # status of the part's generation:
678 # status of the part's generation:
675 # - None: not started,
679 # - None: not started,
676 # - False: currently generated,
680 # - False: currently generated,
677 # - True: generation done.
681 # - True: generation done.
678 self._generated = None
682 self._generated = None
679 self.mandatory = mandatory
683 self.mandatory = mandatory
680
684
681 def copy(self):
685 def copy(self):
682 """return a copy of the part
686 """return a copy of the part
683
687
684 The new part have the very same content but no partid assigned yet.
688 The new part have the very same content but no partid assigned yet.
685 Parts with generated data cannot be copied."""
689 Parts with generated data cannot be copied."""
686 assert not util.safehasattr(self.data, 'next')
690 assert not util.safehasattr(self.data, 'next')
687 return self.__class__(self.type, self._mandatoryparams,
691 return self.__class__(self.type, self._mandatoryparams,
688 self._advisoryparams, self._data, self.mandatory)
692 self._advisoryparams, self._data, self.mandatory)
689
693
690 # methods used to defines the part content
694 # methods used to defines the part content
691 def __setdata(self, data):
695 def __setdata(self, data):
692 if self._generated is not None:
696 if self._generated is not None:
693 raise error.ReadOnlyPartError('part is being generated')
697 raise error.ReadOnlyPartError('part is being generated')
694 self._data = data
698 self._data = data
695 def __getdata(self):
699 def __getdata(self):
696 return self._data
700 return self._data
697 data = property(__getdata, __setdata)
701 data = property(__getdata, __setdata)
698
702
699 @property
703 @property
700 def mandatoryparams(self):
704 def mandatoryparams(self):
701 # make it an immutable tuple to force people through ``addparam``
705 # make it an immutable tuple to force people through ``addparam``
702 return tuple(self._mandatoryparams)
706 return tuple(self._mandatoryparams)
703
707
704 @property
708 @property
705 def advisoryparams(self):
709 def advisoryparams(self):
706 # make it an immutable tuple to force people through ``addparam``
710 # make it an immutable tuple to force people through ``addparam``
707 return tuple(self._advisoryparams)
711 return tuple(self._advisoryparams)
708
712
709 def addparam(self, name, value='', mandatory=True):
713 def addparam(self, name, value='', mandatory=True):
710 if self._generated is not None:
714 if self._generated is not None:
711 raise error.ReadOnlyPartError('part is being generated')
715 raise error.ReadOnlyPartError('part is being generated')
712 if name in self._seenparams:
716 if name in self._seenparams:
713 raise ValueError('duplicated params: %s' % name)
717 raise ValueError('duplicated params: %s' % name)
714 self._seenparams.add(name)
718 self._seenparams.add(name)
715 params = self._advisoryparams
719 params = self._advisoryparams
716 if mandatory:
720 if mandatory:
717 params = self._mandatoryparams
721 params = self._mandatoryparams
718 params.append((name, value))
722 params.append((name, value))
719
723
720 # methods used to generates the bundle2 stream
724 # methods used to generates the bundle2 stream
721 def getchunks(self):
725 def getchunks(self):
722 if self._generated is not None:
726 if self._generated is not None:
723 raise RuntimeError('part can only be consumed once')
727 raise RuntimeError('part can only be consumed once')
724 self._generated = False
728 self._generated = False
725 #### header
729 #### header
726 if self.mandatory:
730 if self.mandatory:
727 parttype = self.type.upper()
731 parttype = self.type.upper()
728 else:
732 else:
729 parttype = self.type.lower()
733 parttype = self.type.lower()
730 ## parttype
734 ## parttype
731 header = [_pack(_fparttypesize, len(parttype)),
735 header = [_pack(_fparttypesize, len(parttype)),
732 parttype, _pack(_fpartid, self.id),
736 parttype, _pack(_fpartid, self.id),
733 ]
737 ]
734 ## parameters
738 ## parameters
735 # count
739 # count
736 manpar = self.mandatoryparams
740 manpar = self.mandatoryparams
737 advpar = self.advisoryparams
741 advpar = self.advisoryparams
738 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
742 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
739 # size
743 # size
740 parsizes = []
744 parsizes = []
741 for key, value in manpar:
745 for key, value in manpar:
742 parsizes.append(len(key))
746 parsizes.append(len(key))
743 parsizes.append(len(value))
747 parsizes.append(len(value))
744 for key, value in advpar:
748 for key, value in advpar:
745 parsizes.append(len(key))
749 parsizes.append(len(key))
746 parsizes.append(len(value))
750 parsizes.append(len(value))
747 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
751 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
748 header.append(paramsizes)
752 header.append(paramsizes)
749 # key, value
753 # key, value
750 for key, value in manpar:
754 for key, value in manpar:
751 header.append(key)
755 header.append(key)
752 header.append(value)
756 header.append(value)
753 for key, value in advpar:
757 for key, value in advpar:
754 header.append(key)
758 header.append(key)
755 header.append(value)
759 header.append(value)
756 ## finalize header
760 ## finalize header
757 headerchunk = ''.join(header)
761 headerchunk = ''.join(header)
758 yield _pack(_fpartheadersize, len(headerchunk))
762 yield _pack(_fpartheadersize, len(headerchunk))
759 yield headerchunk
763 yield headerchunk
760 ## payload
764 ## payload
761 try:
765 try:
762 for chunk in self._payloadchunks():
766 for chunk in self._payloadchunks():
763 yield _pack(_fpayloadsize, len(chunk))
767 yield _pack(_fpayloadsize, len(chunk))
764 yield chunk
768 yield chunk
765 except BaseException, exc:
769 except BaseException, exc:
766 # backup exception data for later
770 # backup exception data for later
767 exc_info = sys.exc_info()
771 exc_info = sys.exc_info()
768 msg = 'unexpected error: %s' % exc
772 msg = 'unexpected error: %s' % exc
769 interpart = bundlepart('error:abort', [('message', msg)],
773 interpart = bundlepart('error:abort', [('message', msg)],
770 mandatory=False)
774 mandatory=False)
771 interpart.id = 0
775 interpart.id = 0
772 yield _pack(_fpayloadsize, -1)
776 yield _pack(_fpayloadsize, -1)
773 for chunk in interpart.getchunks():
777 for chunk in interpart.getchunks():
774 yield chunk
778 yield chunk
775 # abort current part payload
779 # abort current part payload
776 yield _pack(_fpayloadsize, 0)
780 yield _pack(_fpayloadsize, 0)
777 raise exc_info[0], exc_info[1], exc_info[2]
781 raise exc_info[0], exc_info[1], exc_info[2]
778 # end of payload
782 # end of payload
779 yield _pack(_fpayloadsize, 0)
783 yield _pack(_fpayloadsize, 0)
780 self._generated = True
784 self._generated = True
781
785
782 def _payloadchunks(self):
786 def _payloadchunks(self):
783 """yield chunks of a the part payload
787 """yield chunks of a the part payload
784
788
785 Exists to handle the different methods to provide data to a part."""
789 Exists to handle the different methods to provide data to a part."""
786 # we only support fixed size data now.
790 # we only support fixed size data now.
787 # This will be improved in the future.
791 # This will be improved in the future.
788 if util.safehasattr(self.data, 'next'):
792 if util.safehasattr(self.data, 'next'):
789 buff = util.chunkbuffer(self.data)
793 buff = util.chunkbuffer(self.data)
790 chunk = buff.read(preferedchunksize)
794 chunk = buff.read(preferedchunksize)
791 while chunk:
795 while chunk:
792 yield chunk
796 yield chunk
793 chunk = buff.read(preferedchunksize)
797 chunk = buff.read(preferedchunksize)
794 elif len(self.data):
798 elif len(self.data):
795 yield self.data
799 yield self.data
796
800
797
801
798 flaginterrupt = -1
802 flaginterrupt = -1
799
803
800 class interrupthandler(unpackermixin):
804 class interrupthandler(unpackermixin):
801 """read one part and process it with restricted capability
805 """read one part and process it with restricted capability
802
806
803 This allows to transmit exception raised on the producer size during part
807 This allows to transmit exception raised on the producer size during part
804 iteration while the consumer is reading a part.
808 iteration while the consumer is reading a part.
805
809
806 Part processed in this manner only have access to a ui object,"""
810 Part processed in this manner only have access to a ui object,"""
807
811
808 def __init__(self, ui, fp):
812 def __init__(self, ui, fp):
809 super(interrupthandler, self).__init__(fp)
813 super(interrupthandler, self).__init__(fp)
810 self.ui = ui
814 self.ui = ui
811
815
812 def _readpartheader(self):
816 def _readpartheader(self):
813 """reads a part header size and return the bytes blob
817 """reads a part header size and return the bytes blob
814
818
815 returns None if empty"""
819 returns None if empty"""
816 headersize = self._unpack(_fpartheadersize)[0]
820 headersize = self._unpack(_fpartheadersize)[0]
817 if headersize < 0:
821 if headersize < 0:
818 raise error.BundleValueError('negative part header size: %i'
822 raise error.BundleValueError('negative part header size: %i'
819 % headersize)
823 % headersize)
820 self.ui.debug('part header size: %i\n' % headersize)
824 self.ui.debug('part header size: %i\n' % headersize)
821 if headersize:
825 if headersize:
822 return self._readexact(headersize)
826 return self._readexact(headersize)
823 return None
827 return None
824
828
825 def __call__(self):
829 def __call__(self):
826 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
830 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
827 headerblock = self._readpartheader()
831 headerblock = self._readpartheader()
828 if headerblock is None:
832 if headerblock is None:
829 self.ui.debug('no part found during interruption.\n')
833 self.ui.debug('no part found during interruption.\n')
830 return
834 return
831 part = unbundlepart(self.ui, headerblock, self._fp)
835 part = unbundlepart(self.ui, headerblock, self._fp)
832 op = interruptoperation(self.ui)
836 op = interruptoperation(self.ui)
833 _processpart(op, part)
837 _processpart(op, part)
834
838
835 class interruptoperation(object):
839 class interruptoperation(object):
836 """A limited operation to be use by part handler during interruption
840 """A limited operation to be use by part handler during interruption
837
841
838 It only have access to an ui object.
842 It only have access to an ui object.
839 """
843 """
840
844
841 def __init__(self, ui):
845 def __init__(self, ui):
842 self.ui = ui
846 self.ui = ui
843 self.reply = None
847 self.reply = None
844 self.captureoutput = False
848 self.captureoutput = False
845
849
846 @property
850 @property
847 def repo(self):
851 def repo(self):
848 raise RuntimeError('no repo access from stream interruption')
852 raise RuntimeError('no repo access from stream interruption')
849
853
850 def gettransaction(self):
854 def gettransaction(self):
851 raise TransactionUnavailable('no repo access from stream interruption')
855 raise TransactionUnavailable('no repo access from stream interruption')
852
856
853 class unbundlepart(unpackermixin):
857 class unbundlepart(unpackermixin):
854 """a bundle part read from a bundle"""
858 """a bundle part read from a bundle"""
855
859
856 def __init__(self, ui, header, fp):
860 def __init__(self, ui, header, fp):
857 super(unbundlepart, self).__init__(fp)
861 super(unbundlepart, self).__init__(fp)
858 self.ui = ui
862 self.ui = ui
859 # unbundle state attr
863 # unbundle state attr
860 self._headerdata = header
864 self._headerdata = header
861 self._headeroffset = 0
865 self._headeroffset = 0
862 self._initialized = False
866 self._initialized = False
863 self.consumed = False
867 self.consumed = False
864 # part data
868 # part data
865 self.id = None
869 self.id = None
866 self.type = None
870 self.type = None
867 self.mandatoryparams = None
871 self.mandatoryparams = None
868 self.advisoryparams = None
872 self.advisoryparams = None
869 self.params = None
873 self.params = None
870 self.mandatorykeys = ()
874 self.mandatorykeys = ()
871 self._payloadstream = None
875 self._payloadstream = None
872 self._readheader()
876 self._readheader()
873 self._mandatory = None
877 self._mandatory = None
874 self._chunkindex = [] #(payload, file) position tuples for chunk starts
878 self._chunkindex = [] #(payload, file) position tuples for chunk starts
875 self._pos = 0
879 self._pos = 0
876
880
877 def _fromheader(self, size):
881 def _fromheader(self, size):
878 """return the next <size> byte from the header"""
882 """return the next <size> byte from the header"""
879 offset = self._headeroffset
883 offset = self._headeroffset
880 data = self._headerdata[offset:(offset + size)]
884 data = self._headerdata[offset:(offset + size)]
881 self._headeroffset = offset + size
885 self._headeroffset = offset + size
882 return data
886 return data
883
887
884 def _unpackheader(self, format):
888 def _unpackheader(self, format):
885 """read given format from header
889 """read given format from header
886
890
887 This automatically compute the size of the format to read."""
891 This automatically compute the size of the format to read."""
888 data = self._fromheader(struct.calcsize(format))
892 data = self._fromheader(struct.calcsize(format))
889 return _unpack(format, data)
893 return _unpack(format, data)
890
894
891 def _initparams(self, mandatoryparams, advisoryparams):
895 def _initparams(self, mandatoryparams, advisoryparams):
892 """internal function to setup all logic related parameters"""
896 """internal function to setup all logic related parameters"""
893 # make it read only to prevent people touching it by mistake.
897 # make it read only to prevent people touching it by mistake.
894 self.mandatoryparams = tuple(mandatoryparams)
898 self.mandatoryparams = tuple(mandatoryparams)
895 self.advisoryparams = tuple(advisoryparams)
899 self.advisoryparams = tuple(advisoryparams)
896 # user friendly UI
900 # user friendly UI
897 self.params = dict(self.mandatoryparams)
901 self.params = dict(self.mandatoryparams)
898 self.params.update(dict(self.advisoryparams))
902 self.params.update(dict(self.advisoryparams))
899 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
903 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
900
904
901 def _payloadchunks(self, chunknum=0):
905 def _payloadchunks(self, chunknum=0):
902 '''seek to specified chunk and start yielding data'''
906 '''seek to specified chunk and start yielding data'''
903 if len(self._chunkindex) == 0:
907 if len(self._chunkindex) == 0:
904 assert chunknum == 0, 'Must start with chunk 0'
908 assert chunknum == 0, 'Must start with chunk 0'
905 self._chunkindex.append((0, super(unbundlepart, self).tell()))
909 self._chunkindex.append((0, super(unbundlepart, self).tell()))
906 else:
910 else:
907 assert chunknum < len(self._chunkindex), \
911 assert chunknum < len(self._chunkindex), \
908 'Unknown chunk %d' % chunknum
912 'Unknown chunk %d' % chunknum
909 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
913 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
910
914
911 pos = self._chunkindex[chunknum][0]
915 pos = self._chunkindex[chunknum][0]
912 payloadsize = self._unpack(_fpayloadsize)[0]
916 payloadsize = self._unpack(_fpayloadsize)[0]
913 self.ui.debug('payload chunk size: %i\n' % payloadsize)
917 self.ui.debug('payload chunk size: %i\n' % payloadsize)
914 while payloadsize:
918 while payloadsize:
915 if payloadsize == flaginterrupt:
919 if payloadsize == flaginterrupt:
916 # interruption detection, the handler will now read a
920 # interruption detection, the handler will now read a
917 # single part and process it.
921 # single part and process it.
918 interrupthandler(self.ui, self._fp)()
922 interrupthandler(self.ui, self._fp)()
919 elif payloadsize < 0:
923 elif payloadsize < 0:
920 msg = 'negative payload chunk size: %i' % payloadsize
924 msg = 'negative payload chunk size: %i' % payloadsize
921 raise error.BundleValueError(msg)
925 raise error.BundleValueError(msg)
922 else:
926 else:
923 result = self._readexact(payloadsize)
927 result = self._readexact(payloadsize)
924 chunknum += 1
928 chunknum += 1
925 pos += payloadsize
929 pos += payloadsize
926 if chunknum == len(self._chunkindex):
930 if chunknum == len(self._chunkindex):
927 self._chunkindex.append((pos,
931 self._chunkindex.append((pos,
928 super(unbundlepart, self).tell()))
932 super(unbundlepart, self).tell()))
929 yield result
933 yield result
930 payloadsize = self._unpack(_fpayloadsize)[0]
934 payloadsize = self._unpack(_fpayloadsize)[0]
931 self.ui.debug('payload chunk size: %i\n' % payloadsize)
935 self.ui.debug('payload chunk size: %i\n' % payloadsize)
932
936
933 def _findchunk(self, pos):
937 def _findchunk(self, pos):
934 '''for a given payload position, return a chunk number and offset'''
938 '''for a given payload position, return a chunk number and offset'''
935 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
939 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
936 if ppos == pos:
940 if ppos == pos:
937 return chunk, 0
941 return chunk, 0
938 elif ppos > pos:
942 elif ppos > pos:
939 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
943 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
940 raise ValueError('Unknown chunk')
944 raise ValueError('Unknown chunk')
941
945
942 def _readheader(self):
946 def _readheader(self):
943 """read the header and setup the object"""
947 """read the header and setup the object"""
944 typesize = self._unpackheader(_fparttypesize)[0]
948 typesize = self._unpackheader(_fparttypesize)[0]
945 self.type = self._fromheader(typesize)
949 self.type = self._fromheader(typesize)
946 self.ui.debug('part type: "%s"\n' % self.type)
950 self.ui.debug('part type: "%s"\n' % self.type)
947 self.id = self._unpackheader(_fpartid)[0]
951 self.id = self._unpackheader(_fpartid)[0]
948 self.ui.debug('part id: "%s"\n' % self.id)
952 self.ui.debug('part id: "%s"\n' % self.id)
949 # extract mandatory bit from type
953 # extract mandatory bit from type
950 self.mandatory = (self.type != self.type.lower())
954 self.mandatory = (self.type != self.type.lower())
951 self.type = self.type.lower()
955 self.type = self.type.lower()
952 ## reading parameters
956 ## reading parameters
953 # param count
957 # param count
954 mancount, advcount = self._unpackheader(_fpartparamcount)
958 mancount, advcount = self._unpackheader(_fpartparamcount)
955 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
959 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
956 # param size
960 # param size
957 fparamsizes = _makefpartparamsizes(mancount + advcount)
961 fparamsizes = _makefpartparamsizes(mancount + advcount)
958 paramsizes = self._unpackheader(fparamsizes)
962 paramsizes = self._unpackheader(fparamsizes)
959 # make it a list of couple again
963 # make it a list of couple again
960 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
964 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
961 # split mandatory from advisory
965 # split mandatory from advisory
962 mansizes = paramsizes[:mancount]
966 mansizes = paramsizes[:mancount]
963 advsizes = paramsizes[mancount:]
967 advsizes = paramsizes[mancount:]
964 # retrieve param value
968 # retrieve param value
965 manparams = []
969 manparams = []
966 for key, value in mansizes:
970 for key, value in mansizes:
967 manparams.append((self._fromheader(key), self._fromheader(value)))
971 manparams.append((self._fromheader(key), self._fromheader(value)))
968 advparams = []
972 advparams = []
969 for key, value in advsizes:
973 for key, value in advsizes:
970 advparams.append((self._fromheader(key), self._fromheader(value)))
974 advparams.append((self._fromheader(key), self._fromheader(value)))
971 self._initparams(manparams, advparams)
975 self._initparams(manparams, advparams)
972 ## part payload
976 ## part payload
973 self._payloadstream = util.chunkbuffer(self._payloadchunks())
977 self._payloadstream = util.chunkbuffer(self._payloadchunks())
974 # we read the data, tell it
978 # we read the data, tell it
975 self._initialized = True
979 self._initialized = True
976
980
977 def read(self, size=None):
981 def read(self, size=None):
978 """read payload data"""
982 """read payload data"""
979 if not self._initialized:
983 if not self._initialized:
980 self._readheader()
984 self._readheader()
981 if size is None:
985 if size is None:
982 data = self._payloadstream.read()
986 data = self._payloadstream.read()
983 else:
987 else:
984 data = self._payloadstream.read(size)
988 data = self._payloadstream.read(size)
985 if size is None or len(data) < size:
989 if size is None or len(data) < size:
986 self.consumed = True
990 self.consumed = True
987 self._pos += len(data)
991 self._pos += len(data)
988 return data
992 return data
989
993
990 def tell(self):
994 def tell(self):
991 return self._pos
995 return self._pos
992
996
993 def seek(self, offset, whence=0):
997 def seek(self, offset, whence=0):
994 if whence == 0:
998 if whence == 0:
995 newpos = offset
999 newpos = offset
996 elif whence == 1:
1000 elif whence == 1:
997 newpos = self._pos + offset
1001 newpos = self._pos + offset
998 elif whence == 2:
1002 elif whence == 2:
999 if not self.consumed:
1003 if not self.consumed:
1000 self.read()
1004 self.read()
1001 newpos = self._chunkindex[-1][0] - offset
1005 newpos = self._chunkindex[-1][0] - offset
1002 else:
1006 else:
1003 raise ValueError('Unknown whence value: %r' % (whence,))
1007 raise ValueError('Unknown whence value: %r' % (whence,))
1004
1008
1005 if newpos > self._chunkindex[-1][0] and not self.consumed:
1009 if newpos > self._chunkindex[-1][0] and not self.consumed:
1006 self.read()
1010 self.read()
1007 if not 0 <= newpos <= self._chunkindex[-1][0]:
1011 if not 0 <= newpos <= self._chunkindex[-1][0]:
1008 raise ValueError('Offset out of range')
1012 raise ValueError('Offset out of range')
1009
1013
1010 if self._pos != newpos:
1014 if self._pos != newpos:
1011 chunk, internaloffset = self._findchunk(newpos)
1015 chunk, internaloffset = self._findchunk(newpos)
1012 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1016 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1013 adjust = self.read(internaloffset)
1017 adjust = self.read(internaloffset)
1014 if len(adjust) != internaloffset:
1018 if len(adjust) != internaloffset:
1015 raise util.Abort(_('Seek failed\n'))
1019 raise util.Abort(_('Seek failed\n'))
1016 self._pos = newpos
1020 self._pos = newpos
1017
1021
1018 capabilities = {'HG20': (),
1022 capabilities = {'HG20': (),
1019 'listkeys': (),
1023 'listkeys': (),
1020 'pushkey': (),
1024 'pushkey': (),
1021 'digests': tuple(sorted(util.DIGESTS.keys())),
1025 'digests': tuple(sorted(util.DIGESTS.keys())),
1022 'remote-changegroup': ('http', 'https'),
1026 'remote-changegroup': ('http', 'https'),
1023 }
1027 }
1024
1028
1025 def getrepocaps(repo, allowpushback=False):
1029 def getrepocaps(repo, allowpushback=False):
1026 """return the bundle2 capabilities for a given repo
1030 """return the bundle2 capabilities for a given repo
1027
1031
1028 Exists to allow extensions (like evolution) to mutate the capabilities.
1032 Exists to allow extensions (like evolution) to mutate the capabilities.
1029 """
1033 """
1030 caps = capabilities.copy()
1034 caps = capabilities.copy()
1031 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1035 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1032 if obsolete.isenabled(repo, obsolete.exchangeopt):
1036 if obsolete.isenabled(repo, obsolete.exchangeopt):
1033 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1037 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1034 caps['obsmarkers'] = supportedformat
1038 caps['obsmarkers'] = supportedformat
1035 if allowpushback:
1039 if allowpushback:
1036 caps['pushback'] = ()
1040 caps['pushback'] = ()
1037 return caps
1041 return caps
1038
1042
1039 def bundle2caps(remote):
1043 def bundle2caps(remote):
1040 """return the bundle capabilities of a peer as dict"""
1044 """return the bundle capabilities of a peer as dict"""
1041 raw = remote.capable('bundle2')
1045 raw = remote.capable('bundle2')
1042 if not raw and raw != '':
1046 if not raw and raw != '':
1043 return {}
1047 return {}
1044 capsblob = urllib.unquote(remote.capable('bundle2'))
1048 capsblob = urllib.unquote(remote.capable('bundle2'))
1045 return decodecaps(capsblob)
1049 return decodecaps(capsblob)
1046
1050
1047 def obsmarkersversion(caps):
1051 def obsmarkersversion(caps):
1048 """extract the list of supported obsmarkers versions from a bundle2caps dict
1052 """extract the list of supported obsmarkers versions from a bundle2caps dict
1049 """
1053 """
1050 obscaps = caps.get('obsmarkers', ())
1054 obscaps = caps.get('obsmarkers', ())
1051 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1055 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1052
1056
1053 @parthandler('changegroup', ('version',))
1057 @parthandler('changegroup', ('version',))
1054 def handlechangegroup(op, inpart):
1058 def handlechangegroup(op, inpart):
1055 """apply a changegroup part on the repo
1059 """apply a changegroup part on the repo
1056
1060
1057 This is a very early implementation that will massive rework before being
1061 This is a very early implementation that will massive rework before being
1058 inflicted to any end-user.
1062 inflicted to any end-user.
1059 """
1063 """
1060 # Make sure we trigger a transaction creation
1064 # Make sure we trigger a transaction creation
1061 #
1065 #
1062 # The addchangegroup function will get a transaction object by itself, but
1066 # The addchangegroup function will get a transaction object by itself, but
1063 # we need to make sure we trigger the creation of a transaction object used
1067 # we need to make sure we trigger the creation of a transaction object used
1064 # for the whole processing scope.
1068 # for the whole processing scope.
1065 op.gettransaction()
1069 op.gettransaction()
1066 unpackerversion = inpart.params.get('version', '01')
1070 unpackerversion = inpart.params.get('version', '01')
1067 # We should raise an appropriate exception here
1071 # We should raise an appropriate exception here
1068 unpacker = changegroup.packermap[unpackerversion][1]
1072 unpacker = changegroup.packermap[unpackerversion][1]
1069 cg = unpacker(inpart, 'UN')
1073 cg = unpacker(inpart, 'UN')
1070 # the source and url passed here are overwritten by the one contained in
1074 # the source and url passed here are overwritten by the one contained in
1071 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1075 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1072 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1076 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1073 op.records.add('changegroup', {'return': ret})
1077 op.records.add('changegroup', {'return': ret})
1074 if op.reply is not None:
1078 if op.reply is not None:
1075 # This is definitely not the final form of this
1079 # This is definitely not the final form of this
1076 # return. But one need to start somewhere.
1080 # return. But one need to start somewhere.
1077 part = op.reply.newpart('reply:changegroup', mandatory=False)
1081 part = op.reply.newpart('reply:changegroup', mandatory=False)
1078 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1082 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1079 part.addparam('return', '%i' % ret, mandatory=False)
1083 part.addparam('return', '%i' % ret, mandatory=False)
1080 assert not inpart.read()
1084 assert not inpart.read()
1081
1085
1082 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1086 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1083 ['digest:%s' % k for k in util.DIGESTS.keys()])
1087 ['digest:%s' % k for k in util.DIGESTS.keys()])
1084 @parthandler('remote-changegroup', _remotechangegroupparams)
1088 @parthandler('remote-changegroup', _remotechangegroupparams)
1085 def handleremotechangegroup(op, inpart):
1089 def handleremotechangegroup(op, inpart):
1086 """apply a bundle10 on the repo, given an url and validation information
1090 """apply a bundle10 on the repo, given an url and validation information
1087
1091
1088 All the information about the remote bundle to import are given as
1092 All the information about the remote bundle to import are given as
1089 parameters. The parameters include:
1093 parameters. The parameters include:
1090 - url: the url to the bundle10.
1094 - url: the url to the bundle10.
1091 - size: the bundle10 file size. It is used to validate what was
1095 - size: the bundle10 file size. It is used to validate what was
1092 retrieved by the client matches the server knowledge about the bundle.
1096 retrieved by the client matches the server knowledge about the bundle.
1093 - digests: a space separated list of the digest types provided as
1097 - digests: a space separated list of the digest types provided as
1094 parameters.
1098 parameters.
1095 - digest:<digest-type>: the hexadecimal representation of the digest with
1099 - digest:<digest-type>: the hexadecimal representation of the digest with
1096 that name. Like the size, it is used to validate what was retrieved by
1100 that name. Like the size, it is used to validate what was retrieved by
1097 the client matches what the server knows about the bundle.
1101 the client matches what the server knows about the bundle.
1098
1102
1099 When multiple digest types are given, all of them are checked.
1103 When multiple digest types are given, all of them are checked.
1100 """
1104 """
1101 try:
1105 try:
1102 raw_url = inpart.params['url']
1106 raw_url = inpart.params['url']
1103 except KeyError:
1107 except KeyError:
1104 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1108 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1105 parsed_url = util.url(raw_url)
1109 parsed_url = util.url(raw_url)
1106 if parsed_url.scheme not in capabilities['remote-changegroup']:
1110 if parsed_url.scheme not in capabilities['remote-changegroup']:
1107 raise util.Abort(_('remote-changegroup does not support %s urls') %
1111 raise util.Abort(_('remote-changegroup does not support %s urls') %
1108 parsed_url.scheme)
1112 parsed_url.scheme)
1109
1113
1110 try:
1114 try:
1111 size = int(inpart.params['size'])
1115 size = int(inpart.params['size'])
1112 except ValueError:
1116 except ValueError:
1113 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1117 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1114 % 'size')
1118 % 'size')
1115 except KeyError:
1119 except KeyError:
1116 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1120 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1117
1121
1118 digests = {}
1122 digests = {}
1119 for typ in inpart.params.get('digests', '').split():
1123 for typ in inpart.params.get('digests', '').split():
1120 param = 'digest:%s' % typ
1124 param = 'digest:%s' % typ
1121 try:
1125 try:
1122 value = inpart.params[param]
1126 value = inpart.params[param]
1123 except KeyError:
1127 except KeyError:
1124 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1128 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1125 param)
1129 param)
1126 digests[typ] = value
1130 digests[typ] = value
1127
1131
1128 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1132 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1129
1133
1130 # Make sure we trigger a transaction creation
1134 # Make sure we trigger a transaction creation
1131 #
1135 #
1132 # The addchangegroup function will get a transaction object by itself, but
1136 # The addchangegroup function will get a transaction object by itself, but
1133 # we need to make sure we trigger the creation of a transaction object used
1137 # we need to make sure we trigger the creation of a transaction object used
1134 # for the whole processing scope.
1138 # for the whole processing scope.
1135 op.gettransaction()
1139 op.gettransaction()
1136 import exchange
1140 import exchange
1137 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1141 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1138 if not isinstance(cg, changegroup.cg1unpacker):
1142 if not isinstance(cg, changegroup.cg1unpacker):
1139 raise util.Abort(_('%s: not a bundle version 1.0') %
1143 raise util.Abort(_('%s: not a bundle version 1.0') %
1140 util.hidepassword(raw_url))
1144 util.hidepassword(raw_url))
1141 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1145 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1142 op.records.add('changegroup', {'return': ret})
1146 op.records.add('changegroup', {'return': ret})
1143 if op.reply is not None:
1147 if op.reply is not None:
1144 # This is definitely not the final form of this
1148 # This is definitely not the final form of this
1145 # return. But one need to start somewhere.
1149 # return. But one need to start somewhere.
1146 part = op.reply.newpart('reply:changegroup')
1150 part = op.reply.newpart('reply:changegroup')
1147 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1151 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1148 part.addparam('return', '%i' % ret, mandatory=False)
1152 part.addparam('return', '%i' % ret, mandatory=False)
1149 try:
1153 try:
1150 real_part.validate()
1154 real_part.validate()
1151 except util.Abort, e:
1155 except util.Abort, e:
1152 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1156 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1153 (util.hidepassword(raw_url), str(e)))
1157 (util.hidepassword(raw_url), str(e)))
1154 assert not inpart.read()
1158 assert not inpart.read()
1155
1159
1156 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1160 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1157 def handlereplychangegroup(op, inpart):
1161 def handlereplychangegroup(op, inpart):
1158 ret = int(inpart.params['return'])
1162 ret = int(inpart.params['return'])
1159 replyto = int(inpart.params['in-reply-to'])
1163 replyto = int(inpart.params['in-reply-to'])
1160 op.records.add('changegroup', {'return': ret}, replyto)
1164 op.records.add('changegroup', {'return': ret}, replyto)
1161
1165
1162 @parthandler('check:heads')
1166 @parthandler('check:heads')
1163 def handlecheckheads(op, inpart):
1167 def handlecheckheads(op, inpart):
1164 """check that head of the repo did not change
1168 """check that head of the repo did not change
1165
1169
1166 This is used to detect a push race when using unbundle.
1170 This is used to detect a push race when using unbundle.
1167 This replaces the "heads" argument of unbundle."""
1171 This replaces the "heads" argument of unbundle."""
1168 h = inpart.read(20)
1172 h = inpart.read(20)
1169 heads = []
1173 heads = []
1170 while len(h) == 20:
1174 while len(h) == 20:
1171 heads.append(h)
1175 heads.append(h)
1172 h = inpart.read(20)
1176 h = inpart.read(20)
1173 assert not h
1177 assert not h
1174 if heads != op.repo.heads():
1178 if heads != op.repo.heads():
1175 raise error.PushRaced('repository changed while pushing - '
1179 raise error.PushRaced('repository changed while pushing - '
1176 'please try again')
1180 'please try again')
1177
1181
1178 @parthandler('output')
1182 @parthandler('output')
1179 def handleoutput(op, inpart):
1183 def handleoutput(op, inpart):
1180 """forward output captured on the server to the client"""
1184 """forward output captured on the server to the client"""
1181 for line in inpart.read().splitlines():
1185 for line in inpart.read().splitlines():
1182 op.ui.status(('remote: %s\n' % line))
1186 op.ui.status(('remote: %s\n' % line))
1183
1187
1184 @parthandler('replycaps')
1188 @parthandler('replycaps')
1185 def handlereplycaps(op, inpart):
1189 def handlereplycaps(op, inpart):
1186 """Notify that a reply bundle should be created
1190 """Notify that a reply bundle should be created
1187
1191
1188 The payload contains the capabilities information for the reply"""
1192 The payload contains the capabilities information for the reply"""
1189 caps = decodecaps(inpart.read())
1193 caps = decodecaps(inpart.read())
1190 if op.reply is None:
1194 if op.reply is None:
1191 op.reply = bundle20(op.ui, caps)
1195 op.reply = bundle20(op.ui, caps)
1192
1196
1193 @parthandler('error:abort', ('message', 'hint'))
1197 @parthandler('error:abort', ('message', 'hint'))
1194 def handleerrorabort(op, inpart):
1198 def handleerrorabort(op, inpart):
1195 """Used to transmit abort error over the wire"""
1199 """Used to transmit abort error over the wire"""
1196 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1200 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1197
1201
1198 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1202 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1199 def handleerrorunsupportedcontent(op, inpart):
1203 def handleerrorunsupportedcontent(op, inpart):
1200 """Used to transmit unknown content error over the wire"""
1204 """Used to transmit unknown content error over the wire"""
1201 kwargs = {}
1205 kwargs = {}
1202 parttype = inpart.params.get('parttype')
1206 parttype = inpart.params.get('parttype')
1203 if parttype is not None:
1207 if parttype is not None:
1204 kwargs['parttype'] = parttype
1208 kwargs['parttype'] = parttype
1205 params = inpart.params.get('params')
1209 params = inpart.params.get('params')
1206 if params is not None:
1210 if params is not None:
1207 kwargs['params'] = params.split('\0')
1211 kwargs['params'] = params.split('\0')
1208
1212
1209 raise error.UnsupportedPartError(**kwargs)
1213 raise error.UnsupportedPartError(**kwargs)
1210
1214
1211 @parthandler('error:pushraced', ('message',))
1215 @parthandler('error:pushraced', ('message',))
1212 def handleerrorpushraced(op, inpart):
1216 def handleerrorpushraced(op, inpart):
1213 """Used to transmit push race error over the wire"""
1217 """Used to transmit push race error over the wire"""
1214 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1218 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1215
1219
1216 @parthandler('listkeys', ('namespace',))
1220 @parthandler('listkeys', ('namespace',))
1217 def handlelistkeys(op, inpart):
1221 def handlelistkeys(op, inpart):
1218 """retrieve pushkey namespace content stored in a bundle2"""
1222 """retrieve pushkey namespace content stored in a bundle2"""
1219 namespace = inpart.params['namespace']
1223 namespace = inpart.params['namespace']
1220 r = pushkey.decodekeys(inpart.read())
1224 r = pushkey.decodekeys(inpart.read())
1221 op.records.add('listkeys', (namespace, r))
1225 op.records.add('listkeys', (namespace, r))
1222
1226
1223 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1227 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1224 def handlepushkey(op, inpart):
1228 def handlepushkey(op, inpart):
1225 """process a pushkey request"""
1229 """process a pushkey request"""
1226 dec = pushkey.decode
1230 dec = pushkey.decode
1227 namespace = dec(inpart.params['namespace'])
1231 namespace = dec(inpart.params['namespace'])
1228 key = dec(inpart.params['key'])
1232 key = dec(inpart.params['key'])
1229 old = dec(inpart.params['old'])
1233 old = dec(inpart.params['old'])
1230 new = dec(inpart.params['new'])
1234 new = dec(inpart.params['new'])
1231 ret = op.repo.pushkey(namespace, key, old, new)
1235 ret = op.repo.pushkey(namespace, key, old, new)
1232 record = {'namespace': namespace,
1236 record = {'namespace': namespace,
1233 'key': key,
1237 'key': key,
1234 'old': old,
1238 'old': old,
1235 'new': new}
1239 'new': new}
1236 op.records.add('pushkey', record)
1240 op.records.add('pushkey', record)
1237 if op.reply is not None:
1241 if op.reply is not None:
1238 rpart = op.reply.newpart('reply:pushkey')
1242 rpart = op.reply.newpart('reply:pushkey')
1239 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1243 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1240 rpart.addparam('return', '%i' % ret, mandatory=False)
1244 rpart.addparam('return', '%i' % ret, mandatory=False)
1241
1245
1242 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1246 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1243 def handlepushkeyreply(op, inpart):
1247 def handlepushkeyreply(op, inpart):
1244 """retrieve the result of a pushkey request"""
1248 """retrieve the result of a pushkey request"""
1245 ret = int(inpart.params['return'])
1249 ret = int(inpart.params['return'])
1246 partid = int(inpart.params['in-reply-to'])
1250 partid = int(inpart.params['in-reply-to'])
1247 op.records.add('pushkey', {'return': ret}, partid)
1251 op.records.add('pushkey', {'return': ret}, partid)
1248
1252
1249 @parthandler('obsmarkers')
1253 @parthandler('obsmarkers')
1250 def handleobsmarker(op, inpart):
1254 def handleobsmarker(op, inpart):
1251 """add a stream of obsmarkers to the repo"""
1255 """add a stream of obsmarkers to the repo"""
1252 tr = op.gettransaction()
1256 tr = op.gettransaction()
1253 markerdata = inpart.read()
1257 markerdata = inpart.read()
1254 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1258 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1255 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1259 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1256 % len(markerdata))
1260 % len(markerdata))
1257 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1261 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1258 if new:
1262 if new:
1259 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1263 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1260 op.records.add('obsmarkers', {'new': new})
1264 op.records.add('obsmarkers', {'new': new})
1261 if op.reply is not None:
1265 if op.reply is not None:
1262 rpart = op.reply.newpart('reply:obsmarkers')
1266 rpart = op.reply.newpart('reply:obsmarkers')
1263 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1267 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1264 rpart.addparam('new', '%i' % new, mandatory=False)
1268 rpart.addparam('new', '%i' % new, mandatory=False)
1265
1269
1266
1270
1267 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1271 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1268 def handlepushkeyreply(op, inpart):
1272 def handlepushkeyreply(op, inpart):
1269 """retrieve the result of a pushkey request"""
1273 """retrieve the result of a pushkey request"""
1270 ret = int(inpart.params['new'])
1274 ret = int(inpart.params['new'])
1271 partid = int(inpart.params['in-reply-to'])
1275 partid = int(inpart.params['in-reply-to'])
1272 op.records.add('obsmarkers', {'new': ret}, partid)
1276 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now