##// END OF EJS Templates
bundle20: move magic string into the class...
Pierre-Yves David -
r24640:685639f9 default
parent child Browse files
Show More
@@ -1,1227 +1,1227
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _magicstring = 'HG2Y'
166
167 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
168 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
169 _fparttypesize = '>B'
167 _fparttypesize = '>B'
170 _fpartid = '>I'
168 _fpartid = '>I'
171 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
172 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
173
171
174 preferedchunksize = 4096
172 preferedchunksize = 4096
175
173
176 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
177
175
178 def validateparttype(parttype):
176 def validateparttype(parttype):
179 """raise ValueError if a parttype contains invalid character"""
177 """raise ValueError if a parttype contains invalid character"""
180 if _parttypeforbidden.search(parttype):
178 if _parttypeforbidden.search(parttype):
181 raise ValueError(parttype)
179 raise ValueError(parttype)
182
180
183 def _makefpartparamsizes(nbparams):
181 def _makefpartparamsizes(nbparams):
184 """return a struct format to read part parameter sizes
182 """return a struct format to read part parameter sizes
185
183
186 The number parameters is variable so we need to build that format
184 The number parameters is variable so we need to build that format
187 dynamically.
185 dynamically.
188 """
186 """
189 return '>'+('BB'*nbparams)
187 return '>'+('BB'*nbparams)
190
188
191 parthandlermapping = {}
189 parthandlermapping = {}
192
190
193 def parthandler(parttype, params=()):
191 def parthandler(parttype, params=()):
194 """decorator that register a function as a bundle2 part handler
192 """decorator that register a function as a bundle2 part handler
195
193
196 eg::
194 eg::
197
195
198 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
199 def myparttypehandler(...):
197 def myparttypehandler(...):
200 '''process a part of type "my part".'''
198 '''process a part of type "my part".'''
201 ...
199 ...
202 """
200 """
203 validateparttype(parttype)
201 validateparttype(parttype)
204 def _decorator(func):
202 def _decorator(func):
205 lparttype = parttype.lower() # enforce lower case matching.
203 lparttype = parttype.lower() # enforce lower case matching.
206 assert lparttype not in parthandlermapping
204 assert lparttype not in parthandlermapping
207 parthandlermapping[lparttype] = func
205 parthandlermapping[lparttype] = func
208 func.params = frozenset(params)
206 func.params = frozenset(params)
209 return func
207 return func
210 return _decorator
208 return _decorator
211
209
212 class unbundlerecords(object):
210 class unbundlerecords(object):
213 """keep record of what happens during and unbundle
211 """keep record of what happens during and unbundle
214
212
215 New records are added using `records.add('cat', obj)`. Where 'cat' is a
213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
216 category of record and obj is an arbitrary object.
214 category of record and obj is an arbitrary object.
217
215
218 `records['cat']` will return all entries of this category 'cat'.
216 `records['cat']` will return all entries of this category 'cat'.
219
217
220 Iterating on the object itself will yield `('category', obj)` tuples
218 Iterating on the object itself will yield `('category', obj)` tuples
221 for all entries.
219 for all entries.
222
220
223 All iterations happens in chronological order.
221 All iterations happens in chronological order.
224 """
222 """
225
223
226 def __init__(self):
224 def __init__(self):
227 self._categories = {}
225 self._categories = {}
228 self._sequences = []
226 self._sequences = []
229 self._replies = {}
227 self._replies = {}
230
228
231 def add(self, category, entry, inreplyto=None):
229 def add(self, category, entry, inreplyto=None):
232 """add a new record of a given category.
230 """add a new record of a given category.
233
231
234 The entry can then be retrieved in the list returned by
232 The entry can then be retrieved in the list returned by
235 self['category']."""
233 self['category']."""
236 self._categories.setdefault(category, []).append(entry)
234 self._categories.setdefault(category, []).append(entry)
237 self._sequences.append((category, entry))
235 self._sequences.append((category, entry))
238 if inreplyto is not None:
236 if inreplyto is not None:
239 self.getreplies(inreplyto).add(category, entry)
237 self.getreplies(inreplyto).add(category, entry)
240
238
241 def getreplies(self, partid):
239 def getreplies(self, partid):
242 """get the records that are replies to a specific part"""
240 """get the records that are replies to a specific part"""
243 return self._replies.setdefault(partid, unbundlerecords())
241 return self._replies.setdefault(partid, unbundlerecords())
244
242
245 def __getitem__(self, cat):
243 def __getitem__(self, cat):
246 return tuple(self._categories.get(cat, ()))
244 return tuple(self._categories.get(cat, ()))
247
245
248 def __iter__(self):
246 def __iter__(self):
249 return iter(self._sequences)
247 return iter(self._sequences)
250
248
251 def __len__(self):
249 def __len__(self):
252 return len(self._sequences)
250 return len(self._sequences)
253
251
254 def __nonzero__(self):
252 def __nonzero__(self):
255 return bool(self._sequences)
253 return bool(self._sequences)
256
254
257 class bundleoperation(object):
255 class bundleoperation(object):
258 """an object that represents a single bundling process
256 """an object that represents a single bundling process
259
257
260 Its purpose is to carry unbundle-related objects and states.
258 Its purpose is to carry unbundle-related objects and states.
261
259
262 A new object should be created at the beginning of each bundle processing.
260 A new object should be created at the beginning of each bundle processing.
263 The object is to be returned by the processing function.
261 The object is to be returned by the processing function.
264
262
265 The object has very little content now it will ultimately contain:
263 The object has very little content now it will ultimately contain:
266 * an access to the repo the bundle is applied to,
264 * an access to the repo the bundle is applied to,
267 * a ui object,
265 * a ui object,
268 * a way to retrieve a transaction to add changes to the repo,
266 * a way to retrieve a transaction to add changes to the repo,
269 * a way to record the result of processing each part,
267 * a way to record the result of processing each part,
270 * a way to construct a bundle response when applicable.
268 * a way to construct a bundle response when applicable.
271 """
269 """
272
270
273 def __init__(self, repo, transactiongetter):
271 def __init__(self, repo, transactiongetter):
274 self.repo = repo
272 self.repo = repo
275 self.ui = repo.ui
273 self.ui = repo.ui
276 self.records = unbundlerecords()
274 self.records = unbundlerecords()
277 self.gettransaction = transactiongetter
275 self.gettransaction = transactiongetter
278 self.reply = None
276 self.reply = None
279
277
280 class TransactionUnavailable(RuntimeError):
278 class TransactionUnavailable(RuntimeError):
281 pass
279 pass
282
280
283 def _notransaction():
281 def _notransaction():
284 """default method to get a transaction while processing a bundle
282 """default method to get a transaction while processing a bundle
285
283
286 Raise an exception to highlight the fact that no transaction was expected
284 Raise an exception to highlight the fact that no transaction was expected
287 to be created"""
285 to be created"""
288 raise TransactionUnavailable()
286 raise TransactionUnavailable()
289
287
290 def processbundle(repo, unbundler, transactiongetter=None):
288 def processbundle(repo, unbundler, transactiongetter=None):
291 """This function process a bundle, apply effect to/from a repo
289 """This function process a bundle, apply effect to/from a repo
292
290
293 It iterates over each part then searches for and uses the proper handling
291 It iterates over each part then searches for and uses the proper handling
294 code to process the part. Parts are processed in order.
292 code to process the part. Parts are processed in order.
295
293
296 This is very early version of this function that will be strongly reworked
294 This is very early version of this function that will be strongly reworked
297 before final usage.
295 before final usage.
298
296
299 Unknown Mandatory part will abort the process.
297 Unknown Mandatory part will abort the process.
300 """
298 """
301 if transactiongetter is None:
299 if transactiongetter is None:
302 transactiongetter = _notransaction
300 transactiongetter = _notransaction
303 op = bundleoperation(repo, transactiongetter)
301 op = bundleoperation(repo, transactiongetter)
304 # todo:
302 # todo:
305 # - replace this is a init function soon.
303 # - replace this is a init function soon.
306 # - exception catching
304 # - exception catching
307 unbundler.params
305 unbundler.params
308 iterparts = unbundler.iterparts()
306 iterparts = unbundler.iterparts()
309 part = None
307 part = None
310 try:
308 try:
311 for part in iterparts:
309 for part in iterparts:
312 _processpart(op, part)
310 _processpart(op, part)
313 except Exception, exc:
311 except Exception, exc:
314 for part in iterparts:
312 for part in iterparts:
315 # consume the bundle content
313 # consume the bundle content
316 part.seek(0, 2)
314 part.seek(0, 2)
317 # Small hack to let caller code distinguish exceptions from bundle2
315 # Small hack to let caller code distinguish exceptions from bundle2
318 # processing from processing the old format. This is mostly
316 # processing from processing the old format. This is mostly
319 # needed to handle different return codes to unbundle according to the
317 # needed to handle different return codes to unbundle according to the
320 # type of bundle. We should probably clean up or drop this return code
318 # type of bundle. We should probably clean up or drop this return code
321 # craziness in a future version.
319 # craziness in a future version.
322 exc.duringunbundle2 = True
320 exc.duringunbundle2 = True
323 raise
321 raise
324 return op
322 return op
325
323
326 def _processpart(op, part):
324 def _processpart(op, part):
327 """process a single part from a bundle
325 """process a single part from a bundle
328
326
329 The part is guaranteed to have been fully consumed when the function exits
327 The part is guaranteed to have been fully consumed when the function exits
330 (even if an exception is raised)."""
328 (even if an exception is raised)."""
331 try:
329 try:
332 try:
330 try:
333 handler = parthandlermapping.get(part.type)
331 handler = parthandlermapping.get(part.type)
334 if handler is None:
332 if handler is None:
335 raise error.UnsupportedPartError(parttype=part.type)
333 raise error.UnsupportedPartError(parttype=part.type)
336 op.ui.debug('found a handler for part %r\n' % part.type)
334 op.ui.debug('found a handler for part %r\n' % part.type)
337 unknownparams = part.mandatorykeys - handler.params
335 unknownparams = part.mandatorykeys - handler.params
338 if unknownparams:
336 if unknownparams:
339 unknownparams = list(unknownparams)
337 unknownparams = list(unknownparams)
340 unknownparams.sort()
338 unknownparams.sort()
341 raise error.UnsupportedPartError(parttype=part.type,
339 raise error.UnsupportedPartError(parttype=part.type,
342 params=unknownparams)
340 params=unknownparams)
343 except error.UnsupportedPartError, exc:
341 except error.UnsupportedPartError, exc:
344 if part.mandatory: # mandatory parts
342 if part.mandatory: # mandatory parts
345 raise
343 raise
346 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
347 return # skip to part processing
345 return # skip to part processing
348
346
349 # handler is called outside the above try block so that we don't
347 # handler is called outside the above try block so that we don't
350 # risk catching KeyErrors from anything other than the
348 # risk catching KeyErrors from anything other than the
351 # parthandlermapping lookup (any KeyError raised by handler()
349 # parthandlermapping lookup (any KeyError raised by handler()
352 # itself represents a defect of a different variety).
350 # itself represents a defect of a different variety).
353 output = None
351 output = None
354 if op.reply is not None:
352 if op.reply is not None:
355 op.ui.pushbuffer(error=True)
353 op.ui.pushbuffer(error=True)
356 output = ''
354 output = ''
357 try:
355 try:
358 handler(op, part)
356 handler(op, part)
359 finally:
357 finally:
360 if output is not None:
358 if output is not None:
361 output = op.ui.popbuffer()
359 output = op.ui.popbuffer()
362 if output:
360 if output:
363 outpart = op.reply.newpart('b2x:output', data=output,
361 outpart = op.reply.newpart('b2x:output', data=output,
364 mandatory=False)
362 mandatory=False)
365 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
366 finally:
364 finally:
367 # consume the part content to not corrupt the stream.
365 # consume the part content to not corrupt the stream.
368 part.seek(0, 2)
366 part.seek(0, 2)
369
367
370
368
371 def decodecaps(blob):
369 def decodecaps(blob):
372 """decode a bundle2 caps bytes blob into a dictionary
370 """decode a bundle2 caps bytes blob into a dictionary
373
371
374 The blob is a list of capabilities (one per line)
372 The blob is a list of capabilities (one per line)
375 Capabilities may have values using a line of the form::
373 Capabilities may have values using a line of the form::
376
374
377 capability=value1,value2,value3
375 capability=value1,value2,value3
378
376
379 The values are always a list."""
377 The values are always a list."""
380 caps = {}
378 caps = {}
381 for line in blob.splitlines():
379 for line in blob.splitlines():
382 if not line:
380 if not line:
383 continue
381 continue
384 if '=' not in line:
382 if '=' not in line:
385 key, vals = line, ()
383 key, vals = line, ()
386 else:
384 else:
387 key, vals = line.split('=', 1)
385 key, vals = line.split('=', 1)
388 vals = vals.split(',')
386 vals = vals.split(',')
389 key = urllib.unquote(key)
387 key = urllib.unquote(key)
390 vals = [urllib.unquote(v) for v in vals]
388 vals = [urllib.unquote(v) for v in vals]
391 caps[key] = vals
389 caps[key] = vals
392 return caps
390 return caps
393
391
394 def encodecaps(caps):
392 def encodecaps(caps):
395 """encode a bundle2 caps dictionary into a bytes blob"""
393 """encode a bundle2 caps dictionary into a bytes blob"""
396 chunks = []
394 chunks = []
397 for ca in sorted(caps):
395 for ca in sorted(caps):
398 vals = caps[ca]
396 vals = caps[ca]
399 ca = urllib.quote(ca)
397 ca = urllib.quote(ca)
400 vals = [urllib.quote(v) for v in vals]
398 vals = [urllib.quote(v) for v in vals]
401 if vals:
399 if vals:
402 ca = "%s=%s" % (ca, ','.join(vals))
400 ca = "%s=%s" % (ca, ','.join(vals))
403 chunks.append(ca)
401 chunks.append(ca)
404 return '\n'.join(chunks)
402 return '\n'.join(chunks)
405
403
406 class bundle20(object):
404 class bundle20(object):
407 """represent an outgoing bundle2 container
405 """represent an outgoing bundle2 container
408
406
409 Use the `addparam` method to add stream level parameter. and `newpart` to
407 Use the `addparam` method to add stream level parameter. and `newpart` to
410 populate it. Then call `getchunks` to retrieve all the binary chunks of
408 populate it. Then call `getchunks` to retrieve all the binary chunks of
411 data that compose the bundle2 container."""
409 data that compose the bundle2 container."""
412
410
411 _magicstring = 'HG2Y'
412
413 def __init__(self, ui, capabilities=()):
413 def __init__(self, ui, capabilities=()):
414 self.ui = ui
414 self.ui = ui
415 self._params = []
415 self._params = []
416 self._parts = []
416 self._parts = []
417 self.capabilities = dict(capabilities)
417 self.capabilities = dict(capabilities)
418
418
419 @property
419 @property
420 def nbparts(self):
420 def nbparts(self):
421 """total number of parts added to the bundler"""
421 """total number of parts added to the bundler"""
422 return len(self._parts)
422 return len(self._parts)
423
423
424 # methods used to defines the bundle2 content
424 # methods used to defines the bundle2 content
425 def addparam(self, name, value=None):
425 def addparam(self, name, value=None):
426 """add a stream level parameter"""
426 """add a stream level parameter"""
427 if not name:
427 if not name:
428 raise ValueError('empty parameter name')
428 raise ValueError('empty parameter name')
429 if name[0] not in string.letters:
429 if name[0] not in string.letters:
430 raise ValueError('non letter first character: %r' % name)
430 raise ValueError('non letter first character: %r' % name)
431 self._params.append((name, value))
431 self._params.append((name, value))
432
432
433 def addpart(self, part):
433 def addpart(self, part):
434 """add a new part to the bundle2 container
434 """add a new part to the bundle2 container
435
435
436 Parts contains the actual applicative payload."""
436 Parts contains the actual applicative payload."""
437 assert part.id is None
437 assert part.id is None
438 part.id = len(self._parts) # very cheap counter
438 part.id = len(self._parts) # very cheap counter
439 self._parts.append(part)
439 self._parts.append(part)
440
440
441 def newpart(self, typeid, *args, **kwargs):
441 def newpart(self, typeid, *args, **kwargs):
442 """create a new part and add it to the containers
442 """create a new part and add it to the containers
443
443
444 As the part is directly added to the containers. For now, this means
444 As the part is directly added to the containers. For now, this means
445 that any failure to properly initialize the part after calling
445 that any failure to properly initialize the part after calling
446 ``newpart`` should result in a failure of the whole bundling process.
446 ``newpart`` should result in a failure of the whole bundling process.
447
447
448 You can still fall back to manually create and add if you need better
448 You can still fall back to manually create and add if you need better
449 control."""
449 control."""
450 part = bundlepart(typeid, *args, **kwargs)
450 part = bundlepart(typeid, *args, **kwargs)
451 self.addpart(part)
451 self.addpart(part)
452 return part
452 return part
453
453
454 # methods used to generate the bundle2 stream
454 # methods used to generate the bundle2 stream
455 def getchunks(self):
455 def getchunks(self):
456 self.ui.debug('start emission of %s stream\n' % _magicstring)
456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 yield _magicstring
457 yield self._magicstring
458 param = self._paramchunk()
458 param = self._paramchunk()
459 self.ui.debug('bundle parameter: %s\n' % param)
459 self.ui.debug('bundle parameter: %s\n' % param)
460 yield _pack(_fstreamparamsize, len(param))
460 yield _pack(_fstreamparamsize, len(param))
461 if param:
461 if param:
462 yield param
462 yield param
463
463
464 self.ui.debug('start of parts\n')
464 self.ui.debug('start of parts\n')
465 for part in self._parts:
465 for part in self._parts:
466 self.ui.debug('bundle part: "%s"\n' % part.type)
466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 for chunk in part.getchunks():
467 for chunk in part.getchunks():
468 yield chunk
468 yield chunk
469 self.ui.debug('end of bundle\n')
469 self.ui.debug('end of bundle\n')
470 yield _pack(_fpartheadersize, 0)
470 yield _pack(_fpartheadersize, 0)
471
471
472 def _paramchunk(self):
472 def _paramchunk(self):
473 """return a encoded version of all stream parameters"""
473 """return a encoded version of all stream parameters"""
474 blocks = []
474 blocks = []
475 for par, value in self._params:
475 for par, value in self._params:
476 par = urllib.quote(par)
476 par = urllib.quote(par)
477 if value is not None:
477 if value is not None:
478 value = urllib.quote(value)
478 value = urllib.quote(value)
479 par = '%s=%s' % (par, value)
479 par = '%s=%s' % (par, value)
480 blocks.append(par)
480 blocks.append(par)
481 return ' '.join(blocks)
481 return ' '.join(blocks)
482
482
483 class unpackermixin(object):
483 class unpackermixin(object):
484 """A mixin to extract bytes and struct data from a stream"""
484 """A mixin to extract bytes and struct data from a stream"""
485
485
486 def __init__(self, fp):
486 def __init__(self, fp):
487 self._fp = fp
487 self._fp = fp
488 self._seekable = (util.safehasattr(fp, 'seek') and
488 self._seekable = (util.safehasattr(fp, 'seek') and
489 util.safehasattr(fp, 'tell'))
489 util.safehasattr(fp, 'tell'))
490
490
491 def _unpack(self, format):
491 def _unpack(self, format):
492 """unpack this struct format from the stream"""
492 """unpack this struct format from the stream"""
493 data = self._readexact(struct.calcsize(format))
493 data = self._readexact(struct.calcsize(format))
494 return _unpack(format, data)
494 return _unpack(format, data)
495
495
496 def _readexact(self, size):
496 def _readexact(self, size):
497 """read exactly <size> bytes from the stream"""
497 """read exactly <size> bytes from the stream"""
498 return changegroup.readexactly(self._fp, size)
498 return changegroup.readexactly(self._fp, size)
499
499
500 def seek(self, offset, whence=0):
500 def seek(self, offset, whence=0):
501 """move the underlying file pointer"""
501 """move the underlying file pointer"""
502 if self._seekable:
502 if self._seekable:
503 return self._fp.seek(offset, whence)
503 return self._fp.seek(offset, whence)
504 else:
504 else:
505 raise NotImplementedError(_('File pointer is not seekable'))
505 raise NotImplementedError(_('File pointer is not seekable'))
506
506
507 def tell(self):
507 def tell(self):
508 """return the file offset, or None if file is not seekable"""
508 """return the file offset, or None if file is not seekable"""
509 if self._seekable:
509 if self._seekable:
510 try:
510 try:
511 return self._fp.tell()
511 return self._fp.tell()
512 except IOError, e:
512 except IOError, e:
513 if e.errno == errno.ESPIPE:
513 if e.errno == errno.ESPIPE:
514 self._seekable = False
514 self._seekable = False
515 else:
515 else:
516 raise
516 raise
517 return None
517 return None
518
518
519 def close(self):
519 def close(self):
520 """close underlying file"""
520 """close underlying file"""
521 if util.safehasattr(self._fp, 'close'):
521 if util.safehasattr(self._fp, 'close'):
522 return self._fp.close()
522 return self._fp.close()
523
523
524 class unbundle20(unpackermixin):
524 class unbundle20(unpackermixin):
525 """interpret a bundle2 stream
525 """interpret a bundle2 stream
526
526
527 This class is fed with a binary stream and yields parts through its
527 This class is fed with a binary stream and yields parts through its
528 `iterparts` methods."""
528 `iterparts` methods."""
529
529
530 def __init__(self, ui, fp, header=None):
530 def __init__(self, ui, fp, header=None):
531 """If header is specified, we do not read it out of the stream."""
531 """If header is specified, we do not read it out of the stream."""
532 self.ui = ui
532 self.ui = ui
533 super(unbundle20, self).__init__(fp)
533 super(unbundle20, self).__init__(fp)
534 if header is None:
534 if header is None:
535 header = self._readexact(4)
535 header = self._readexact(4)
536 magic, version = header[0:2], header[2:4]
536 magic, version = header[0:2], header[2:4]
537 if magic != 'HG':
537 if magic != 'HG':
538 raise util.Abort(_('not a Mercurial bundle'))
538 raise util.Abort(_('not a Mercurial bundle'))
539 if version != '2Y':
539 if version != '2Y':
540 raise util.Abort(_('unknown bundle version %s') % version)
540 raise util.Abort(_('unknown bundle version %s') % version)
541 self.ui.debug('start processing of %s stream\n' % header)
541 self.ui.debug('start processing of %s stream\n' % header)
542
542
543 @util.propertycache
543 @util.propertycache
544 def params(self):
544 def params(self):
545 """dictionary of stream level parameters"""
545 """dictionary of stream level parameters"""
546 self.ui.debug('reading bundle2 stream parameters\n')
546 self.ui.debug('reading bundle2 stream parameters\n')
547 params = {}
547 params = {}
548 paramssize = self._unpack(_fstreamparamsize)[0]
548 paramssize = self._unpack(_fstreamparamsize)[0]
549 if paramssize < 0:
549 if paramssize < 0:
550 raise error.BundleValueError('negative bundle param size: %i'
550 raise error.BundleValueError('negative bundle param size: %i'
551 % paramssize)
551 % paramssize)
552 if paramssize:
552 if paramssize:
553 for p in self._readexact(paramssize).split(' '):
553 for p in self._readexact(paramssize).split(' '):
554 p = p.split('=', 1)
554 p = p.split('=', 1)
555 p = [urllib.unquote(i) for i in p]
555 p = [urllib.unquote(i) for i in p]
556 if len(p) < 2:
556 if len(p) < 2:
557 p.append(None)
557 p.append(None)
558 self._processparam(*p)
558 self._processparam(*p)
559 params[p[0]] = p[1]
559 params[p[0]] = p[1]
560 return params
560 return params
561
561
562 def _processparam(self, name, value):
562 def _processparam(self, name, value):
563 """process a parameter, applying its effect if needed
563 """process a parameter, applying its effect if needed
564
564
565 Parameter starting with a lower case letter are advisory and will be
565 Parameter starting with a lower case letter are advisory and will be
566 ignored when unknown. Those starting with an upper case letter are
566 ignored when unknown. Those starting with an upper case letter are
567 mandatory and will this function will raise a KeyError when unknown.
567 mandatory and will this function will raise a KeyError when unknown.
568
568
569 Note: no option are currently supported. Any input will be either
569 Note: no option are currently supported. Any input will be either
570 ignored or failing.
570 ignored or failing.
571 """
571 """
572 if not name:
572 if not name:
573 raise ValueError('empty parameter name')
573 raise ValueError('empty parameter name')
574 if name[0] not in string.letters:
574 if name[0] not in string.letters:
575 raise ValueError('non letter first character: %r' % name)
575 raise ValueError('non letter first character: %r' % name)
576 # Some logic will be later added here to try to process the option for
576 # Some logic will be later added here to try to process the option for
577 # a dict of known parameter.
577 # a dict of known parameter.
578 if name[0].islower():
578 if name[0].islower():
579 self.ui.debug("ignoring unknown parameter %r\n" % name)
579 self.ui.debug("ignoring unknown parameter %r\n" % name)
580 else:
580 else:
581 raise error.UnsupportedPartError(params=(name,))
581 raise error.UnsupportedPartError(params=(name,))
582
582
583
583
584 def iterparts(self):
584 def iterparts(self):
585 """yield all parts contained in the stream"""
585 """yield all parts contained in the stream"""
586 # make sure param have been loaded
586 # make sure param have been loaded
587 self.params
587 self.params
588 self.ui.debug('start extraction of bundle2 parts\n')
588 self.ui.debug('start extraction of bundle2 parts\n')
589 headerblock = self._readpartheader()
589 headerblock = self._readpartheader()
590 while headerblock is not None:
590 while headerblock is not None:
591 part = unbundlepart(self.ui, headerblock, self._fp)
591 part = unbundlepart(self.ui, headerblock, self._fp)
592 yield part
592 yield part
593 part.seek(0, 2)
593 part.seek(0, 2)
594 headerblock = self._readpartheader()
594 headerblock = self._readpartheader()
595 self.ui.debug('end of bundle2 stream\n')
595 self.ui.debug('end of bundle2 stream\n')
596
596
597 def _readpartheader(self):
597 def _readpartheader(self):
598 """reads a part header size and return the bytes blob
598 """reads a part header size and return the bytes blob
599
599
600 returns None if empty"""
600 returns None if empty"""
601 headersize = self._unpack(_fpartheadersize)[0]
601 headersize = self._unpack(_fpartheadersize)[0]
602 if headersize < 0:
602 if headersize < 0:
603 raise error.BundleValueError('negative part header size: %i'
603 raise error.BundleValueError('negative part header size: %i'
604 % headersize)
604 % headersize)
605 self.ui.debug('part header size: %i\n' % headersize)
605 self.ui.debug('part header size: %i\n' % headersize)
606 if headersize:
606 if headersize:
607 return self._readexact(headersize)
607 return self._readexact(headersize)
608 return None
608 return None
609
609
610 def compressed(self):
610 def compressed(self):
611 return False
611 return False
612
612
613 class bundlepart(object):
613 class bundlepart(object):
614 """A bundle2 part contains application level payload
614 """A bundle2 part contains application level payload
615
615
616 The part `type` is used to route the part to the application level
616 The part `type` is used to route the part to the application level
617 handler.
617 handler.
618
618
619 The part payload is contained in ``part.data``. It could be raw bytes or a
619 The part payload is contained in ``part.data``. It could be raw bytes or a
620 generator of byte chunks.
620 generator of byte chunks.
621
621
622 You can add parameters to the part using the ``addparam`` method.
622 You can add parameters to the part using the ``addparam`` method.
623 Parameters can be either mandatory (default) or advisory. Remote side
623 Parameters can be either mandatory (default) or advisory. Remote side
624 should be able to safely ignore the advisory ones.
624 should be able to safely ignore the advisory ones.
625
625
626 Both data and parameters cannot be modified after the generation has begun.
626 Both data and parameters cannot be modified after the generation has begun.
627 """
627 """
628
628
629 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
629 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
630 data='', mandatory=True):
630 data='', mandatory=True):
631 validateparttype(parttype)
631 validateparttype(parttype)
632 self.id = None
632 self.id = None
633 self.type = parttype
633 self.type = parttype
634 self._data = data
634 self._data = data
635 self._mandatoryparams = list(mandatoryparams)
635 self._mandatoryparams = list(mandatoryparams)
636 self._advisoryparams = list(advisoryparams)
636 self._advisoryparams = list(advisoryparams)
637 # checking for duplicated entries
637 # checking for duplicated entries
638 self._seenparams = set()
638 self._seenparams = set()
639 for pname, __ in self._mandatoryparams + self._advisoryparams:
639 for pname, __ in self._mandatoryparams + self._advisoryparams:
640 if pname in self._seenparams:
640 if pname in self._seenparams:
641 raise RuntimeError('duplicated params: %s' % pname)
641 raise RuntimeError('duplicated params: %s' % pname)
642 self._seenparams.add(pname)
642 self._seenparams.add(pname)
643 # status of the part's generation:
643 # status of the part's generation:
644 # - None: not started,
644 # - None: not started,
645 # - False: currently generated,
645 # - False: currently generated,
646 # - True: generation done.
646 # - True: generation done.
647 self._generated = None
647 self._generated = None
648 self.mandatory = mandatory
648 self.mandatory = mandatory
649
649
650 # methods used to defines the part content
650 # methods used to defines the part content
651 def __setdata(self, data):
651 def __setdata(self, data):
652 if self._generated is not None:
652 if self._generated is not None:
653 raise error.ReadOnlyPartError('part is being generated')
653 raise error.ReadOnlyPartError('part is being generated')
654 self._data = data
654 self._data = data
655 def __getdata(self):
655 def __getdata(self):
656 return self._data
656 return self._data
657 data = property(__getdata, __setdata)
657 data = property(__getdata, __setdata)
658
658
659 @property
659 @property
660 def mandatoryparams(self):
660 def mandatoryparams(self):
661 # make it an immutable tuple to force people through ``addparam``
661 # make it an immutable tuple to force people through ``addparam``
662 return tuple(self._mandatoryparams)
662 return tuple(self._mandatoryparams)
663
663
664 @property
664 @property
665 def advisoryparams(self):
665 def advisoryparams(self):
666 # make it an immutable tuple to force people through ``addparam``
666 # make it an immutable tuple to force people through ``addparam``
667 return tuple(self._advisoryparams)
667 return tuple(self._advisoryparams)
668
668
669 def addparam(self, name, value='', mandatory=True):
669 def addparam(self, name, value='', mandatory=True):
670 if self._generated is not None:
670 if self._generated is not None:
671 raise error.ReadOnlyPartError('part is being generated')
671 raise error.ReadOnlyPartError('part is being generated')
672 if name in self._seenparams:
672 if name in self._seenparams:
673 raise ValueError('duplicated params: %s' % name)
673 raise ValueError('duplicated params: %s' % name)
674 self._seenparams.add(name)
674 self._seenparams.add(name)
675 params = self._advisoryparams
675 params = self._advisoryparams
676 if mandatory:
676 if mandatory:
677 params = self._mandatoryparams
677 params = self._mandatoryparams
678 params.append((name, value))
678 params.append((name, value))
679
679
680 # methods used to generates the bundle2 stream
680 # methods used to generates the bundle2 stream
681 def getchunks(self):
681 def getchunks(self):
682 if self._generated is not None:
682 if self._generated is not None:
683 raise RuntimeError('part can only be consumed once')
683 raise RuntimeError('part can only be consumed once')
684 self._generated = False
684 self._generated = False
685 #### header
685 #### header
686 if self.mandatory:
686 if self.mandatory:
687 parttype = self.type.upper()
687 parttype = self.type.upper()
688 else:
688 else:
689 parttype = self.type.lower()
689 parttype = self.type.lower()
690 ## parttype
690 ## parttype
691 header = [_pack(_fparttypesize, len(parttype)),
691 header = [_pack(_fparttypesize, len(parttype)),
692 parttype, _pack(_fpartid, self.id),
692 parttype, _pack(_fpartid, self.id),
693 ]
693 ]
694 ## parameters
694 ## parameters
695 # count
695 # count
696 manpar = self.mandatoryparams
696 manpar = self.mandatoryparams
697 advpar = self.advisoryparams
697 advpar = self.advisoryparams
698 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
698 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
699 # size
699 # size
700 parsizes = []
700 parsizes = []
701 for key, value in manpar:
701 for key, value in manpar:
702 parsizes.append(len(key))
702 parsizes.append(len(key))
703 parsizes.append(len(value))
703 parsizes.append(len(value))
704 for key, value in advpar:
704 for key, value in advpar:
705 parsizes.append(len(key))
705 parsizes.append(len(key))
706 parsizes.append(len(value))
706 parsizes.append(len(value))
707 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
707 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
708 header.append(paramsizes)
708 header.append(paramsizes)
709 # key, value
709 # key, value
710 for key, value in manpar:
710 for key, value in manpar:
711 header.append(key)
711 header.append(key)
712 header.append(value)
712 header.append(value)
713 for key, value in advpar:
713 for key, value in advpar:
714 header.append(key)
714 header.append(key)
715 header.append(value)
715 header.append(value)
716 ## finalize header
716 ## finalize header
717 headerchunk = ''.join(header)
717 headerchunk = ''.join(header)
718 yield _pack(_fpartheadersize, len(headerchunk))
718 yield _pack(_fpartheadersize, len(headerchunk))
719 yield headerchunk
719 yield headerchunk
720 ## payload
720 ## payload
721 try:
721 try:
722 for chunk in self._payloadchunks():
722 for chunk in self._payloadchunks():
723 yield _pack(_fpayloadsize, len(chunk))
723 yield _pack(_fpayloadsize, len(chunk))
724 yield chunk
724 yield chunk
725 except Exception, exc:
725 except Exception, exc:
726 # backup exception data for later
726 # backup exception data for later
727 exc_info = sys.exc_info()
727 exc_info = sys.exc_info()
728 msg = 'unexpected error: %s' % exc
728 msg = 'unexpected error: %s' % exc
729 interpart = bundlepart('b2x:error:abort', [('message', msg)],
729 interpart = bundlepart('b2x:error:abort', [('message', msg)],
730 mandatory=False)
730 mandatory=False)
731 interpart.id = 0
731 interpart.id = 0
732 yield _pack(_fpayloadsize, -1)
732 yield _pack(_fpayloadsize, -1)
733 for chunk in interpart.getchunks():
733 for chunk in interpart.getchunks():
734 yield chunk
734 yield chunk
735 # abort current part payload
735 # abort current part payload
736 yield _pack(_fpayloadsize, 0)
736 yield _pack(_fpayloadsize, 0)
737 raise exc_info[0], exc_info[1], exc_info[2]
737 raise exc_info[0], exc_info[1], exc_info[2]
738 # end of payload
738 # end of payload
739 yield _pack(_fpayloadsize, 0)
739 yield _pack(_fpayloadsize, 0)
740 self._generated = True
740 self._generated = True
741
741
742 def _payloadchunks(self):
742 def _payloadchunks(self):
743 """yield chunks of a the part payload
743 """yield chunks of a the part payload
744
744
745 Exists to handle the different methods to provide data to a part."""
745 Exists to handle the different methods to provide data to a part."""
746 # we only support fixed size data now.
746 # we only support fixed size data now.
747 # This will be improved in the future.
747 # This will be improved in the future.
748 if util.safehasattr(self.data, 'next'):
748 if util.safehasattr(self.data, 'next'):
749 buff = util.chunkbuffer(self.data)
749 buff = util.chunkbuffer(self.data)
750 chunk = buff.read(preferedchunksize)
750 chunk = buff.read(preferedchunksize)
751 while chunk:
751 while chunk:
752 yield chunk
752 yield chunk
753 chunk = buff.read(preferedchunksize)
753 chunk = buff.read(preferedchunksize)
754 elif len(self.data):
754 elif len(self.data):
755 yield self.data
755 yield self.data
756
756
757
757
758 flaginterrupt = -1
758 flaginterrupt = -1
759
759
760 class interrupthandler(unpackermixin):
760 class interrupthandler(unpackermixin):
761 """read one part and process it with restricted capability
761 """read one part and process it with restricted capability
762
762
763 This allows to transmit exception raised on the producer size during part
763 This allows to transmit exception raised on the producer size during part
764 iteration while the consumer is reading a part.
764 iteration while the consumer is reading a part.
765
765
766 Part processed in this manner only have access to a ui object,"""
766 Part processed in this manner only have access to a ui object,"""
767
767
768 def __init__(self, ui, fp):
768 def __init__(self, ui, fp):
769 super(interrupthandler, self).__init__(fp)
769 super(interrupthandler, self).__init__(fp)
770 self.ui = ui
770 self.ui = ui
771
771
772 def _readpartheader(self):
772 def _readpartheader(self):
773 """reads a part header size and return the bytes blob
773 """reads a part header size and return the bytes blob
774
774
775 returns None if empty"""
775 returns None if empty"""
776 headersize = self._unpack(_fpartheadersize)[0]
776 headersize = self._unpack(_fpartheadersize)[0]
777 if headersize < 0:
777 if headersize < 0:
778 raise error.BundleValueError('negative part header size: %i'
778 raise error.BundleValueError('negative part header size: %i'
779 % headersize)
779 % headersize)
780 self.ui.debug('part header size: %i\n' % headersize)
780 self.ui.debug('part header size: %i\n' % headersize)
781 if headersize:
781 if headersize:
782 return self._readexact(headersize)
782 return self._readexact(headersize)
783 return None
783 return None
784
784
785 def __call__(self):
785 def __call__(self):
786 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
786 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
787 headerblock = self._readpartheader()
787 headerblock = self._readpartheader()
788 if headerblock is None:
788 if headerblock is None:
789 self.ui.debug('no part found during interruption.\n')
789 self.ui.debug('no part found during interruption.\n')
790 return
790 return
791 part = unbundlepart(self.ui, headerblock, self._fp)
791 part = unbundlepart(self.ui, headerblock, self._fp)
792 op = interruptoperation(self.ui)
792 op = interruptoperation(self.ui)
793 _processpart(op, part)
793 _processpart(op, part)
794
794
795 class interruptoperation(object):
795 class interruptoperation(object):
796 """A limited operation to be use by part handler during interruption
796 """A limited operation to be use by part handler during interruption
797
797
798 It only have access to an ui object.
798 It only have access to an ui object.
799 """
799 """
800
800
801 def __init__(self, ui):
801 def __init__(self, ui):
802 self.ui = ui
802 self.ui = ui
803 self.reply = None
803 self.reply = None
804
804
805 @property
805 @property
806 def repo(self):
806 def repo(self):
807 raise RuntimeError('no repo access from stream interruption')
807 raise RuntimeError('no repo access from stream interruption')
808
808
809 def gettransaction(self):
809 def gettransaction(self):
810 raise TransactionUnavailable('no repo access from stream interruption')
810 raise TransactionUnavailable('no repo access from stream interruption')
811
811
812 class unbundlepart(unpackermixin):
812 class unbundlepart(unpackermixin):
813 """a bundle part read from a bundle"""
813 """a bundle part read from a bundle"""
814
814
815 def __init__(self, ui, header, fp):
815 def __init__(self, ui, header, fp):
816 super(unbundlepart, self).__init__(fp)
816 super(unbundlepart, self).__init__(fp)
817 self.ui = ui
817 self.ui = ui
818 # unbundle state attr
818 # unbundle state attr
819 self._headerdata = header
819 self._headerdata = header
820 self._headeroffset = 0
820 self._headeroffset = 0
821 self._initialized = False
821 self._initialized = False
822 self.consumed = False
822 self.consumed = False
823 # part data
823 # part data
824 self.id = None
824 self.id = None
825 self.type = None
825 self.type = None
826 self.mandatoryparams = None
826 self.mandatoryparams = None
827 self.advisoryparams = None
827 self.advisoryparams = None
828 self.params = None
828 self.params = None
829 self.mandatorykeys = ()
829 self.mandatorykeys = ()
830 self._payloadstream = None
830 self._payloadstream = None
831 self._readheader()
831 self._readheader()
832 self._mandatory = None
832 self._mandatory = None
833 self._chunkindex = [] #(payload, file) position tuples for chunk starts
833 self._chunkindex = [] #(payload, file) position tuples for chunk starts
834 self._pos = 0
834 self._pos = 0
835
835
836 def _fromheader(self, size):
836 def _fromheader(self, size):
837 """return the next <size> byte from the header"""
837 """return the next <size> byte from the header"""
838 offset = self._headeroffset
838 offset = self._headeroffset
839 data = self._headerdata[offset:(offset + size)]
839 data = self._headerdata[offset:(offset + size)]
840 self._headeroffset = offset + size
840 self._headeroffset = offset + size
841 return data
841 return data
842
842
843 def _unpackheader(self, format):
843 def _unpackheader(self, format):
844 """read given format from header
844 """read given format from header
845
845
846 This automatically compute the size of the format to read."""
846 This automatically compute the size of the format to read."""
847 data = self._fromheader(struct.calcsize(format))
847 data = self._fromheader(struct.calcsize(format))
848 return _unpack(format, data)
848 return _unpack(format, data)
849
849
850 def _initparams(self, mandatoryparams, advisoryparams):
850 def _initparams(self, mandatoryparams, advisoryparams):
851 """internal function to setup all logic related parameters"""
851 """internal function to setup all logic related parameters"""
852 # make it read only to prevent people touching it by mistake.
852 # make it read only to prevent people touching it by mistake.
853 self.mandatoryparams = tuple(mandatoryparams)
853 self.mandatoryparams = tuple(mandatoryparams)
854 self.advisoryparams = tuple(advisoryparams)
854 self.advisoryparams = tuple(advisoryparams)
855 # user friendly UI
855 # user friendly UI
856 self.params = dict(self.mandatoryparams)
856 self.params = dict(self.mandatoryparams)
857 self.params.update(dict(self.advisoryparams))
857 self.params.update(dict(self.advisoryparams))
858 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
858 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
859
859
860 def _payloadchunks(self, chunknum=0):
860 def _payloadchunks(self, chunknum=0):
861 '''seek to specified chunk and start yielding data'''
861 '''seek to specified chunk and start yielding data'''
862 if len(self._chunkindex) == 0:
862 if len(self._chunkindex) == 0:
863 assert chunknum == 0, 'Must start with chunk 0'
863 assert chunknum == 0, 'Must start with chunk 0'
864 self._chunkindex.append((0, super(unbundlepart, self).tell()))
864 self._chunkindex.append((0, super(unbundlepart, self).tell()))
865 else:
865 else:
866 assert chunknum < len(self._chunkindex), \
866 assert chunknum < len(self._chunkindex), \
867 'Unknown chunk %d' % chunknum
867 'Unknown chunk %d' % chunknum
868 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
868 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
869
869
870 pos = self._chunkindex[chunknum][0]
870 pos = self._chunkindex[chunknum][0]
871 payloadsize = self._unpack(_fpayloadsize)[0]
871 payloadsize = self._unpack(_fpayloadsize)[0]
872 self.ui.debug('payload chunk size: %i\n' % payloadsize)
872 self.ui.debug('payload chunk size: %i\n' % payloadsize)
873 while payloadsize:
873 while payloadsize:
874 if payloadsize == flaginterrupt:
874 if payloadsize == flaginterrupt:
875 # interruption detection, the handler will now read a
875 # interruption detection, the handler will now read a
876 # single part and process it.
876 # single part and process it.
877 interrupthandler(self.ui, self._fp)()
877 interrupthandler(self.ui, self._fp)()
878 elif payloadsize < 0:
878 elif payloadsize < 0:
879 msg = 'negative payload chunk size: %i' % payloadsize
879 msg = 'negative payload chunk size: %i' % payloadsize
880 raise error.BundleValueError(msg)
880 raise error.BundleValueError(msg)
881 else:
881 else:
882 result = self._readexact(payloadsize)
882 result = self._readexact(payloadsize)
883 chunknum += 1
883 chunknum += 1
884 pos += payloadsize
884 pos += payloadsize
885 if chunknum == len(self._chunkindex):
885 if chunknum == len(self._chunkindex):
886 self._chunkindex.append((pos,
886 self._chunkindex.append((pos,
887 super(unbundlepart, self).tell()))
887 super(unbundlepart, self).tell()))
888 yield result
888 yield result
889 payloadsize = self._unpack(_fpayloadsize)[0]
889 payloadsize = self._unpack(_fpayloadsize)[0]
890 self.ui.debug('payload chunk size: %i\n' % payloadsize)
890 self.ui.debug('payload chunk size: %i\n' % payloadsize)
891
891
892 def _findchunk(self, pos):
892 def _findchunk(self, pos):
893 '''for a given payload position, return a chunk number and offset'''
893 '''for a given payload position, return a chunk number and offset'''
894 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
894 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
895 if ppos == pos:
895 if ppos == pos:
896 return chunk, 0
896 return chunk, 0
897 elif ppos > pos:
897 elif ppos > pos:
898 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
898 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
899 raise ValueError('Unknown chunk')
899 raise ValueError('Unknown chunk')
900
900
901 def _readheader(self):
901 def _readheader(self):
902 """read the header and setup the object"""
902 """read the header and setup the object"""
903 typesize = self._unpackheader(_fparttypesize)[0]
903 typesize = self._unpackheader(_fparttypesize)[0]
904 self.type = self._fromheader(typesize)
904 self.type = self._fromheader(typesize)
905 self.ui.debug('part type: "%s"\n' % self.type)
905 self.ui.debug('part type: "%s"\n' % self.type)
906 self.id = self._unpackheader(_fpartid)[0]
906 self.id = self._unpackheader(_fpartid)[0]
907 self.ui.debug('part id: "%s"\n' % self.id)
907 self.ui.debug('part id: "%s"\n' % self.id)
908 # extract mandatory bit from type
908 # extract mandatory bit from type
909 self.mandatory = (self.type != self.type.lower())
909 self.mandatory = (self.type != self.type.lower())
910 self.type = self.type.lower()
910 self.type = self.type.lower()
911 ## reading parameters
911 ## reading parameters
912 # param count
912 # param count
913 mancount, advcount = self._unpackheader(_fpartparamcount)
913 mancount, advcount = self._unpackheader(_fpartparamcount)
914 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
914 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
915 # param size
915 # param size
916 fparamsizes = _makefpartparamsizes(mancount + advcount)
916 fparamsizes = _makefpartparamsizes(mancount + advcount)
917 paramsizes = self._unpackheader(fparamsizes)
917 paramsizes = self._unpackheader(fparamsizes)
918 # make it a list of couple again
918 # make it a list of couple again
919 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
919 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
920 # split mandatory from advisory
920 # split mandatory from advisory
921 mansizes = paramsizes[:mancount]
921 mansizes = paramsizes[:mancount]
922 advsizes = paramsizes[mancount:]
922 advsizes = paramsizes[mancount:]
923 # retrieve param value
923 # retrieve param value
924 manparams = []
924 manparams = []
925 for key, value in mansizes:
925 for key, value in mansizes:
926 manparams.append((self._fromheader(key), self._fromheader(value)))
926 manparams.append((self._fromheader(key), self._fromheader(value)))
927 advparams = []
927 advparams = []
928 for key, value in advsizes:
928 for key, value in advsizes:
929 advparams.append((self._fromheader(key), self._fromheader(value)))
929 advparams.append((self._fromheader(key), self._fromheader(value)))
930 self._initparams(manparams, advparams)
930 self._initparams(manparams, advparams)
931 ## part payload
931 ## part payload
932 self._payloadstream = util.chunkbuffer(self._payloadchunks())
932 self._payloadstream = util.chunkbuffer(self._payloadchunks())
933 # we read the data, tell it
933 # we read the data, tell it
934 self._initialized = True
934 self._initialized = True
935
935
936 def read(self, size=None):
936 def read(self, size=None):
937 """read payload data"""
937 """read payload data"""
938 if not self._initialized:
938 if not self._initialized:
939 self._readheader()
939 self._readheader()
940 if size is None:
940 if size is None:
941 data = self._payloadstream.read()
941 data = self._payloadstream.read()
942 else:
942 else:
943 data = self._payloadstream.read(size)
943 data = self._payloadstream.read(size)
944 if size is None or len(data) < size:
944 if size is None or len(data) < size:
945 self.consumed = True
945 self.consumed = True
946 self._pos += len(data)
946 self._pos += len(data)
947 return data
947 return data
948
948
949 def tell(self):
949 def tell(self):
950 return self._pos
950 return self._pos
951
951
952 def seek(self, offset, whence=0):
952 def seek(self, offset, whence=0):
953 if whence == 0:
953 if whence == 0:
954 newpos = offset
954 newpos = offset
955 elif whence == 1:
955 elif whence == 1:
956 newpos = self._pos + offset
956 newpos = self._pos + offset
957 elif whence == 2:
957 elif whence == 2:
958 if not self.consumed:
958 if not self.consumed:
959 self.read()
959 self.read()
960 newpos = self._chunkindex[-1][0] - offset
960 newpos = self._chunkindex[-1][0] - offset
961 else:
961 else:
962 raise ValueError('Unknown whence value: %r' % (whence,))
962 raise ValueError('Unknown whence value: %r' % (whence,))
963
963
964 if newpos > self._chunkindex[-1][0] and not self.consumed:
964 if newpos > self._chunkindex[-1][0] and not self.consumed:
965 self.read()
965 self.read()
966 if not 0 <= newpos <= self._chunkindex[-1][0]:
966 if not 0 <= newpos <= self._chunkindex[-1][0]:
967 raise ValueError('Offset out of range')
967 raise ValueError('Offset out of range')
968
968
969 if self._pos != newpos:
969 if self._pos != newpos:
970 chunk, internaloffset = self._findchunk(newpos)
970 chunk, internaloffset = self._findchunk(newpos)
971 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
971 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
972 adjust = self.read(internaloffset)
972 adjust = self.read(internaloffset)
973 if len(adjust) != internaloffset:
973 if len(adjust) != internaloffset:
974 raise util.Abort(_('Seek failed\n'))
974 raise util.Abort(_('Seek failed\n'))
975 self._pos = newpos
975 self._pos = newpos
976
976
977 capabilities = {'HG2Y': (),
977 capabilities = {'HG2Y': (),
978 'b2x:listkeys': (),
978 'b2x:listkeys': (),
979 'b2x:pushkey': (),
979 'b2x:pushkey': (),
980 'digests': tuple(sorted(util.DIGESTS.keys())),
980 'digests': tuple(sorted(util.DIGESTS.keys())),
981 'b2x:remote-changegroup': ('http', 'https'),
981 'b2x:remote-changegroup': ('http', 'https'),
982 }
982 }
983
983
984 def getrepocaps(repo, allowpushback=False):
984 def getrepocaps(repo, allowpushback=False):
985 """return the bundle2 capabilities for a given repo
985 """return the bundle2 capabilities for a given repo
986
986
987 Exists to allow extensions (like evolution) to mutate the capabilities.
987 Exists to allow extensions (like evolution) to mutate the capabilities.
988 """
988 """
989 caps = capabilities.copy()
989 caps = capabilities.copy()
990 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
990 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
991 if obsolete.isenabled(repo, obsolete.exchangeopt):
991 if obsolete.isenabled(repo, obsolete.exchangeopt):
992 supportedformat = tuple('V%i' % v for v in obsolete.formats)
992 supportedformat = tuple('V%i' % v for v in obsolete.formats)
993 caps['b2x:obsmarkers'] = supportedformat
993 caps['b2x:obsmarkers'] = supportedformat
994 if allowpushback:
994 if allowpushback:
995 caps['b2x:pushback'] = ()
995 caps['b2x:pushback'] = ()
996 return caps
996 return caps
997
997
998 def bundle2caps(remote):
998 def bundle2caps(remote):
999 """return the bundle capabilities of a peer as dict"""
999 """return the bundle capabilities of a peer as dict"""
1000 raw = remote.capable('bundle2-exp')
1000 raw = remote.capable('bundle2-exp')
1001 if not raw and raw != '':
1001 if not raw and raw != '':
1002 return {}
1002 return {}
1003 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1003 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1004 return decodecaps(capsblob)
1004 return decodecaps(capsblob)
1005
1005
1006 def obsmarkersversion(caps):
1006 def obsmarkersversion(caps):
1007 """extract the list of supported obsmarkers versions from a bundle2caps dict
1007 """extract the list of supported obsmarkers versions from a bundle2caps dict
1008 """
1008 """
1009 obscaps = caps.get('b2x:obsmarkers', ())
1009 obscaps = caps.get('b2x:obsmarkers', ())
1010 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1010 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1011
1011
1012 @parthandler('b2x:changegroup', ('version',))
1012 @parthandler('b2x:changegroup', ('version',))
1013 def handlechangegroup(op, inpart):
1013 def handlechangegroup(op, inpart):
1014 """apply a changegroup part on the repo
1014 """apply a changegroup part on the repo
1015
1015
1016 This is a very early implementation that will massive rework before being
1016 This is a very early implementation that will massive rework before being
1017 inflicted to any end-user.
1017 inflicted to any end-user.
1018 """
1018 """
1019 # Make sure we trigger a transaction creation
1019 # Make sure we trigger a transaction creation
1020 #
1020 #
1021 # The addchangegroup function will get a transaction object by itself, but
1021 # The addchangegroup function will get a transaction object by itself, but
1022 # we need to make sure we trigger the creation of a transaction object used
1022 # we need to make sure we trigger the creation of a transaction object used
1023 # for the whole processing scope.
1023 # for the whole processing scope.
1024 op.gettransaction()
1024 op.gettransaction()
1025 unpackerversion = inpart.params.get('version', '01')
1025 unpackerversion = inpart.params.get('version', '01')
1026 # We should raise an appropriate exception here
1026 # We should raise an appropriate exception here
1027 unpacker = changegroup.packermap[unpackerversion][1]
1027 unpacker = changegroup.packermap[unpackerversion][1]
1028 cg = unpacker(inpart, 'UN')
1028 cg = unpacker(inpart, 'UN')
1029 # the source and url passed here are overwritten by the one contained in
1029 # the source and url passed here are overwritten by the one contained in
1030 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1030 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1031 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1031 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1032 op.records.add('changegroup', {'return': ret})
1032 op.records.add('changegroup', {'return': ret})
1033 if op.reply is not None:
1033 if op.reply is not None:
1034 # This is definitely not the final form of this
1034 # This is definitely not the final form of this
1035 # return. But one need to start somewhere.
1035 # return. But one need to start somewhere.
1036 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1036 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1037 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1037 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1038 part.addparam('return', '%i' % ret, mandatory=False)
1038 part.addparam('return', '%i' % ret, mandatory=False)
1039 assert not inpart.read()
1039 assert not inpart.read()
1040
1040
1041 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1041 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1042 ['digest:%s' % k for k in util.DIGESTS.keys()])
1042 ['digest:%s' % k for k in util.DIGESTS.keys()])
1043 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1043 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1044 def handleremotechangegroup(op, inpart):
1044 def handleremotechangegroup(op, inpart):
1045 """apply a bundle10 on the repo, given an url and validation information
1045 """apply a bundle10 on the repo, given an url and validation information
1046
1046
1047 All the information about the remote bundle to import are given as
1047 All the information about the remote bundle to import are given as
1048 parameters. The parameters include:
1048 parameters. The parameters include:
1049 - url: the url to the bundle10.
1049 - url: the url to the bundle10.
1050 - size: the bundle10 file size. It is used to validate what was
1050 - size: the bundle10 file size. It is used to validate what was
1051 retrieved by the client matches the server knowledge about the bundle.
1051 retrieved by the client matches the server knowledge about the bundle.
1052 - digests: a space separated list of the digest types provided as
1052 - digests: a space separated list of the digest types provided as
1053 parameters.
1053 parameters.
1054 - digest:<digest-type>: the hexadecimal representation of the digest with
1054 - digest:<digest-type>: the hexadecimal representation of the digest with
1055 that name. Like the size, it is used to validate what was retrieved by
1055 that name. Like the size, it is used to validate what was retrieved by
1056 the client matches what the server knows about the bundle.
1056 the client matches what the server knows about the bundle.
1057
1057
1058 When multiple digest types are given, all of them are checked.
1058 When multiple digest types are given, all of them are checked.
1059 """
1059 """
1060 try:
1060 try:
1061 raw_url = inpart.params['url']
1061 raw_url = inpart.params['url']
1062 except KeyError:
1062 except KeyError:
1063 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1063 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1064 parsed_url = util.url(raw_url)
1064 parsed_url = util.url(raw_url)
1065 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1065 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1066 raise util.Abort(_('remote-changegroup does not support %s urls') %
1066 raise util.Abort(_('remote-changegroup does not support %s urls') %
1067 parsed_url.scheme)
1067 parsed_url.scheme)
1068
1068
1069 try:
1069 try:
1070 size = int(inpart.params['size'])
1070 size = int(inpart.params['size'])
1071 except ValueError:
1071 except ValueError:
1072 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1072 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1073 % 'size')
1073 % 'size')
1074 except KeyError:
1074 except KeyError:
1075 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1075 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1076
1076
1077 digests = {}
1077 digests = {}
1078 for typ in inpart.params.get('digests', '').split():
1078 for typ in inpart.params.get('digests', '').split():
1079 param = 'digest:%s' % typ
1079 param = 'digest:%s' % typ
1080 try:
1080 try:
1081 value = inpart.params[param]
1081 value = inpart.params[param]
1082 except KeyError:
1082 except KeyError:
1083 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1083 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1084 param)
1084 param)
1085 digests[typ] = value
1085 digests[typ] = value
1086
1086
1087 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1087 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1088
1088
1089 # Make sure we trigger a transaction creation
1089 # Make sure we trigger a transaction creation
1090 #
1090 #
1091 # The addchangegroup function will get a transaction object by itself, but
1091 # The addchangegroup function will get a transaction object by itself, but
1092 # we need to make sure we trigger the creation of a transaction object used
1092 # we need to make sure we trigger the creation of a transaction object used
1093 # for the whole processing scope.
1093 # for the whole processing scope.
1094 op.gettransaction()
1094 op.gettransaction()
1095 import exchange
1095 import exchange
1096 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1096 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1097 if not isinstance(cg, changegroup.cg1unpacker):
1097 if not isinstance(cg, changegroup.cg1unpacker):
1098 raise util.Abort(_('%s: not a bundle version 1.0') %
1098 raise util.Abort(_('%s: not a bundle version 1.0') %
1099 util.hidepassword(raw_url))
1099 util.hidepassword(raw_url))
1100 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1100 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1101 op.records.add('changegroup', {'return': ret})
1101 op.records.add('changegroup', {'return': ret})
1102 if op.reply is not None:
1102 if op.reply is not None:
1103 # This is definitely not the final form of this
1103 # This is definitely not the final form of this
1104 # return. But one need to start somewhere.
1104 # return. But one need to start somewhere.
1105 part = op.reply.newpart('b2x:reply:changegroup')
1105 part = op.reply.newpart('b2x:reply:changegroup')
1106 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1106 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1107 part.addparam('return', '%i' % ret, mandatory=False)
1107 part.addparam('return', '%i' % ret, mandatory=False)
1108 try:
1108 try:
1109 real_part.validate()
1109 real_part.validate()
1110 except util.Abort, e:
1110 except util.Abort, e:
1111 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1111 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1112 (util.hidepassword(raw_url), str(e)))
1112 (util.hidepassword(raw_url), str(e)))
1113 assert not inpart.read()
1113 assert not inpart.read()
1114
1114
1115 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1115 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1116 def handlereplychangegroup(op, inpart):
1116 def handlereplychangegroup(op, inpart):
1117 ret = int(inpart.params['return'])
1117 ret = int(inpart.params['return'])
1118 replyto = int(inpart.params['in-reply-to'])
1118 replyto = int(inpart.params['in-reply-to'])
1119 op.records.add('changegroup', {'return': ret}, replyto)
1119 op.records.add('changegroup', {'return': ret}, replyto)
1120
1120
1121 @parthandler('b2x:check:heads')
1121 @parthandler('b2x:check:heads')
1122 def handlecheckheads(op, inpart):
1122 def handlecheckheads(op, inpart):
1123 """check that head of the repo did not change
1123 """check that head of the repo did not change
1124
1124
1125 This is used to detect a push race when using unbundle.
1125 This is used to detect a push race when using unbundle.
1126 This replaces the "heads" argument of unbundle."""
1126 This replaces the "heads" argument of unbundle."""
1127 h = inpart.read(20)
1127 h = inpart.read(20)
1128 heads = []
1128 heads = []
1129 while len(h) == 20:
1129 while len(h) == 20:
1130 heads.append(h)
1130 heads.append(h)
1131 h = inpart.read(20)
1131 h = inpart.read(20)
1132 assert not h
1132 assert not h
1133 if heads != op.repo.heads():
1133 if heads != op.repo.heads():
1134 raise error.PushRaced('repository changed while pushing - '
1134 raise error.PushRaced('repository changed while pushing - '
1135 'please try again')
1135 'please try again')
1136
1136
1137 @parthandler('b2x:output')
1137 @parthandler('b2x:output')
1138 def handleoutput(op, inpart):
1138 def handleoutput(op, inpart):
1139 """forward output captured on the server to the client"""
1139 """forward output captured on the server to the client"""
1140 for line in inpart.read().splitlines():
1140 for line in inpart.read().splitlines():
1141 op.ui.write(('remote: %s\n' % line))
1141 op.ui.write(('remote: %s\n' % line))
1142
1142
1143 @parthandler('b2x:replycaps')
1143 @parthandler('b2x:replycaps')
1144 def handlereplycaps(op, inpart):
1144 def handlereplycaps(op, inpart):
1145 """Notify that a reply bundle should be created
1145 """Notify that a reply bundle should be created
1146
1146
1147 The payload contains the capabilities information for the reply"""
1147 The payload contains the capabilities information for the reply"""
1148 caps = decodecaps(inpart.read())
1148 caps = decodecaps(inpart.read())
1149 if op.reply is None:
1149 if op.reply is None:
1150 op.reply = bundle20(op.ui, caps)
1150 op.reply = bundle20(op.ui, caps)
1151
1151
1152 @parthandler('b2x:error:abort', ('message', 'hint'))
1152 @parthandler('b2x:error:abort', ('message', 'hint'))
1153 def handlereplycaps(op, inpart):
1153 def handlereplycaps(op, inpart):
1154 """Used to transmit abort error over the wire"""
1154 """Used to transmit abort error over the wire"""
1155 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1155 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1156
1156
1157 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1157 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1158 def handlereplycaps(op, inpart):
1158 def handlereplycaps(op, inpart):
1159 """Used to transmit unknown content error over the wire"""
1159 """Used to transmit unknown content error over the wire"""
1160 kwargs = {}
1160 kwargs = {}
1161 parttype = inpart.params.get('parttype')
1161 parttype = inpart.params.get('parttype')
1162 if parttype is not None:
1162 if parttype is not None:
1163 kwargs['parttype'] = parttype
1163 kwargs['parttype'] = parttype
1164 params = inpart.params.get('params')
1164 params = inpart.params.get('params')
1165 if params is not None:
1165 if params is not None:
1166 kwargs['params'] = params.split('\0')
1166 kwargs['params'] = params.split('\0')
1167
1167
1168 raise error.UnsupportedPartError(**kwargs)
1168 raise error.UnsupportedPartError(**kwargs)
1169
1169
1170 @parthandler('b2x:error:pushraced', ('message',))
1170 @parthandler('b2x:error:pushraced', ('message',))
1171 def handlereplycaps(op, inpart):
1171 def handlereplycaps(op, inpart):
1172 """Used to transmit push race error over the wire"""
1172 """Used to transmit push race error over the wire"""
1173 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1173 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1174
1174
1175 @parthandler('b2x:listkeys', ('namespace',))
1175 @parthandler('b2x:listkeys', ('namespace',))
1176 def handlelistkeys(op, inpart):
1176 def handlelistkeys(op, inpart):
1177 """retrieve pushkey namespace content stored in a bundle2"""
1177 """retrieve pushkey namespace content stored in a bundle2"""
1178 namespace = inpart.params['namespace']
1178 namespace = inpart.params['namespace']
1179 r = pushkey.decodekeys(inpart.read())
1179 r = pushkey.decodekeys(inpart.read())
1180 op.records.add('listkeys', (namespace, r))
1180 op.records.add('listkeys', (namespace, r))
1181
1181
1182 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1182 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1183 def handlepushkey(op, inpart):
1183 def handlepushkey(op, inpart):
1184 """process a pushkey request"""
1184 """process a pushkey request"""
1185 dec = pushkey.decode
1185 dec = pushkey.decode
1186 namespace = dec(inpart.params['namespace'])
1186 namespace = dec(inpart.params['namespace'])
1187 key = dec(inpart.params['key'])
1187 key = dec(inpart.params['key'])
1188 old = dec(inpart.params['old'])
1188 old = dec(inpart.params['old'])
1189 new = dec(inpart.params['new'])
1189 new = dec(inpart.params['new'])
1190 ret = op.repo.pushkey(namespace, key, old, new)
1190 ret = op.repo.pushkey(namespace, key, old, new)
1191 record = {'namespace': namespace,
1191 record = {'namespace': namespace,
1192 'key': key,
1192 'key': key,
1193 'old': old,
1193 'old': old,
1194 'new': new}
1194 'new': new}
1195 op.records.add('pushkey', record)
1195 op.records.add('pushkey', record)
1196 if op.reply is not None:
1196 if op.reply is not None:
1197 rpart = op.reply.newpart('b2x:reply:pushkey')
1197 rpart = op.reply.newpart('b2x:reply:pushkey')
1198 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1198 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1199 rpart.addparam('return', '%i' % ret, mandatory=False)
1199 rpart.addparam('return', '%i' % ret, mandatory=False)
1200
1200
1201 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1201 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1202 def handlepushkeyreply(op, inpart):
1202 def handlepushkeyreply(op, inpart):
1203 """retrieve the result of a pushkey request"""
1203 """retrieve the result of a pushkey request"""
1204 ret = int(inpart.params['return'])
1204 ret = int(inpart.params['return'])
1205 partid = int(inpart.params['in-reply-to'])
1205 partid = int(inpart.params['in-reply-to'])
1206 op.records.add('pushkey', {'return': ret}, partid)
1206 op.records.add('pushkey', {'return': ret}, partid)
1207
1207
1208 @parthandler('b2x:obsmarkers')
1208 @parthandler('b2x:obsmarkers')
1209 def handleobsmarker(op, inpart):
1209 def handleobsmarker(op, inpart):
1210 """add a stream of obsmarkers to the repo"""
1210 """add a stream of obsmarkers to the repo"""
1211 tr = op.gettransaction()
1211 tr = op.gettransaction()
1212 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1212 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1213 if new:
1213 if new:
1214 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1214 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1215 op.records.add('obsmarkers', {'new': new})
1215 op.records.add('obsmarkers', {'new': new})
1216 if op.reply is not None:
1216 if op.reply is not None:
1217 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1217 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1218 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1218 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1219 rpart.addparam('new', '%i' % new, mandatory=False)
1219 rpart.addparam('new', '%i' % new, mandatory=False)
1220
1220
1221
1221
1222 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1222 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1223 def handlepushkeyreply(op, inpart):
1223 def handlepushkeyreply(op, inpart):
1224 """retrieve the result of a pushkey request"""
1224 """retrieve the result of a pushkey request"""
1225 ret = int(inpart.params['new'])
1225 ret = int(inpart.params['new'])
1226 partid = int(inpart.params['in-reply-to'])
1226 partid = int(inpart.params['in-reply-to'])
1227 op.records.add('obsmarkers', {'new': ret}, partid)
1227 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now