##// END OF EJS Templates
bundle2.unpackermixin: default value for seek() whence parameter...
Eric Sumner -
r24070:de32e988 default
parent child Browse files
Show More
@@ -1,1225 +1,1225 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _magicstring = 'HG2Y'
165 _magicstring = 'HG2Y'
166
166
167 _fstreamparamsize = '>i'
167 _fstreamparamsize = '>i'
168 _fpartheadersize = '>i'
168 _fpartheadersize = '>i'
169 _fparttypesize = '>B'
169 _fparttypesize = '>B'
170 _fpartid = '>I'
170 _fpartid = '>I'
171 _fpayloadsize = '>i'
171 _fpayloadsize = '>i'
172 _fpartparamcount = '>BB'
172 _fpartparamcount = '>BB'
173
173
174 preferedchunksize = 4096
174 preferedchunksize = 4096
175
175
176 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
176 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
177
177
178 def validateparttype(parttype):
178 def validateparttype(parttype):
179 """raise ValueError if a parttype contains invalid character"""
179 """raise ValueError if a parttype contains invalid character"""
180 if _parttypeforbidden.search(parttype):
180 if _parttypeforbidden.search(parttype):
181 raise ValueError(parttype)
181 raise ValueError(parttype)
182
182
183 def _makefpartparamsizes(nbparams):
183 def _makefpartparamsizes(nbparams):
184 """return a struct format to read part parameter sizes
184 """return a struct format to read part parameter sizes
185
185
186 The number parameters is variable so we need to build that format
186 The number parameters is variable so we need to build that format
187 dynamically.
187 dynamically.
188 """
188 """
189 return '>'+('BB'*nbparams)
189 return '>'+('BB'*nbparams)
190
190
191 parthandlermapping = {}
191 parthandlermapping = {}
192
192
193 def parthandler(parttype, params=()):
193 def parthandler(parttype, params=()):
194 """decorator that register a function as a bundle2 part handler
194 """decorator that register a function as a bundle2 part handler
195
195
196 eg::
196 eg::
197
197
198 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
198 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
199 def myparttypehandler(...):
199 def myparttypehandler(...):
200 '''process a part of type "my part".'''
200 '''process a part of type "my part".'''
201 ...
201 ...
202 """
202 """
203 validateparttype(parttype)
203 validateparttype(parttype)
204 def _decorator(func):
204 def _decorator(func):
205 lparttype = parttype.lower() # enforce lower case matching.
205 lparttype = parttype.lower() # enforce lower case matching.
206 assert lparttype not in parthandlermapping
206 assert lparttype not in parthandlermapping
207 parthandlermapping[lparttype] = func
207 parthandlermapping[lparttype] = func
208 func.params = frozenset(params)
208 func.params = frozenset(params)
209 return func
209 return func
210 return _decorator
210 return _decorator
211
211
212 class unbundlerecords(object):
212 class unbundlerecords(object):
213 """keep record of what happens during and unbundle
213 """keep record of what happens during and unbundle
214
214
215 New records are added using `records.add('cat', obj)`. Where 'cat' is a
215 New records are added using `records.add('cat', obj)`. Where 'cat' is a
216 category of record and obj is an arbitrary object.
216 category of record and obj is an arbitrary object.
217
217
218 `records['cat']` will return all entries of this category 'cat'.
218 `records['cat']` will return all entries of this category 'cat'.
219
219
220 Iterating on the object itself will yield `('category', obj)` tuples
220 Iterating on the object itself will yield `('category', obj)` tuples
221 for all entries.
221 for all entries.
222
222
223 All iterations happens in chronological order.
223 All iterations happens in chronological order.
224 """
224 """
225
225
226 def __init__(self):
226 def __init__(self):
227 self._categories = {}
227 self._categories = {}
228 self._sequences = []
228 self._sequences = []
229 self._replies = {}
229 self._replies = {}
230
230
231 def add(self, category, entry, inreplyto=None):
231 def add(self, category, entry, inreplyto=None):
232 """add a new record of a given category.
232 """add a new record of a given category.
233
233
234 The entry can then be retrieved in the list returned by
234 The entry can then be retrieved in the list returned by
235 self['category']."""
235 self['category']."""
236 self._categories.setdefault(category, []).append(entry)
236 self._categories.setdefault(category, []).append(entry)
237 self._sequences.append((category, entry))
237 self._sequences.append((category, entry))
238 if inreplyto is not None:
238 if inreplyto is not None:
239 self.getreplies(inreplyto).add(category, entry)
239 self.getreplies(inreplyto).add(category, entry)
240
240
241 def getreplies(self, partid):
241 def getreplies(self, partid):
242 """get the records that are replies to a specific part"""
242 """get the records that are replies to a specific part"""
243 return self._replies.setdefault(partid, unbundlerecords())
243 return self._replies.setdefault(partid, unbundlerecords())
244
244
245 def __getitem__(self, cat):
245 def __getitem__(self, cat):
246 return tuple(self._categories.get(cat, ()))
246 return tuple(self._categories.get(cat, ()))
247
247
248 def __iter__(self):
248 def __iter__(self):
249 return iter(self._sequences)
249 return iter(self._sequences)
250
250
251 def __len__(self):
251 def __len__(self):
252 return len(self._sequences)
252 return len(self._sequences)
253
253
254 def __nonzero__(self):
254 def __nonzero__(self):
255 return bool(self._sequences)
255 return bool(self._sequences)
256
256
257 class bundleoperation(object):
257 class bundleoperation(object):
258 """an object that represents a single bundling process
258 """an object that represents a single bundling process
259
259
260 Its purpose is to carry unbundle-related objects and states.
260 Its purpose is to carry unbundle-related objects and states.
261
261
262 A new object should be created at the beginning of each bundle processing.
262 A new object should be created at the beginning of each bundle processing.
263 The object is to be returned by the processing function.
263 The object is to be returned by the processing function.
264
264
265 The object has very little content now it will ultimately contain:
265 The object has very little content now it will ultimately contain:
266 * an access to the repo the bundle is applied to,
266 * an access to the repo the bundle is applied to,
267 * a ui object,
267 * a ui object,
268 * a way to retrieve a transaction to add changes to the repo,
268 * a way to retrieve a transaction to add changes to the repo,
269 * a way to record the result of processing each part,
269 * a way to record the result of processing each part,
270 * a way to construct a bundle response when applicable.
270 * a way to construct a bundle response when applicable.
271 """
271 """
272
272
273 def __init__(self, repo, transactiongetter):
273 def __init__(self, repo, transactiongetter):
274 self.repo = repo
274 self.repo = repo
275 self.ui = repo.ui
275 self.ui = repo.ui
276 self.records = unbundlerecords()
276 self.records = unbundlerecords()
277 self.gettransaction = transactiongetter
277 self.gettransaction = transactiongetter
278 self.reply = None
278 self.reply = None
279
279
280 class TransactionUnavailable(RuntimeError):
280 class TransactionUnavailable(RuntimeError):
281 pass
281 pass
282
282
283 def _notransaction():
283 def _notransaction():
284 """default method to get a transaction while processing a bundle
284 """default method to get a transaction while processing a bundle
285
285
286 Raise an exception to highlight the fact that no transaction was expected
286 Raise an exception to highlight the fact that no transaction was expected
287 to be created"""
287 to be created"""
288 raise TransactionUnavailable()
288 raise TransactionUnavailable()
289
289
290 def processbundle(repo, unbundler, transactiongetter=None):
290 def processbundle(repo, unbundler, transactiongetter=None):
291 """This function process a bundle, apply effect to/from a repo
291 """This function process a bundle, apply effect to/from a repo
292
292
293 It iterates over each part then searches for and uses the proper handling
293 It iterates over each part then searches for and uses the proper handling
294 code to process the part. Parts are processed in order.
294 code to process the part. Parts are processed in order.
295
295
296 This is very early version of this function that will be strongly reworked
296 This is very early version of this function that will be strongly reworked
297 before final usage.
297 before final usage.
298
298
299 Unknown Mandatory part will abort the process.
299 Unknown Mandatory part will abort the process.
300 """
300 """
301 if transactiongetter is None:
301 if transactiongetter is None:
302 transactiongetter = _notransaction
302 transactiongetter = _notransaction
303 op = bundleoperation(repo, transactiongetter)
303 op = bundleoperation(repo, transactiongetter)
304 # todo:
304 # todo:
305 # - replace this is a init function soon.
305 # - replace this is a init function soon.
306 # - exception catching
306 # - exception catching
307 unbundler.params
307 unbundler.params
308 iterparts = unbundler.iterparts()
308 iterparts = unbundler.iterparts()
309 part = None
309 part = None
310 try:
310 try:
311 for part in iterparts:
311 for part in iterparts:
312 _processpart(op, part)
312 _processpart(op, part)
313 except Exception, exc:
313 except Exception, exc:
314 for part in iterparts:
314 for part in iterparts:
315 # consume the bundle content
315 # consume the bundle content
316 part.seek(0, 2)
316 part.seek(0, 2)
317 # Small hack to let caller code distinguish exceptions from bundle2
317 # Small hack to let caller code distinguish exceptions from bundle2
318 # processing from processing the old format. This is mostly
318 # processing from processing the old format. This is mostly
319 # needed to handle different return codes to unbundle according to the
319 # needed to handle different return codes to unbundle according to the
320 # type of bundle. We should probably clean up or drop this return code
320 # type of bundle. We should probably clean up or drop this return code
321 # craziness in a future version.
321 # craziness in a future version.
322 exc.duringunbundle2 = True
322 exc.duringunbundle2 = True
323 raise
323 raise
324 return op
324 return op
325
325
326 def _processpart(op, part):
326 def _processpart(op, part):
327 """process a single part from a bundle
327 """process a single part from a bundle
328
328
329 The part is guaranteed to have been fully consumed when the function exits
329 The part is guaranteed to have been fully consumed when the function exits
330 (even if an exception is raised)."""
330 (even if an exception is raised)."""
331 try:
331 try:
332 try:
332 try:
333 handler = parthandlermapping.get(part.type)
333 handler = parthandlermapping.get(part.type)
334 if handler is None:
334 if handler is None:
335 raise error.UnsupportedPartError(parttype=part.type)
335 raise error.UnsupportedPartError(parttype=part.type)
336 op.ui.debug('found a handler for part %r\n' % part.type)
336 op.ui.debug('found a handler for part %r\n' % part.type)
337 unknownparams = part.mandatorykeys - handler.params
337 unknownparams = part.mandatorykeys - handler.params
338 if unknownparams:
338 if unknownparams:
339 unknownparams = list(unknownparams)
339 unknownparams = list(unknownparams)
340 unknownparams.sort()
340 unknownparams.sort()
341 raise error.UnsupportedPartError(parttype=part.type,
341 raise error.UnsupportedPartError(parttype=part.type,
342 params=unknownparams)
342 params=unknownparams)
343 except error.UnsupportedPartError, exc:
343 except error.UnsupportedPartError, exc:
344 if part.mandatory: # mandatory parts
344 if part.mandatory: # mandatory parts
345 raise
345 raise
346 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
346 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
347 return # skip to part processing
347 return # skip to part processing
348
348
349 # handler is called outside the above try block so that we don't
349 # handler is called outside the above try block so that we don't
350 # risk catching KeyErrors from anything other than the
350 # risk catching KeyErrors from anything other than the
351 # parthandlermapping lookup (any KeyError raised by handler()
351 # parthandlermapping lookup (any KeyError raised by handler()
352 # itself represents a defect of a different variety).
352 # itself represents a defect of a different variety).
353 output = None
353 output = None
354 if op.reply is not None:
354 if op.reply is not None:
355 op.ui.pushbuffer(error=True)
355 op.ui.pushbuffer(error=True)
356 output = ''
356 output = ''
357 try:
357 try:
358 handler(op, part)
358 handler(op, part)
359 finally:
359 finally:
360 if output is not None:
360 if output is not None:
361 output = op.ui.popbuffer()
361 output = op.ui.popbuffer()
362 if output:
362 if output:
363 outpart = op.reply.newpart('b2x:output', data=output,
363 outpart = op.reply.newpart('b2x:output', data=output,
364 mandatory=False)
364 mandatory=False)
365 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
365 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
366 finally:
366 finally:
367 # consume the part content to not corrupt the stream.
367 # consume the part content to not corrupt the stream.
368 part.seek(0, 2)
368 part.seek(0, 2)
369
369
370
370
371 def decodecaps(blob):
371 def decodecaps(blob):
372 """decode a bundle2 caps bytes blob into a dictionary
372 """decode a bundle2 caps bytes blob into a dictionary
373
373
374 The blob is a list of capabilities (one per line)
374 The blob is a list of capabilities (one per line)
375 Capabilities may have values using a line of the form::
375 Capabilities may have values using a line of the form::
376
376
377 capability=value1,value2,value3
377 capability=value1,value2,value3
378
378
379 The values are always a list."""
379 The values are always a list."""
380 caps = {}
380 caps = {}
381 for line in blob.splitlines():
381 for line in blob.splitlines():
382 if not line:
382 if not line:
383 continue
383 continue
384 if '=' not in line:
384 if '=' not in line:
385 key, vals = line, ()
385 key, vals = line, ()
386 else:
386 else:
387 key, vals = line.split('=', 1)
387 key, vals = line.split('=', 1)
388 vals = vals.split(',')
388 vals = vals.split(',')
389 key = urllib.unquote(key)
389 key = urllib.unquote(key)
390 vals = [urllib.unquote(v) for v in vals]
390 vals = [urllib.unquote(v) for v in vals]
391 caps[key] = vals
391 caps[key] = vals
392 return caps
392 return caps
393
393
394 def encodecaps(caps):
394 def encodecaps(caps):
395 """encode a bundle2 caps dictionary into a bytes blob"""
395 """encode a bundle2 caps dictionary into a bytes blob"""
396 chunks = []
396 chunks = []
397 for ca in sorted(caps):
397 for ca in sorted(caps):
398 vals = caps[ca]
398 vals = caps[ca]
399 ca = urllib.quote(ca)
399 ca = urllib.quote(ca)
400 vals = [urllib.quote(v) for v in vals]
400 vals = [urllib.quote(v) for v in vals]
401 if vals:
401 if vals:
402 ca = "%s=%s" % (ca, ','.join(vals))
402 ca = "%s=%s" % (ca, ','.join(vals))
403 chunks.append(ca)
403 chunks.append(ca)
404 return '\n'.join(chunks)
404 return '\n'.join(chunks)
405
405
406 class bundle20(object):
406 class bundle20(object):
407 """represent an outgoing bundle2 container
407 """represent an outgoing bundle2 container
408
408
409 Use the `addparam` method to add stream level parameter. and `newpart` to
409 Use the `addparam` method to add stream level parameter. and `newpart` to
410 populate it. Then call `getchunks` to retrieve all the binary chunks of
410 populate it. Then call `getchunks` to retrieve all the binary chunks of
411 data that compose the bundle2 container."""
411 data that compose the bundle2 container."""
412
412
413 def __init__(self, ui, capabilities=()):
413 def __init__(self, ui, capabilities=()):
414 self.ui = ui
414 self.ui = ui
415 self._params = []
415 self._params = []
416 self._parts = []
416 self._parts = []
417 self.capabilities = dict(capabilities)
417 self.capabilities = dict(capabilities)
418
418
419 @property
419 @property
420 def nbparts(self):
420 def nbparts(self):
421 """total number of parts added to the bundler"""
421 """total number of parts added to the bundler"""
422 return len(self._parts)
422 return len(self._parts)
423
423
424 # methods used to defines the bundle2 content
424 # methods used to defines the bundle2 content
425 def addparam(self, name, value=None):
425 def addparam(self, name, value=None):
426 """add a stream level parameter"""
426 """add a stream level parameter"""
427 if not name:
427 if not name:
428 raise ValueError('empty parameter name')
428 raise ValueError('empty parameter name')
429 if name[0] not in string.letters:
429 if name[0] not in string.letters:
430 raise ValueError('non letter first character: %r' % name)
430 raise ValueError('non letter first character: %r' % name)
431 self._params.append((name, value))
431 self._params.append((name, value))
432
432
433 def addpart(self, part):
433 def addpart(self, part):
434 """add a new part to the bundle2 container
434 """add a new part to the bundle2 container
435
435
436 Parts contains the actual applicative payload."""
436 Parts contains the actual applicative payload."""
437 assert part.id is None
437 assert part.id is None
438 part.id = len(self._parts) # very cheap counter
438 part.id = len(self._parts) # very cheap counter
439 self._parts.append(part)
439 self._parts.append(part)
440
440
441 def newpart(self, typeid, *args, **kwargs):
441 def newpart(self, typeid, *args, **kwargs):
442 """create a new part and add it to the containers
442 """create a new part and add it to the containers
443
443
444 As the part is directly added to the containers. For now, this means
444 As the part is directly added to the containers. For now, this means
445 that any failure to properly initialize the part after calling
445 that any failure to properly initialize the part after calling
446 ``newpart`` should result in a failure of the whole bundling process.
446 ``newpart`` should result in a failure of the whole bundling process.
447
447
448 You can still fall back to manually create and add if you need better
448 You can still fall back to manually create and add if you need better
449 control."""
449 control."""
450 part = bundlepart(typeid, *args, **kwargs)
450 part = bundlepart(typeid, *args, **kwargs)
451 self.addpart(part)
451 self.addpart(part)
452 return part
452 return part
453
453
454 # methods used to generate the bundle2 stream
454 # methods used to generate the bundle2 stream
455 def getchunks(self):
455 def getchunks(self):
456 self.ui.debug('start emission of %s stream\n' % _magicstring)
456 self.ui.debug('start emission of %s stream\n' % _magicstring)
457 yield _magicstring
457 yield _magicstring
458 param = self._paramchunk()
458 param = self._paramchunk()
459 self.ui.debug('bundle parameter: %s\n' % param)
459 self.ui.debug('bundle parameter: %s\n' % param)
460 yield _pack(_fstreamparamsize, len(param))
460 yield _pack(_fstreamparamsize, len(param))
461 if param:
461 if param:
462 yield param
462 yield param
463
463
464 self.ui.debug('start of parts\n')
464 self.ui.debug('start of parts\n')
465 for part in self._parts:
465 for part in self._parts:
466 self.ui.debug('bundle part: "%s"\n' % part.type)
466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 for chunk in part.getchunks():
467 for chunk in part.getchunks():
468 yield chunk
468 yield chunk
469 self.ui.debug('end of bundle\n')
469 self.ui.debug('end of bundle\n')
470 yield _pack(_fpartheadersize, 0)
470 yield _pack(_fpartheadersize, 0)
471
471
472 def _paramchunk(self):
472 def _paramchunk(self):
473 """return a encoded version of all stream parameters"""
473 """return a encoded version of all stream parameters"""
474 blocks = []
474 blocks = []
475 for par, value in self._params:
475 for par, value in self._params:
476 par = urllib.quote(par)
476 par = urllib.quote(par)
477 if value is not None:
477 if value is not None:
478 value = urllib.quote(value)
478 value = urllib.quote(value)
479 par = '%s=%s' % (par, value)
479 par = '%s=%s' % (par, value)
480 blocks.append(par)
480 blocks.append(par)
481 return ' '.join(blocks)
481 return ' '.join(blocks)
482
482
483 class unpackermixin(object):
483 class unpackermixin(object):
484 """A mixin to extract bytes and struct data from a stream"""
484 """A mixin to extract bytes and struct data from a stream"""
485
485
486 def __init__(self, fp):
486 def __init__(self, fp):
487 self._fp = fp
487 self._fp = fp
488 self._seekable = (util.safehasattr(fp, 'seek') and
488 self._seekable = (util.safehasattr(fp, 'seek') and
489 util.safehasattr(fp, 'tell'))
489 util.safehasattr(fp, 'tell'))
490
490
491 def _unpack(self, format):
491 def _unpack(self, format):
492 """unpack this struct format from the stream"""
492 """unpack this struct format from the stream"""
493 data = self._readexact(struct.calcsize(format))
493 data = self._readexact(struct.calcsize(format))
494 return _unpack(format, data)
494 return _unpack(format, data)
495
495
496 def _readexact(self, size):
496 def _readexact(self, size):
497 """read exactly <size> bytes from the stream"""
497 """read exactly <size> bytes from the stream"""
498 return changegroup.readexactly(self._fp, size)
498 return changegroup.readexactly(self._fp, size)
499
499
500 def seek(self, offset, whence):
500 def seek(self, offset, whence=0):
501 """move the underlying file pointer"""
501 """move the underlying file pointer"""
502 if self._seekable:
502 if self._seekable:
503 return self._fp.seek(offset, whence)
503 return self._fp.seek(offset, whence)
504 else:
504 else:
505 raise NotImplementedError(_('File pointer is not seekable'))
505 raise NotImplementedError(_('File pointer is not seekable'))
506
506
507 def tell(self):
507 def tell(self):
508 """return the file offset, or None if file is not seekable"""
508 """return the file offset, or None if file is not seekable"""
509 if self._seekable:
509 if self._seekable:
510 try:
510 try:
511 return self._fp.tell()
511 return self._fp.tell()
512 except IOError, e:
512 except IOError, e:
513 if e.errno == errno.ESPIPE:
513 if e.errno == errno.ESPIPE:
514 self._seekable = False
514 self._seekable = False
515 else:
515 else:
516 raise
516 raise
517 return None
517 return None
518
518
519 def close(self):
519 def close(self):
520 """close underlying file"""
520 """close underlying file"""
521 if util.safehasattr(self._fp, 'close'):
521 if util.safehasattr(self._fp, 'close'):
522 return self._fp.close()
522 return self._fp.close()
523
523
524 class unbundle20(unpackermixin):
524 class unbundle20(unpackermixin):
525 """interpret a bundle2 stream
525 """interpret a bundle2 stream
526
526
527 This class is fed with a binary stream and yields parts through its
527 This class is fed with a binary stream and yields parts through its
528 `iterparts` methods."""
528 `iterparts` methods."""
529
529
530 def __init__(self, ui, fp, header=None):
530 def __init__(self, ui, fp, header=None):
531 """If header is specified, we do not read it out of the stream."""
531 """If header is specified, we do not read it out of the stream."""
532 self.ui = ui
532 self.ui = ui
533 super(unbundle20, self).__init__(fp)
533 super(unbundle20, self).__init__(fp)
534 if header is None:
534 if header is None:
535 header = self._readexact(4)
535 header = self._readexact(4)
536 magic, version = header[0:2], header[2:4]
536 magic, version = header[0:2], header[2:4]
537 if magic != 'HG':
537 if magic != 'HG':
538 raise util.Abort(_('not a Mercurial bundle'))
538 raise util.Abort(_('not a Mercurial bundle'))
539 if version != '2Y':
539 if version != '2Y':
540 raise util.Abort(_('unknown bundle version %s') % version)
540 raise util.Abort(_('unknown bundle version %s') % version)
541 self.ui.debug('start processing of %s stream\n' % header)
541 self.ui.debug('start processing of %s stream\n' % header)
542
542
543 @util.propertycache
543 @util.propertycache
544 def params(self):
544 def params(self):
545 """dictionary of stream level parameters"""
545 """dictionary of stream level parameters"""
546 self.ui.debug('reading bundle2 stream parameters\n')
546 self.ui.debug('reading bundle2 stream parameters\n')
547 params = {}
547 params = {}
548 paramssize = self._unpack(_fstreamparamsize)[0]
548 paramssize = self._unpack(_fstreamparamsize)[0]
549 if paramssize < 0:
549 if paramssize < 0:
550 raise error.BundleValueError('negative bundle param size: %i'
550 raise error.BundleValueError('negative bundle param size: %i'
551 % paramssize)
551 % paramssize)
552 if paramssize:
552 if paramssize:
553 for p in self._readexact(paramssize).split(' '):
553 for p in self._readexact(paramssize).split(' '):
554 p = p.split('=', 1)
554 p = p.split('=', 1)
555 p = [urllib.unquote(i) for i in p]
555 p = [urllib.unquote(i) for i in p]
556 if len(p) < 2:
556 if len(p) < 2:
557 p.append(None)
557 p.append(None)
558 self._processparam(*p)
558 self._processparam(*p)
559 params[p[0]] = p[1]
559 params[p[0]] = p[1]
560 return params
560 return params
561
561
562 def _processparam(self, name, value):
562 def _processparam(self, name, value):
563 """process a parameter, applying its effect if needed
563 """process a parameter, applying its effect if needed
564
564
565 Parameter starting with a lower case letter are advisory and will be
565 Parameter starting with a lower case letter are advisory and will be
566 ignored when unknown. Those starting with an upper case letter are
566 ignored when unknown. Those starting with an upper case letter are
567 mandatory and will this function will raise a KeyError when unknown.
567 mandatory and will this function will raise a KeyError when unknown.
568
568
569 Note: no option are currently supported. Any input will be either
569 Note: no option are currently supported. Any input will be either
570 ignored or failing.
570 ignored or failing.
571 """
571 """
572 if not name:
572 if not name:
573 raise ValueError('empty parameter name')
573 raise ValueError('empty parameter name')
574 if name[0] not in string.letters:
574 if name[0] not in string.letters:
575 raise ValueError('non letter first character: %r' % name)
575 raise ValueError('non letter first character: %r' % name)
576 # Some logic will be later added here to try to process the option for
576 # Some logic will be later added here to try to process the option for
577 # a dict of known parameter.
577 # a dict of known parameter.
578 if name[0].islower():
578 if name[0].islower():
579 self.ui.debug("ignoring unknown parameter %r\n" % name)
579 self.ui.debug("ignoring unknown parameter %r\n" % name)
580 else:
580 else:
581 raise error.UnsupportedPartError(params=(name,))
581 raise error.UnsupportedPartError(params=(name,))
582
582
583
583
584 def iterparts(self):
584 def iterparts(self):
585 """yield all parts contained in the stream"""
585 """yield all parts contained in the stream"""
586 # make sure param have been loaded
586 # make sure param have been loaded
587 self.params
587 self.params
588 self.ui.debug('start extraction of bundle2 parts\n')
588 self.ui.debug('start extraction of bundle2 parts\n')
589 headerblock = self._readpartheader()
589 headerblock = self._readpartheader()
590 while headerblock is not None:
590 while headerblock is not None:
591 part = unbundlepart(self.ui, headerblock, self._fp)
591 part = unbundlepart(self.ui, headerblock, self._fp)
592 yield part
592 yield part
593 part.seek(0, 2)
593 part.seek(0, 2)
594 headerblock = self._readpartheader()
594 headerblock = self._readpartheader()
595 self.ui.debug('end of bundle2 stream\n')
595 self.ui.debug('end of bundle2 stream\n')
596
596
597 def _readpartheader(self):
597 def _readpartheader(self):
598 """reads a part header size and return the bytes blob
598 """reads a part header size and return the bytes blob
599
599
600 returns None if empty"""
600 returns None if empty"""
601 headersize = self._unpack(_fpartheadersize)[0]
601 headersize = self._unpack(_fpartheadersize)[0]
602 if headersize < 0:
602 if headersize < 0:
603 raise error.BundleValueError('negative part header size: %i'
603 raise error.BundleValueError('negative part header size: %i'
604 % headersize)
604 % headersize)
605 self.ui.debug('part header size: %i\n' % headersize)
605 self.ui.debug('part header size: %i\n' % headersize)
606 if headersize:
606 if headersize:
607 return self._readexact(headersize)
607 return self._readexact(headersize)
608 return None
608 return None
609
609
610
610
611 class bundlepart(object):
611 class bundlepart(object):
612 """A bundle2 part contains application level payload
612 """A bundle2 part contains application level payload
613
613
614 The part `type` is used to route the part to the application level
614 The part `type` is used to route the part to the application level
615 handler.
615 handler.
616
616
617 The part payload is contained in ``part.data``. It could be raw bytes or a
617 The part payload is contained in ``part.data``. It could be raw bytes or a
618 generator of byte chunks.
618 generator of byte chunks.
619
619
620 You can add parameters to the part using the ``addparam`` method.
620 You can add parameters to the part using the ``addparam`` method.
621 Parameters can be either mandatory (default) or advisory. Remote side
621 Parameters can be either mandatory (default) or advisory. Remote side
622 should be able to safely ignore the advisory ones.
622 should be able to safely ignore the advisory ones.
623
623
624 Both data and parameters cannot be modified after the generation has begun.
624 Both data and parameters cannot be modified after the generation has begun.
625 """
625 """
626
626
627 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
627 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
628 data='', mandatory=True):
628 data='', mandatory=True):
629 validateparttype(parttype)
629 validateparttype(parttype)
630 self.id = None
630 self.id = None
631 self.type = parttype
631 self.type = parttype
632 self._data = data
632 self._data = data
633 self._mandatoryparams = list(mandatoryparams)
633 self._mandatoryparams = list(mandatoryparams)
634 self._advisoryparams = list(advisoryparams)
634 self._advisoryparams = list(advisoryparams)
635 # checking for duplicated entries
635 # checking for duplicated entries
636 self._seenparams = set()
636 self._seenparams = set()
637 for pname, __ in self._mandatoryparams + self._advisoryparams:
637 for pname, __ in self._mandatoryparams + self._advisoryparams:
638 if pname in self._seenparams:
638 if pname in self._seenparams:
639 raise RuntimeError('duplicated params: %s' % pname)
639 raise RuntimeError('duplicated params: %s' % pname)
640 self._seenparams.add(pname)
640 self._seenparams.add(pname)
641 # status of the part's generation:
641 # status of the part's generation:
642 # - None: not started,
642 # - None: not started,
643 # - False: currently generated,
643 # - False: currently generated,
644 # - True: generation done.
644 # - True: generation done.
645 self._generated = None
645 self._generated = None
646 self.mandatory = mandatory
646 self.mandatory = mandatory
647
647
648 # methods used to defines the part content
648 # methods used to defines the part content
649 def __setdata(self, data):
649 def __setdata(self, data):
650 if self._generated is not None:
650 if self._generated is not None:
651 raise error.ReadOnlyPartError('part is being generated')
651 raise error.ReadOnlyPartError('part is being generated')
652 self._data = data
652 self._data = data
653 def __getdata(self):
653 def __getdata(self):
654 return self._data
654 return self._data
655 data = property(__getdata, __setdata)
655 data = property(__getdata, __setdata)
656
656
657 @property
657 @property
658 def mandatoryparams(self):
658 def mandatoryparams(self):
659 # make it an immutable tuple to force people through ``addparam``
659 # make it an immutable tuple to force people through ``addparam``
660 return tuple(self._mandatoryparams)
660 return tuple(self._mandatoryparams)
661
661
662 @property
662 @property
663 def advisoryparams(self):
663 def advisoryparams(self):
664 # make it an immutable tuple to force people through ``addparam``
664 # make it an immutable tuple to force people through ``addparam``
665 return tuple(self._advisoryparams)
665 return tuple(self._advisoryparams)
666
666
667 def addparam(self, name, value='', mandatory=True):
667 def addparam(self, name, value='', mandatory=True):
668 if self._generated is not None:
668 if self._generated is not None:
669 raise error.ReadOnlyPartError('part is being generated')
669 raise error.ReadOnlyPartError('part is being generated')
670 if name in self._seenparams:
670 if name in self._seenparams:
671 raise ValueError('duplicated params: %s' % name)
671 raise ValueError('duplicated params: %s' % name)
672 self._seenparams.add(name)
672 self._seenparams.add(name)
673 params = self._advisoryparams
673 params = self._advisoryparams
674 if mandatory:
674 if mandatory:
675 params = self._mandatoryparams
675 params = self._mandatoryparams
676 params.append((name, value))
676 params.append((name, value))
677
677
678 # methods used to generates the bundle2 stream
678 # methods used to generates the bundle2 stream
679 def getchunks(self):
679 def getchunks(self):
680 if self._generated is not None:
680 if self._generated is not None:
681 raise RuntimeError('part can only be consumed once')
681 raise RuntimeError('part can only be consumed once')
682 self._generated = False
682 self._generated = False
683 #### header
683 #### header
684 if self.mandatory:
684 if self.mandatory:
685 parttype = self.type.upper()
685 parttype = self.type.upper()
686 else:
686 else:
687 parttype = self.type.lower()
687 parttype = self.type.lower()
688 ## parttype
688 ## parttype
689 header = [_pack(_fparttypesize, len(parttype)),
689 header = [_pack(_fparttypesize, len(parttype)),
690 parttype, _pack(_fpartid, self.id),
690 parttype, _pack(_fpartid, self.id),
691 ]
691 ]
692 ## parameters
692 ## parameters
693 # count
693 # count
694 manpar = self.mandatoryparams
694 manpar = self.mandatoryparams
695 advpar = self.advisoryparams
695 advpar = self.advisoryparams
696 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
696 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
697 # size
697 # size
698 parsizes = []
698 parsizes = []
699 for key, value in manpar:
699 for key, value in manpar:
700 parsizes.append(len(key))
700 parsizes.append(len(key))
701 parsizes.append(len(value))
701 parsizes.append(len(value))
702 for key, value in advpar:
702 for key, value in advpar:
703 parsizes.append(len(key))
703 parsizes.append(len(key))
704 parsizes.append(len(value))
704 parsizes.append(len(value))
705 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
705 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
706 header.append(paramsizes)
706 header.append(paramsizes)
707 # key, value
707 # key, value
708 for key, value in manpar:
708 for key, value in manpar:
709 header.append(key)
709 header.append(key)
710 header.append(value)
710 header.append(value)
711 for key, value in advpar:
711 for key, value in advpar:
712 header.append(key)
712 header.append(key)
713 header.append(value)
713 header.append(value)
714 ## finalize header
714 ## finalize header
715 headerchunk = ''.join(header)
715 headerchunk = ''.join(header)
716 yield _pack(_fpartheadersize, len(headerchunk))
716 yield _pack(_fpartheadersize, len(headerchunk))
717 yield headerchunk
717 yield headerchunk
718 ## payload
718 ## payload
719 try:
719 try:
720 for chunk in self._payloadchunks():
720 for chunk in self._payloadchunks():
721 yield _pack(_fpayloadsize, len(chunk))
721 yield _pack(_fpayloadsize, len(chunk))
722 yield chunk
722 yield chunk
723 except Exception, exc:
723 except Exception, exc:
724 # backup exception data for later
724 # backup exception data for later
725 exc_info = sys.exc_info()
725 exc_info = sys.exc_info()
726 msg = 'unexpected error: %s' % exc
726 msg = 'unexpected error: %s' % exc
727 interpart = bundlepart('b2x:error:abort', [('message', msg)],
727 interpart = bundlepart('b2x:error:abort', [('message', msg)],
728 mandatory=False)
728 mandatory=False)
729 interpart.id = 0
729 interpart.id = 0
730 yield _pack(_fpayloadsize, -1)
730 yield _pack(_fpayloadsize, -1)
731 for chunk in interpart.getchunks():
731 for chunk in interpart.getchunks():
732 yield chunk
732 yield chunk
733 # abort current part payload
733 # abort current part payload
734 yield _pack(_fpayloadsize, 0)
734 yield _pack(_fpayloadsize, 0)
735 raise exc_info[0], exc_info[1], exc_info[2]
735 raise exc_info[0], exc_info[1], exc_info[2]
736 # end of payload
736 # end of payload
737 yield _pack(_fpayloadsize, 0)
737 yield _pack(_fpayloadsize, 0)
738 self._generated = True
738 self._generated = True
739
739
740 def _payloadchunks(self):
740 def _payloadchunks(self):
741 """yield chunks of a the part payload
741 """yield chunks of a the part payload
742
742
743 Exists to handle the different methods to provide data to a part."""
743 Exists to handle the different methods to provide data to a part."""
744 # we only support fixed size data now.
744 # we only support fixed size data now.
745 # This will be improved in the future.
745 # This will be improved in the future.
746 if util.safehasattr(self.data, 'next'):
746 if util.safehasattr(self.data, 'next'):
747 buff = util.chunkbuffer(self.data)
747 buff = util.chunkbuffer(self.data)
748 chunk = buff.read(preferedchunksize)
748 chunk = buff.read(preferedchunksize)
749 while chunk:
749 while chunk:
750 yield chunk
750 yield chunk
751 chunk = buff.read(preferedchunksize)
751 chunk = buff.read(preferedchunksize)
752 elif len(self.data):
752 elif len(self.data):
753 yield self.data
753 yield self.data
754
754
755
755
756 flaginterrupt = -1
756 flaginterrupt = -1
757
757
758 class interrupthandler(unpackermixin):
758 class interrupthandler(unpackermixin):
759 """read one part and process it with restricted capability
759 """read one part and process it with restricted capability
760
760
761 This allows to transmit exception raised on the producer size during part
761 This allows to transmit exception raised on the producer size during part
762 iteration while the consumer is reading a part.
762 iteration while the consumer is reading a part.
763
763
764 Part processed in this manner only have access to a ui object,"""
764 Part processed in this manner only have access to a ui object,"""
765
765
766 def __init__(self, ui, fp):
766 def __init__(self, ui, fp):
767 super(interrupthandler, self).__init__(fp)
767 super(interrupthandler, self).__init__(fp)
768 self.ui = ui
768 self.ui = ui
769
769
770 def _readpartheader(self):
770 def _readpartheader(self):
771 """reads a part header size and return the bytes blob
771 """reads a part header size and return the bytes blob
772
772
773 returns None if empty"""
773 returns None if empty"""
774 headersize = self._unpack(_fpartheadersize)[0]
774 headersize = self._unpack(_fpartheadersize)[0]
775 if headersize < 0:
775 if headersize < 0:
776 raise error.BundleValueError('negative part header size: %i'
776 raise error.BundleValueError('negative part header size: %i'
777 % headersize)
777 % headersize)
778 self.ui.debug('part header size: %i\n' % headersize)
778 self.ui.debug('part header size: %i\n' % headersize)
779 if headersize:
779 if headersize:
780 return self._readexact(headersize)
780 return self._readexact(headersize)
781 return None
781 return None
782
782
783 def __call__(self):
783 def __call__(self):
784 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
784 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
785 headerblock = self._readpartheader()
785 headerblock = self._readpartheader()
786 if headerblock is None:
786 if headerblock is None:
787 self.ui.debug('no part found during interruption.\n')
787 self.ui.debug('no part found during interruption.\n')
788 return
788 return
789 part = unbundlepart(self.ui, headerblock, self._fp)
789 part = unbundlepart(self.ui, headerblock, self._fp)
790 op = interruptoperation(self.ui)
790 op = interruptoperation(self.ui)
791 _processpart(op, part)
791 _processpart(op, part)
792
792
793 class interruptoperation(object):
793 class interruptoperation(object):
794 """A limited operation to be use by part handler during interruption
794 """A limited operation to be use by part handler during interruption
795
795
796 It only have access to an ui object.
796 It only have access to an ui object.
797 """
797 """
798
798
799 def __init__(self, ui):
799 def __init__(self, ui):
800 self.ui = ui
800 self.ui = ui
801 self.reply = None
801 self.reply = None
802
802
803 @property
803 @property
804 def repo(self):
804 def repo(self):
805 raise RuntimeError('no repo access from stream interruption')
805 raise RuntimeError('no repo access from stream interruption')
806
806
807 def gettransaction(self):
807 def gettransaction(self):
808 raise TransactionUnavailable('no repo access from stream interruption')
808 raise TransactionUnavailable('no repo access from stream interruption')
809
809
810 class unbundlepart(unpackermixin):
810 class unbundlepart(unpackermixin):
811 """a bundle part read from a bundle"""
811 """a bundle part read from a bundle"""
812
812
813 def __init__(self, ui, header, fp):
813 def __init__(self, ui, header, fp):
814 super(unbundlepart, self).__init__(fp)
814 super(unbundlepart, self).__init__(fp)
815 self.ui = ui
815 self.ui = ui
816 # unbundle state attr
816 # unbundle state attr
817 self._headerdata = header
817 self._headerdata = header
818 self._headeroffset = 0
818 self._headeroffset = 0
819 self._initialized = False
819 self._initialized = False
820 self.consumed = False
820 self.consumed = False
821 # part data
821 # part data
822 self.id = None
822 self.id = None
823 self.type = None
823 self.type = None
824 self.mandatoryparams = None
824 self.mandatoryparams = None
825 self.advisoryparams = None
825 self.advisoryparams = None
826 self.params = None
826 self.params = None
827 self.mandatorykeys = ()
827 self.mandatorykeys = ()
828 self._payloadstream = None
828 self._payloadstream = None
829 self._readheader()
829 self._readheader()
830 self._mandatory = None
830 self._mandatory = None
831 self._chunkindex = [] #(payload, file) position tuples for chunk starts
831 self._chunkindex = [] #(payload, file) position tuples for chunk starts
832 self._pos = 0
832 self._pos = 0
833
833
834 def _fromheader(self, size):
834 def _fromheader(self, size):
835 """return the next <size> byte from the header"""
835 """return the next <size> byte from the header"""
836 offset = self._headeroffset
836 offset = self._headeroffset
837 data = self._headerdata[offset:(offset + size)]
837 data = self._headerdata[offset:(offset + size)]
838 self._headeroffset = offset + size
838 self._headeroffset = offset + size
839 return data
839 return data
840
840
841 def _unpackheader(self, format):
841 def _unpackheader(self, format):
842 """read given format from header
842 """read given format from header
843
843
844 This automatically compute the size of the format to read."""
844 This automatically compute the size of the format to read."""
845 data = self._fromheader(struct.calcsize(format))
845 data = self._fromheader(struct.calcsize(format))
846 return _unpack(format, data)
846 return _unpack(format, data)
847
847
848 def _initparams(self, mandatoryparams, advisoryparams):
848 def _initparams(self, mandatoryparams, advisoryparams):
849 """internal function to setup all logic related parameters"""
849 """internal function to setup all logic related parameters"""
850 # make it read only to prevent people touching it by mistake.
850 # make it read only to prevent people touching it by mistake.
851 self.mandatoryparams = tuple(mandatoryparams)
851 self.mandatoryparams = tuple(mandatoryparams)
852 self.advisoryparams = tuple(advisoryparams)
852 self.advisoryparams = tuple(advisoryparams)
853 # user friendly UI
853 # user friendly UI
854 self.params = dict(self.mandatoryparams)
854 self.params = dict(self.mandatoryparams)
855 self.params.update(dict(self.advisoryparams))
855 self.params.update(dict(self.advisoryparams))
856 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
856 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
857
857
858 def _payloadchunks(self, chunknum=0):
858 def _payloadchunks(self, chunknum=0):
859 '''seek to specified chunk and start yielding data'''
859 '''seek to specified chunk and start yielding data'''
860 if len(self._chunkindex) == 0:
860 if len(self._chunkindex) == 0:
861 assert chunknum == 0, 'Must start with chunk 0'
861 assert chunknum == 0, 'Must start with chunk 0'
862 self._chunkindex.append((0, super(unbundlepart, self).tell()))
862 self._chunkindex.append((0, super(unbundlepart, self).tell()))
863 else:
863 else:
864 assert chunknum < len(self._chunkindex), \
864 assert chunknum < len(self._chunkindex), \
865 'Unknown chunk %d' % chunknum
865 'Unknown chunk %d' % chunknum
866 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
866 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
867
867
868 pos = self._chunkindex[chunknum][0]
868 pos = self._chunkindex[chunknum][0]
869 payloadsize = self._unpack(_fpayloadsize)[0]
869 payloadsize = self._unpack(_fpayloadsize)[0]
870 self.ui.debug('payload chunk size: %i\n' % payloadsize)
870 self.ui.debug('payload chunk size: %i\n' % payloadsize)
871 while payloadsize:
871 while payloadsize:
872 if payloadsize == flaginterrupt:
872 if payloadsize == flaginterrupt:
873 # interruption detection, the handler will now read a
873 # interruption detection, the handler will now read a
874 # single part and process it.
874 # single part and process it.
875 interrupthandler(self.ui, self._fp)()
875 interrupthandler(self.ui, self._fp)()
876 elif payloadsize < 0:
876 elif payloadsize < 0:
877 msg = 'negative payload chunk size: %i' % payloadsize
877 msg = 'negative payload chunk size: %i' % payloadsize
878 raise error.BundleValueError(msg)
878 raise error.BundleValueError(msg)
879 else:
879 else:
880 result = self._readexact(payloadsize)
880 result = self._readexact(payloadsize)
881 chunknum += 1
881 chunknum += 1
882 pos += payloadsize
882 pos += payloadsize
883 if chunknum == len(self._chunkindex):
883 if chunknum == len(self._chunkindex):
884 self._chunkindex.append((pos,
884 self._chunkindex.append((pos,
885 super(unbundlepart, self).tell()))
885 super(unbundlepart, self).tell()))
886 yield result
886 yield result
887 payloadsize = self._unpack(_fpayloadsize)[0]
887 payloadsize = self._unpack(_fpayloadsize)[0]
888 self.ui.debug('payload chunk size: %i\n' % payloadsize)
888 self.ui.debug('payload chunk size: %i\n' % payloadsize)
889
889
890 def _findchunk(self, pos):
890 def _findchunk(self, pos):
891 '''for a given payload position, return a chunk number and offset'''
891 '''for a given payload position, return a chunk number and offset'''
892 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
892 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
893 if ppos == pos:
893 if ppos == pos:
894 return chunk, 0
894 return chunk, 0
895 elif ppos > pos:
895 elif ppos > pos:
896 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
896 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
897 raise ValueError('Unknown chunk')
897 raise ValueError('Unknown chunk')
898
898
899 def _readheader(self):
899 def _readheader(self):
900 """read the header and setup the object"""
900 """read the header and setup the object"""
901 typesize = self._unpackheader(_fparttypesize)[0]
901 typesize = self._unpackheader(_fparttypesize)[0]
902 self.type = self._fromheader(typesize)
902 self.type = self._fromheader(typesize)
903 self.ui.debug('part type: "%s"\n' % self.type)
903 self.ui.debug('part type: "%s"\n' % self.type)
904 self.id = self._unpackheader(_fpartid)[0]
904 self.id = self._unpackheader(_fpartid)[0]
905 self.ui.debug('part id: "%s"\n' % self.id)
905 self.ui.debug('part id: "%s"\n' % self.id)
906 # extract mandatory bit from type
906 # extract mandatory bit from type
907 self.mandatory = (self.type != self.type.lower())
907 self.mandatory = (self.type != self.type.lower())
908 self.type = self.type.lower()
908 self.type = self.type.lower()
909 ## reading parameters
909 ## reading parameters
910 # param count
910 # param count
911 mancount, advcount = self._unpackheader(_fpartparamcount)
911 mancount, advcount = self._unpackheader(_fpartparamcount)
912 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
912 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
913 # param size
913 # param size
914 fparamsizes = _makefpartparamsizes(mancount + advcount)
914 fparamsizes = _makefpartparamsizes(mancount + advcount)
915 paramsizes = self._unpackheader(fparamsizes)
915 paramsizes = self._unpackheader(fparamsizes)
916 # make it a list of couple again
916 # make it a list of couple again
917 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
917 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
918 # split mandatory from advisory
918 # split mandatory from advisory
919 mansizes = paramsizes[:mancount]
919 mansizes = paramsizes[:mancount]
920 advsizes = paramsizes[mancount:]
920 advsizes = paramsizes[mancount:]
921 # retrieve param value
921 # retrieve param value
922 manparams = []
922 manparams = []
923 for key, value in mansizes:
923 for key, value in mansizes:
924 manparams.append((self._fromheader(key), self._fromheader(value)))
924 manparams.append((self._fromheader(key), self._fromheader(value)))
925 advparams = []
925 advparams = []
926 for key, value in advsizes:
926 for key, value in advsizes:
927 advparams.append((self._fromheader(key), self._fromheader(value)))
927 advparams.append((self._fromheader(key), self._fromheader(value)))
928 self._initparams(manparams, advparams)
928 self._initparams(manparams, advparams)
929 ## part payload
929 ## part payload
930 self._payloadstream = util.chunkbuffer(self._payloadchunks())
930 self._payloadstream = util.chunkbuffer(self._payloadchunks())
931 # we read the data, tell it
931 # we read the data, tell it
932 self._initialized = True
932 self._initialized = True
933
933
934 def read(self, size=None):
934 def read(self, size=None):
935 """read payload data"""
935 """read payload data"""
936 if not self._initialized:
936 if not self._initialized:
937 self._readheader()
937 self._readheader()
938 if size is None:
938 if size is None:
939 data = self._payloadstream.read()
939 data = self._payloadstream.read()
940 else:
940 else:
941 data = self._payloadstream.read(size)
941 data = self._payloadstream.read(size)
942 if size is None or len(data) < size:
942 if size is None or len(data) < size:
943 self.consumed = True
943 self.consumed = True
944 self._pos += len(data)
944 self._pos += len(data)
945 return data
945 return data
946
946
947 def tell(self):
947 def tell(self):
948 return self._pos
948 return self._pos
949
949
950 def seek(self, offset, whence=0):
950 def seek(self, offset, whence=0):
951 if whence == 0:
951 if whence == 0:
952 newpos = offset
952 newpos = offset
953 elif whence == 1:
953 elif whence == 1:
954 newpos = self._pos + offset
954 newpos = self._pos + offset
955 elif whence == 2:
955 elif whence == 2:
956 if not self.consumed:
956 if not self.consumed:
957 self.read()
957 self.read()
958 newpos = self._chunkindex[-1][0] - offset
958 newpos = self._chunkindex[-1][0] - offset
959 else:
959 else:
960 raise ValueError('Unknown whence value: %r' % (whence,))
960 raise ValueError('Unknown whence value: %r' % (whence,))
961
961
962 if newpos > self._chunkindex[-1][0] and not self.consumed:
962 if newpos > self._chunkindex[-1][0] and not self.consumed:
963 self.read()
963 self.read()
964 if not 0 <= newpos <= self._chunkindex[-1][0]:
964 if not 0 <= newpos <= self._chunkindex[-1][0]:
965 raise ValueError('Offset out of range')
965 raise ValueError('Offset out of range')
966
966
967 if self._pos != newpos:
967 if self._pos != newpos:
968 chunk, internaloffset = self._findchunk(newpos)
968 chunk, internaloffset = self._findchunk(newpos)
969 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
969 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
970 adjust = self.read(internaloffset)
970 adjust = self.read(internaloffset)
971 if len(adjust) != internaloffset:
971 if len(adjust) != internaloffset:
972 raise util.Abort(_('Seek failed\n'))
972 raise util.Abort(_('Seek failed\n'))
973 self._pos = newpos
973 self._pos = newpos
974
974
975 capabilities = {'HG2Y': (),
975 capabilities = {'HG2Y': (),
976 'b2x:listkeys': (),
976 'b2x:listkeys': (),
977 'b2x:pushkey': (),
977 'b2x:pushkey': (),
978 'digests': tuple(sorted(util.DIGESTS.keys())),
978 'digests': tuple(sorted(util.DIGESTS.keys())),
979 'b2x:remote-changegroup': ('http', 'https'),
979 'b2x:remote-changegroup': ('http', 'https'),
980 }
980 }
981
981
982 def getrepocaps(repo, allowpushback=False):
982 def getrepocaps(repo, allowpushback=False):
983 """return the bundle2 capabilities for a given repo
983 """return the bundle2 capabilities for a given repo
984
984
985 Exists to allow extensions (like evolution) to mutate the capabilities.
985 Exists to allow extensions (like evolution) to mutate the capabilities.
986 """
986 """
987 caps = capabilities.copy()
987 caps = capabilities.copy()
988 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
988 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
989 if obsolete.isenabled(repo, obsolete.exchangeopt):
989 if obsolete.isenabled(repo, obsolete.exchangeopt):
990 supportedformat = tuple('V%i' % v for v in obsolete.formats)
990 supportedformat = tuple('V%i' % v for v in obsolete.formats)
991 caps['b2x:obsmarkers'] = supportedformat
991 caps['b2x:obsmarkers'] = supportedformat
992 if allowpushback:
992 if allowpushback:
993 caps['b2x:pushback'] = ()
993 caps['b2x:pushback'] = ()
994 return caps
994 return caps
995
995
996 def bundle2caps(remote):
996 def bundle2caps(remote):
997 """return the bundle capabilities of a peer as dict"""
997 """return the bundle capabilities of a peer as dict"""
998 raw = remote.capable('bundle2-exp')
998 raw = remote.capable('bundle2-exp')
999 if not raw and raw != '':
999 if not raw and raw != '':
1000 return {}
1000 return {}
1001 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1001 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1002 return decodecaps(capsblob)
1002 return decodecaps(capsblob)
1003
1003
1004 def obsmarkersversion(caps):
1004 def obsmarkersversion(caps):
1005 """extract the list of supported obsmarkers versions from a bundle2caps dict
1005 """extract the list of supported obsmarkers versions from a bundle2caps dict
1006 """
1006 """
1007 obscaps = caps.get('b2x:obsmarkers', ())
1007 obscaps = caps.get('b2x:obsmarkers', ())
1008 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1008 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1009
1009
1010 @parthandler('b2x:changegroup', ('version',))
1010 @parthandler('b2x:changegroup', ('version',))
1011 def handlechangegroup(op, inpart):
1011 def handlechangegroup(op, inpart):
1012 """apply a changegroup part on the repo
1012 """apply a changegroup part on the repo
1013
1013
1014 This is a very early implementation that will massive rework before being
1014 This is a very early implementation that will massive rework before being
1015 inflicted to any end-user.
1015 inflicted to any end-user.
1016 """
1016 """
1017 # Make sure we trigger a transaction creation
1017 # Make sure we trigger a transaction creation
1018 #
1018 #
1019 # The addchangegroup function will get a transaction object by itself, but
1019 # The addchangegroup function will get a transaction object by itself, but
1020 # we need to make sure we trigger the creation of a transaction object used
1020 # we need to make sure we trigger the creation of a transaction object used
1021 # for the whole processing scope.
1021 # for the whole processing scope.
1022 op.gettransaction()
1022 op.gettransaction()
1023 unpackerversion = inpart.params.get('version', '01')
1023 unpackerversion = inpart.params.get('version', '01')
1024 # We should raise an appropriate exception here
1024 # We should raise an appropriate exception here
1025 unpacker = changegroup.packermap[unpackerversion][1]
1025 unpacker = changegroup.packermap[unpackerversion][1]
1026 cg = unpacker(inpart, 'UN')
1026 cg = unpacker(inpart, 'UN')
1027 # the source and url passed here are overwritten by the one contained in
1027 # the source and url passed here are overwritten by the one contained in
1028 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1028 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1029 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1029 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1030 op.records.add('changegroup', {'return': ret})
1030 op.records.add('changegroup', {'return': ret})
1031 if op.reply is not None:
1031 if op.reply is not None:
1032 # This is definitely not the final form of this
1032 # This is definitely not the final form of this
1033 # return. But one need to start somewhere.
1033 # return. But one need to start somewhere.
1034 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1034 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1035 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1035 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1036 part.addparam('return', '%i' % ret, mandatory=False)
1036 part.addparam('return', '%i' % ret, mandatory=False)
1037 assert not inpart.read()
1037 assert not inpart.read()
1038
1038
1039 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1039 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1040 ['digest:%s' % k for k in util.DIGESTS.keys()])
1040 ['digest:%s' % k for k in util.DIGESTS.keys()])
1041 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1041 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1042 def handleremotechangegroup(op, inpart):
1042 def handleremotechangegroup(op, inpart):
1043 """apply a bundle10 on the repo, given an url and validation information
1043 """apply a bundle10 on the repo, given an url and validation information
1044
1044
1045 All the information about the remote bundle to import are given as
1045 All the information about the remote bundle to import are given as
1046 parameters. The parameters include:
1046 parameters. The parameters include:
1047 - url: the url to the bundle10.
1047 - url: the url to the bundle10.
1048 - size: the bundle10 file size. It is used to validate what was
1048 - size: the bundle10 file size. It is used to validate what was
1049 retrieved by the client matches the server knowledge about the bundle.
1049 retrieved by the client matches the server knowledge about the bundle.
1050 - digests: a space separated list of the digest types provided as
1050 - digests: a space separated list of the digest types provided as
1051 parameters.
1051 parameters.
1052 - digest:<digest-type>: the hexadecimal representation of the digest with
1052 - digest:<digest-type>: the hexadecimal representation of the digest with
1053 that name. Like the size, it is used to validate what was retrieved by
1053 that name. Like the size, it is used to validate what was retrieved by
1054 the client matches what the server knows about the bundle.
1054 the client matches what the server knows about the bundle.
1055
1055
1056 When multiple digest types are given, all of them are checked.
1056 When multiple digest types are given, all of them are checked.
1057 """
1057 """
1058 try:
1058 try:
1059 raw_url = inpart.params['url']
1059 raw_url = inpart.params['url']
1060 except KeyError:
1060 except KeyError:
1061 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1061 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1062 parsed_url = util.url(raw_url)
1062 parsed_url = util.url(raw_url)
1063 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1063 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1064 raise util.Abort(_('remote-changegroup does not support %s urls') %
1064 raise util.Abort(_('remote-changegroup does not support %s urls') %
1065 parsed_url.scheme)
1065 parsed_url.scheme)
1066
1066
1067 try:
1067 try:
1068 size = int(inpart.params['size'])
1068 size = int(inpart.params['size'])
1069 except ValueError:
1069 except ValueError:
1070 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1070 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1071 % 'size')
1071 % 'size')
1072 except KeyError:
1072 except KeyError:
1073 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1073 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1074
1074
1075 digests = {}
1075 digests = {}
1076 for typ in inpart.params.get('digests', '').split():
1076 for typ in inpart.params.get('digests', '').split():
1077 param = 'digest:%s' % typ
1077 param = 'digest:%s' % typ
1078 try:
1078 try:
1079 value = inpart.params[param]
1079 value = inpart.params[param]
1080 except KeyError:
1080 except KeyError:
1081 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1081 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1082 param)
1082 param)
1083 digests[typ] = value
1083 digests[typ] = value
1084
1084
1085 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1085 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1086
1086
1087 # Make sure we trigger a transaction creation
1087 # Make sure we trigger a transaction creation
1088 #
1088 #
1089 # The addchangegroup function will get a transaction object by itself, but
1089 # The addchangegroup function will get a transaction object by itself, but
1090 # we need to make sure we trigger the creation of a transaction object used
1090 # we need to make sure we trigger the creation of a transaction object used
1091 # for the whole processing scope.
1091 # for the whole processing scope.
1092 op.gettransaction()
1092 op.gettransaction()
1093 import exchange
1093 import exchange
1094 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1094 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1095 if not isinstance(cg, changegroup.cg1unpacker):
1095 if not isinstance(cg, changegroup.cg1unpacker):
1096 raise util.Abort(_('%s: not a bundle version 1.0') %
1096 raise util.Abort(_('%s: not a bundle version 1.0') %
1097 util.hidepassword(raw_url))
1097 util.hidepassword(raw_url))
1098 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1098 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1099 op.records.add('changegroup', {'return': ret})
1099 op.records.add('changegroup', {'return': ret})
1100 if op.reply is not None:
1100 if op.reply is not None:
1101 # This is definitely not the final form of this
1101 # This is definitely not the final form of this
1102 # return. But one need to start somewhere.
1102 # return. But one need to start somewhere.
1103 part = op.reply.newpart('b2x:reply:changegroup')
1103 part = op.reply.newpart('b2x:reply:changegroup')
1104 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1104 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1105 part.addparam('return', '%i' % ret, mandatory=False)
1105 part.addparam('return', '%i' % ret, mandatory=False)
1106 try:
1106 try:
1107 real_part.validate()
1107 real_part.validate()
1108 except util.Abort, e:
1108 except util.Abort, e:
1109 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1109 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1110 (util.hidepassword(raw_url), str(e)))
1110 (util.hidepassword(raw_url), str(e)))
1111 assert not inpart.read()
1111 assert not inpart.read()
1112
1112
1113 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1113 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1114 def handlereplychangegroup(op, inpart):
1114 def handlereplychangegroup(op, inpart):
1115 ret = int(inpart.params['return'])
1115 ret = int(inpart.params['return'])
1116 replyto = int(inpart.params['in-reply-to'])
1116 replyto = int(inpart.params['in-reply-to'])
1117 op.records.add('changegroup', {'return': ret}, replyto)
1117 op.records.add('changegroup', {'return': ret}, replyto)
1118
1118
1119 @parthandler('b2x:check:heads')
1119 @parthandler('b2x:check:heads')
1120 def handlecheckheads(op, inpart):
1120 def handlecheckheads(op, inpart):
1121 """check that head of the repo did not change
1121 """check that head of the repo did not change
1122
1122
1123 This is used to detect a push race when using unbundle.
1123 This is used to detect a push race when using unbundle.
1124 This replaces the "heads" argument of unbundle."""
1124 This replaces the "heads" argument of unbundle."""
1125 h = inpart.read(20)
1125 h = inpart.read(20)
1126 heads = []
1126 heads = []
1127 while len(h) == 20:
1127 while len(h) == 20:
1128 heads.append(h)
1128 heads.append(h)
1129 h = inpart.read(20)
1129 h = inpart.read(20)
1130 assert not h
1130 assert not h
1131 if heads != op.repo.heads():
1131 if heads != op.repo.heads():
1132 raise error.PushRaced('repository changed while pushing - '
1132 raise error.PushRaced('repository changed while pushing - '
1133 'please try again')
1133 'please try again')
1134
1134
1135 @parthandler('b2x:output')
1135 @parthandler('b2x:output')
1136 def handleoutput(op, inpart):
1136 def handleoutput(op, inpart):
1137 """forward output captured on the server to the client"""
1137 """forward output captured on the server to the client"""
1138 for line in inpart.read().splitlines():
1138 for line in inpart.read().splitlines():
1139 op.ui.write(('remote: %s\n' % line))
1139 op.ui.write(('remote: %s\n' % line))
1140
1140
1141 @parthandler('b2x:replycaps')
1141 @parthandler('b2x:replycaps')
1142 def handlereplycaps(op, inpart):
1142 def handlereplycaps(op, inpart):
1143 """Notify that a reply bundle should be created
1143 """Notify that a reply bundle should be created
1144
1144
1145 The payload contains the capabilities information for the reply"""
1145 The payload contains the capabilities information for the reply"""
1146 caps = decodecaps(inpart.read())
1146 caps = decodecaps(inpart.read())
1147 if op.reply is None:
1147 if op.reply is None:
1148 op.reply = bundle20(op.ui, caps)
1148 op.reply = bundle20(op.ui, caps)
1149
1149
1150 @parthandler('b2x:error:abort', ('message', 'hint'))
1150 @parthandler('b2x:error:abort', ('message', 'hint'))
1151 def handlereplycaps(op, inpart):
1151 def handlereplycaps(op, inpart):
1152 """Used to transmit abort error over the wire"""
1152 """Used to transmit abort error over the wire"""
1153 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1153 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1154
1154
1155 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1155 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1156 def handlereplycaps(op, inpart):
1156 def handlereplycaps(op, inpart):
1157 """Used to transmit unknown content error over the wire"""
1157 """Used to transmit unknown content error over the wire"""
1158 kwargs = {}
1158 kwargs = {}
1159 parttype = inpart.params.get('parttype')
1159 parttype = inpart.params.get('parttype')
1160 if parttype is not None:
1160 if parttype is not None:
1161 kwargs['parttype'] = parttype
1161 kwargs['parttype'] = parttype
1162 params = inpart.params.get('params')
1162 params = inpart.params.get('params')
1163 if params is not None:
1163 if params is not None:
1164 kwargs['params'] = params.split('\0')
1164 kwargs['params'] = params.split('\0')
1165
1165
1166 raise error.UnsupportedPartError(**kwargs)
1166 raise error.UnsupportedPartError(**kwargs)
1167
1167
1168 @parthandler('b2x:error:pushraced', ('message',))
1168 @parthandler('b2x:error:pushraced', ('message',))
1169 def handlereplycaps(op, inpart):
1169 def handlereplycaps(op, inpart):
1170 """Used to transmit push race error over the wire"""
1170 """Used to transmit push race error over the wire"""
1171 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1171 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1172
1172
1173 @parthandler('b2x:listkeys', ('namespace',))
1173 @parthandler('b2x:listkeys', ('namespace',))
1174 def handlelistkeys(op, inpart):
1174 def handlelistkeys(op, inpart):
1175 """retrieve pushkey namespace content stored in a bundle2"""
1175 """retrieve pushkey namespace content stored in a bundle2"""
1176 namespace = inpart.params['namespace']
1176 namespace = inpart.params['namespace']
1177 r = pushkey.decodekeys(inpart.read())
1177 r = pushkey.decodekeys(inpart.read())
1178 op.records.add('listkeys', (namespace, r))
1178 op.records.add('listkeys', (namespace, r))
1179
1179
1180 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1180 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1181 def handlepushkey(op, inpart):
1181 def handlepushkey(op, inpart):
1182 """process a pushkey request"""
1182 """process a pushkey request"""
1183 dec = pushkey.decode
1183 dec = pushkey.decode
1184 namespace = dec(inpart.params['namespace'])
1184 namespace = dec(inpart.params['namespace'])
1185 key = dec(inpart.params['key'])
1185 key = dec(inpart.params['key'])
1186 old = dec(inpart.params['old'])
1186 old = dec(inpart.params['old'])
1187 new = dec(inpart.params['new'])
1187 new = dec(inpart.params['new'])
1188 ret = op.repo.pushkey(namespace, key, old, new)
1188 ret = op.repo.pushkey(namespace, key, old, new)
1189 record = {'namespace': namespace,
1189 record = {'namespace': namespace,
1190 'key': key,
1190 'key': key,
1191 'old': old,
1191 'old': old,
1192 'new': new}
1192 'new': new}
1193 op.records.add('pushkey', record)
1193 op.records.add('pushkey', record)
1194 if op.reply is not None:
1194 if op.reply is not None:
1195 rpart = op.reply.newpart('b2x:reply:pushkey')
1195 rpart = op.reply.newpart('b2x:reply:pushkey')
1196 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1196 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1197 rpart.addparam('return', '%i' % ret, mandatory=False)
1197 rpart.addparam('return', '%i' % ret, mandatory=False)
1198
1198
1199 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1199 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1200 def handlepushkeyreply(op, inpart):
1200 def handlepushkeyreply(op, inpart):
1201 """retrieve the result of a pushkey request"""
1201 """retrieve the result of a pushkey request"""
1202 ret = int(inpart.params['return'])
1202 ret = int(inpart.params['return'])
1203 partid = int(inpart.params['in-reply-to'])
1203 partid = int(inpart.params['in-reply-to'])
1204 op.records.add('pushkey', {'return': ret}, partid)
1204 op.records.add('pushkey', {'return': ret}, partid)
1205
1205
1206 @parthandler('b2x:obsmarkers')
1206 @parthandler('b2x:obsmarkers')
1207 def handleobsmarker(op, inpart):
1207 def handleobsmarker(op, inpart):
1208 """add a stream of obsmarkers to the repo"""
1208 """add a stream of obsmarkers to the repo"""
1209 tr = op.gettransaction()
1209 tr = op.gettransaction()
1210 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1210 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1211 if new:
1211 if new:
1212 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1212 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1213 op.records.add('obsmarkers', {'new': new})
1213 op.records.add('obsmarkers', {'new': new})
1214 if op.reply is not None:
1214 if op.reply is not None:
1215 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1215 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1216 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1216 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1217 rpart.addparam('new', '%i' % new, mandatory=False)
1217 rpart.addparam('new', '%i' % new, mandatory=False)
1218
1218
1219
1219
1220 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1220 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1221 def handlepushkeyreply(op, inpart):
1221 def handlepushkeyreply(op, inpart):
1222 """retrieve the result of a pushkey request"""
1222 """retrieve the result of a pushkey request"""
1223 ret = int(inpart.params['new'])
1223 ret = int(inpart.params['new'])
1224 partid = int(inpart.params['in-reply-to'])
1224 partid = int(inpart.params['in-reply-to'])
1225 op.records.add('obsmarkers', {'new': ret}, partid)
1225 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now