##// END OF EJS Templates
unbundle20: move header parsing into the 'getunbundler' function...
Pierre-Yves David -
r24642:54e5c239 default
parent child Browse files
Show More
@@ -1,1231 +1,1232 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import errno
149 import sys
149 import sys
150 import util
150 import util
151 import struct
151 import struct
152 import urllib
152 import urllib
153 import string
153 import string
154 import obsolete
154 import obsolete
155 import pushkey
155 import pushkey
156 import url
156 import url
157 import re
157 import re
158
158
159 import changegroup, error
159 import changegroup, error
160 from i18n import _
160 from i18n import _
161
161
162 _pack = struct.pack
162 _pack = struct.pack
163 _unpack = struct.unpack
163 _unpack = struct.unpack
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175
175
176 def validateparttype(parttype):
176 def validateparttype(parttype):
177 """raise ValueError if a parttype contains invalid character"""
177 """raise ValueError if a parttype contains invalid character"""
178 if _parttypeforbidden.search(parttype):
178 if _parttypeforbidden.search(parttype):
179 raise ValueError(parttype)
179 raise ValueError(parttype)
180
180
181 def _makefpartparamsizes(nbparams):
181 def _makefpartparamsizes(nbparams):
182 """return a struct format to read part parameter sizes
182 """return a struct format to read part parameter sizes
183
183
184 The number parameters is variable so we need to build that format
184 The number parameters is variable so we need to build that format
185 dynamically.
185 dynamically.
186 """
186 """
187 return '>'+('BB'*nbparams)
187 return '>'+('BB'*nbparams)
188
188
189 parthandlermapping = {}
189 parthandlermapping = {}
190
190
191 def parthandler(parttype, params=()):
191 def parthandler(parttype, params=()):
192 """decorator that register a function as a bundle2 part handler
192 """decorator that register a function as a bundle2 part handler
193
193
194 eg::
194 eg::
195
195
196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 def myparttypehandler(...):
197 def myparttypehandler(...):
198 '''process a part of type "my part".'''
198 '''process a part of type "my part".'''
199 ...
199 ...
200 """
200 """
201 validateparttype(parttype)
201 validateparttype(parttype)
202 def _decorator(func):
202 def _decorator(func):
203 lparttype = parttype.lower() # enforce lower case matching.
203 lparttype = parttype.lower() # enforce lower case matching.
204 assert lparttype not in parthandlermapping
204 assert lparttype not in parthandlermapping
205 parthandlermapping[lparttype] = func
205 parthandlermapping[lparttype] = func
206 func.params = frozenset(params)
206 func.params = frozenset(params)
207 return func
207 return func
208 return _decorator
208 return _decorator
209
209
210 class unbundlerecords(object):
210 class unbundlerecords(object):
211 """keep record of what happens during and unbundle
211 """keep record of what happens during and unbundle
212
212
213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 category of record and obj is an arbitrary object.
214 category of record and obj is an arbitrary object.
215
215
216 `records['cat']` will return all entries of this category 'cat'.
216 `records['cat']` will return all entries of this category 'cat'.
217
217
218 Iterating on the object itself will yield `('category', obj)` tuples
218 Iterating on the object itself will yield `('category', obj)` tuples
219 for all entries.
219 for all entries.
220
220
221 All iterations happens in chronological order.
221 All iterations happens in chronological order.
222 """
222 """
223
223
224 def __init__(self):
224 def __init__(self):
225 self._categories = {}
225 self._categories = {}
226 self._sequences = []
226 self._sequences = []
227 self._replies = {}
227 self._replies = {}
228
228
229 def add(self, category, entry, inreplyto=None):
229 def add(self, category, entry, inreplyto=None):
230 """add a new record of a given category.
230 """add a new record of a given category.
231
231
232 The entry can then be retrieved in the list returned by
232 The entry can then be retrieved in the list returned by
233 self['category']."""
233 self['category']."""
234 self._categories.setdefault(category, []).append(entry)
234 self._categories.setdefault(category, []).append(entry)
235 self._sequences.append((category, entry))
235 self._sequences.append((category, entry))
236 if inreplyto is not None:
236 if inreplyto is not None:
237 self.getreplies(inreplyto).add(category, entry)
237 self.getreplies(inreplyto).add(category, entry)
238
238
239 def getreplies(self, partid):
239 def getreplies(self, partid):
240 """get the records that are replies to a specific part"""
240 """get the records that are replies to a specific part"""
241 return self._replies.setdefault(partid, unbundlerecords())
241 return self._replies.setdefault(partid, unbundlerecords())
242
242
243 def __getitem__(self, cat):
243 def __getitem__(self, cat):
244 return tuple(self._categories.get(cat, ()))
244 return tuple(self._categories.get(cat, ()))
245
245
246 def __iter__(self):
246 def __iter__(self):
247 return iter(self._sequences)
247 return iter(self._sequences)
248
248
249 def __len__(self):
249 def __len__(self):
250 return len(self._sequences)
250 return len(self._sequences)
251
251
252 def __nonzero__(self):
252 def __nonzero__(self):
253 return bool(self._sequences)
253 return bool(self._sequences)
254
254
255 class bundleoperation(object):
255 class bundleoperation(object):
256 """an object that represents a single bundling process
256 """an object that represents a single bundling process
257
257
258 Its purpose is to carry unbundle-related objects and states.
258 Its purpose is to carry unbundle-related objects and states.
259
259
260 A new object should be created at the beginning of each bundle processing.
260 A new object should be created at the beginning of each bundle processing.
261 The object is to be returned by the processing function.
261 The object is to be returned by the processing function.
262
262
263 The object has very little content now it will ultimately contain:
263 The object has very little content now it will ultimately contain:
264 * an access to the repo the bundle is applied to,
264 * an access to the repo the bundle is applied to,
265 * a ui object,
265 * a ui object,
266 * a way to retrieve a transaction to add changes to the repo,
266 * a way to retrieve a transaction to add changes to the repo,
267 * a way to record the result of processing each part,
267 * a way to record the result of processing each part,
268 * a way to construct a bundle response when applicable.
268 * a way to construct a bundle response when applicable.
269 """
269 """
270
270
271 def __init__(self, repo, transactiongetter):
271 def __init__(self, repo, transactiongetter):
272 self.repo = repo
272 self.repo = repo
273 self.ui = repo.ui
273 self.ui = repo.ui
274 self.records = unbundlerecords()
274 self.records = unbundlerecords()
275 self.gettransaction = transactiongetter
275 self.gettransaction = transactiongetter
276 self.reply = None
276 self.reply = None
277
277
278 class TransactionUnavailable(RuntimeError):
278 class TransactionUnavailable(RuntimeError):
279 pass
279 pass
280
280
281 def _notransaction():
281 def _notransaction():
282 """default method to get a transaction while processing a bundle
282 """default method to get a transaction while processing a bundle
283
283
284 Raise an exception to highlight the fact that no transaction was expected
284 Raise an exception to highlight the fact that no transaction was expected
285 to be created"""
285 to be created"""
286 raise TransactionUnavailable()
286 raise TransactionUnavailable()
287
287
288 def processbundle(repo, unbundler, transactiongetter=None):
288 def processbundle(repo, unbundler, transactiongetter=None):
289 """This function process a bundle, apply effect to/from a repo
289 """This function process a bundle, apply effect to/from a repo
290
290
291 It iterates over each part then searches for and uses the proper handling
291 It iterates over each part then searches for and uses the proper handling
292 code to process the part. Parts are processed in order.
292 code to process the part. Parts are processed in order.
293
293
294 This is very early version of this function that will be strongly reworked
294 This is very early version of this function that will be strongly reworked
295 before final usage.
295 before final usage.
296
296
297 Unknown Mandatory part will abort the process.
297 Unknown Mandatory part will abort the process.
298 """
298 """
299 if transactiongetter is None:
299 if transactiongetter is None:
300 transactiongetter = _notransaction
300 transactiongetter = _notransaction
301 op = bundleoperation(repo, transactiongetter)
301 op = bundleoperation(repo, transactiongetter)
302 # todo:
302 # todo:
303 # - replace this is a init function soon.
303 # - replace this is a init function soon.
304 # - exception catching
304 # - exception catching
305 unbundler.params
305 unbundler.params
306 iterparts = unbundler.iterparts()
306 iterparts = unbundler.iterparts()
307 part = None
307 part = None
308 try:
308 try:
309 for part in iterparts:
309 for part in iterparts:
310 _processpart(op, part)
310 _processpart(op, part)
311 except Exception, exc:
311 except Exception, exc:
312 for part in iterparts:
312 for part in iterparts:
313 # consume the bundle content
313 # consume the bundle content
314 part.seek(0, 2)
314 part.seek(0, 2)
315 # Small hack to let caller code distinguish exceptions from bundle2
315 # Small hack to let caller code distinguish exceptions from bundle2
316 # processing from processing the old format. This is mostly
316 # processing from processing the old format. This is mostly
317 # needed to handle different return codes to unbundle according to the
317 # needed to handle different return codes to unbundle according to the
318 # type of bundle. We should probably clean up or drop this return code
318 # type of bundle. We should probably clean up or drop this return code
319 # craziness in a future version.
319 # craziness in a future version.
320 exc.duringunbundle2 = True
320 exc.duringunbundle2 = True
321 raise
321 raise
322 return op
322 return op
323
323
324 def _processpart(op, part):
324 def _processpart(op, part):
325 """process a single part from a bundle
325 """process a single part from a bundle
326
326
327 The part is guaranteed to have been fully consumed when the function exits
327 The part is guaranteed to have been fully consumed when the function exits
328 (even if an exception is raised)."""
328 (even if an exception is raised)."""
329 try:
329 try:
330 try:
330 try:
331 handler = parthandlermapping.get(part.type)
331 handler = parthandlermapping.get(part.type)
332 if handler is None:
332 if handler is None:
333 raise error.UnsupportedPartError(parttype=part.type)
333 raise error.UnsupportedPartError(parttype=part.type)
334 op.ui.debug('found a handler for part %r\n' % part.type)
334 op.ui.debug('found a handler for part %r\n' % part.type)
335 unknownparams = part.mandatorykeys - handler.params
335 unknownparams = part.mandatorykeys - handler.params
336 if unknownparams:
336 if unknownparams:
337 unknownparams = list(unknownparams)
337 unknownparams = list(unknownparams)
338 unknownparams.sort()
338 unknownparams.sort()
339 raise error.UnsupportedPartError(parttype=part.type,
339 raise error.UnsupportedPartError(parttype=part.type,
340 params=unknownparams)
340 params=unknownparams)
341 except error.UnsupportedPartError, exc:
341 except error.UnsupportedPartError, exc:
342 if part.mandatory: # mandatory parts
342 if part.mandatory: # mandatory parts
343 raise
343 raise
344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 return # skip to part processing
345 return # skip to part processing
346
346
347 # handler is called outside the above try block so that we don't
347 # handler is called outside the above try block so that we don't
348 # risk catching KeyErrors from anything other than the
348 # risk catching KeyErrors from anything other than the
349 # parthandlermapping lookup (any KeyError raised by handler()
349 # parthandlermapping lookup (any KeyError raised by handler()
350 # itself represents a defect of a different variety).
350 # itself represents a defect of a different variety).
351 output = None
351 output = None
352 if op.reply is not None:
352 if op.reply is not None:
353 op.ui.pushbuffer(error=True)
353 op.ui.pushbuffer(error=True)
354 output = ''
354 output = ''
355 try:
355 try:
356 handler(op, part)
356 handler(op, part)
357 finally:
357 finally:
358 if output is not None:
358 if output is not None:
359 output = op.ui.popbuffer()
359 output = op.ui.popbuffer()
360 if output:
360 if output:
361 outpart = op.reply.newpart('b2x:output', data=output,
361 outpart = op.reply.newpart('b2x:output', data=output,
362 mandatory=False)
362 mandatory=False)
363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 finally:
364 finally:
365 # consume the part content to not corrupt the stream.
365 # consume the part content to not corrupt the stream.
366 part.seek(0, 2)
366 part.seek(0, 2)
367
367
368
368
369 def decodecaps(blob):
369 def decodecaps(blob):
370 """decode a bundle2 caps bytes blob into a dictionary
370 """decode a bundle2 caps bytes blob into a dictionary
371
371
372 The blob is a list of capabilities (one per line)
372 The blob is a list of capabilities (one per line)
373 Capabilities may have values using a line of the form::
373 Capabilities may have values using a line of the form::
374
374
375 capability=value1,value2,value3
375 capability=value1,value2,value3
376
376
377 The values are always a list."""
377 The values are always a list."""
378 caps = {}
378 caps = {}
379 for line in blob.splitlines():
379 for line in blob.splitlines():
380 if not line:
380 if not line:
381 continue
381 continue
382 if '=' not in line:
382 if '=' not in line:
383 key, vals = line, ()
383 key, vals = line, ()
384 else:
384 else:
385 key, vals = line.split('=', 1)
385 key, vals = line.split('=', 1)
386 vals = vals.split(',')
386 vals = vals.split(',')
387 key = urllib.unquote(key)
387 key = urllib.unquote(key)
388 vals = [urllib.unquote(v) for v in vals]
388 vals = [urllib.unquote(v) for v in vals]
389 caps[key] = vals
389 caps[key] = vals
390 return caps
390 return caps
391
391
392 def encodecaps(caps):
392 def encodecaps(caps):
393 """encode a bundle2 caps dictionary into a bytes blob"""
393 """encode a bundle2 caps dictionary into a bytes blob"""
394 chunks = []
394 chunks = []
395 for ca in sorted(caps):
395 for ca in sorted(caps):
396 vals = caps[ca]
396 vals = caps[ca]
397 ca = urllib.quote(ca)
397 ca = urllib.quote(ca)
398 vals = [urllib.quote(v) for v in vals]
398 vals = [urllib.quote(v) for v in vals]
399 if vals:
399 if vals:
400 ca = "%s=%s" % (ca, ','.join(vals))
400 ca = "%s=%s" % (ca, ','.join(vals))
401 chunks.append(ca)
401 chunks.append(ca)
402 return '\n'.join(chunks)
402 return '\n'.join(chunks)
403
403
404 class bundle20(object):
404 class bundle20(object):
405 """represent an outgoing bundle2 container
405 """represent an outgoing bundle2 container
406
406
407 Use the `addparam` method to add stream level parameter. and `newpart` to
407 Use the `addparam` method to add stream level parameter. and `newpart` to
408 populate it. Then call `getchunks` to retrieve all the binary chunks of
408 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 data that compose the bundle2 container."""
409 data that compose the bundle2 container."""
410
410
411 _magicstring = 'HG2Y'
411 _magicstring = 'HG2Y'
412
412
413 def __init__(self, ui, capabilities=()):
413 def __init__(self, ui, capabilities=()):
414 self.ui = ui
414 self.ui = ui
415 self._params = []
415 self._params = []
416 self._parts = []
416 self._parts = []
417 self.capabilities = dict(capabilities)
417 self.capabilities = dict(capabilities)
418
418
419 @property
419 @property
420 def nbparts(self):
420 def nbparts(self):
421 """total number of parts added to the bundler"""
421 """total number of parts added to the bundler"""
422 return len(self._parts)
422 return len(self._parts)
423
423
424 # methods used to defines the bundle2 content
424 # methods used to defines the bundle2 content
425 def addparam(self, name, value=None):
425 def addparam(self, name, value=None):
426 """add a stream level parameter"""
426 """add a stream level parameter"""
427 if not name:
427 if not name:
428 raise ValueError('empty parameter name')
428 raise ValueError('empty parameter name')
429 if name[0] not in string.letters:
429 if name[0] not in string.letters:
430 raise ValueError('non letter first character: %r' % name)
430 raise ValueError('non letter first character: %r' % name)
431 self._params.append((name, value))
431 self._params.append((name, value))
432
432
433 def addpart(self, part):
433 def addpart(self, part):
434 """add a new part to the bundle2 container
434 """add a new part to the bundle2 container
435
435
436 Parts contains the actual applicative payload."""
436 Parts contains the actual applicative payload."""
437 assert part.id is None
437 assert part.id is None
438 part.id = len(self._parts) # very cheap counter
438 part.id = len(self._parts) # very cheap counter
439 self._parts.append(part)
439 self._parts.append(part)
440
440
441 def newpart(self, typeid, *args, **kwargs):
441 def newpart(self, typeid, *args, **kwargs):
442 """create a new part and add it to the containers
442 """create a new part and add it to the containers
443
443
444 As the part is directly added to the containers. For now, this means
444 As the part is directly added to the containers. For now, this means
445 that any failure to properly initialize the part after calling
445 that any failure to properly initialize the part after calling
446 ``newpart`` should result in a failure of the whole bundling process.
446 ``newpart`` should result in a failure of the whole bundling process.
447
447
448 You can still fall back to manually create and add if you need better
448 You can still fall back to manually create and add if you need better
449 control."""
449 control."""
450 part = bundlepart(typeid, *args, **kwargs)
450 part = bundlepart(typeid, *args, **kwargs)
451 self.addpart(part)
451 self.addpart(part)
452 return part
452 return part
453
453
454 # methods used to generate the bundle2 stream
454 # methods used to generate the bundle2 stream
455 def getchunks(self):
455 def getchunks(self):
456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 yield self._magicstring
457 yield self._magicstring
458 param = self._paramchunk()
458 param = self._paramchunk()
459 self.ui.debug('bundle parameter: %s\n' % param)
459 self.ui.debug('bundle parameter: %s\n' % param)
460 yield _pack(_fstreamparamsize, len(param))
460 yield _pack(_fstreamparamsize, len(param))
461 if param:
461 if param:
462 yield param
462 yield param
463
463
464 self.ui.debug('start of parts\n')
464 self.ui.debug('start of parts\n')
465 for part in self._parts:
465 for part in self._parts:
466 self.ui.debug('bundle part: "%s"\n' % part.type)
466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 for chunk in part.getchunks():
467 for chunk in part.getchunks():
468 yield chunk
468 yield chunk
469 self.ui.debug('end of bundle\n')
469 self.ui.debug('end of bundle\n')
470 yield _pack(_fpartheadersize, 0)
470 yield _pack(_fpartheadersize, 0)
471
471
472 def _paramchunk(self):
472 def _paramchunk(self):
473 """return a encoded version of all stream parameters"""
473 """return a encoded version of all stream parameters"""
474 blocks = []
474 blocks = []
475 for par, value in self._params:
475 for par, value in self._params:
476 par = urllib.quote(par)
476 par = urllib.quote(par)
477 if value is not None:
477 if value is not None:
478 value = urllib.quote(value)
478 value = urllib.quote(value)
479 par = '%s=%s' % (par, value)
479 par = '%s=%s' % (par, value)
480 blocks.append(par)
480 blocks.append(par)
481 return ' '.join(blocks)
481 return ' '.join(blocks)
482
482
483 class unpackermixin(object):
483 class unpackermixin(object):
484 """A mixin to extract bytes and struct data from a stream"""
484 """A mixin to extract bytes and struct data from a stream"""
485
485
486 def __init__(self, fp):
486 def __init__(self, fp):
487 self._fp = fp
487 self._fp = fp
488 self._seekable = (util.safehasattr(fp, 'seek') and
488 self._seekable = (util.safehasattr(fp, 'seek') and
489 util.safehasattr(fp, 'tell'))
489 util.safehasattr(fp, 'tell'))
490
490
491 def _unpack(self, format):
491 def _unpack(self, format):
492 """unpack this struct format from the stream"""
492 """unpack this struct format from the stream"""
493 data = self._readexact(struct.calcsize(format))
493 data = self._readexact(struct.calcsize(format))
494 return _unpack(format, data)
494 return _unpack(format, data)
495
495
496 def _readexact(self, size):
496 def _readexact(self, size):
497 """read exactly <size> bytes from the stream"""
497 """read exactly <size> bytes from the stream"""
498 return changegroup.readexactly(self._fp, size)
498 return changegroup.readexactly(self._fp, size)
499
499
500 def seek(self, offset, whence=0):
500 def seek(self, offset, whence=0):
501 """move the underlying file pointer"""
501 """move the underlying file pointer"""
502 if self._seekable:
502 if self._seekable:
503 return self._fp.seek(offset, whence)
503 return self._fp.seek(offset, whence)
504 else:
504 else:
505 raise NotImplementedError(_('File pointer is not seekable'))
505 raise NotImplementedError(_('File pointer is not seekable'))
506
506
507 def tell(self):
507 def tell(self):
508 """return the file offset, or None if file is not seekable"""
508 """return the file offset, or None if file is not seekable"""
509 if self._seekable:
509 if self._seekable:
510 try:
510 try:
511 return self._fp.tell()
511 return self._fp.tell()
512 except IOError, e:
512 except IOError, e:
513 if e.errno == errno.ESPIPE:
513 if e.errno == errno.ESPIPE:
514 self._seekable = False
514 self._seekable = False
515 else:
515 else:
516 raise
516 raise
517 return None
517 return None
518
518
519 def close(self):
519 def close(self):
520 """close underlying file"""
520 """close underlying file"""
521 if util.safehasattr(self._fp, 'close'):
521 if util.safehasattr(self._fp, 'close'):
522 return self._fp.close()
522 return self._fp.close()
523
523
524 def getunbundler(ui, fp, header=None):
524 def getunbundler(ui, fp, header=None):
525 """return a valid unbundler object for a given header"""
525 """return a valid unbundler object for a given header"""
526 return unbundle20(ui, fp, header)
526 if header is None:
527 header = changegroup.readexactly(fp, 4)
528 magic, version = header[0:2], header[2:4]
529 if magic != 'HG':
530 raise util.Abort(_('not a Mercurial bundle'))
531 if version != '2Y':
532 raise util.Abort(_('unknown bundle version %s') % version)
533 unbundler = unbundle20(ui, fp)
534 ui.debug('start processing of %s stream\n' % header)
535 return unbundler
527
536
528 class unbundle20(unpackermixin):
537 class unbundle20(unpackermixin):
529 """interpret a bundle2 stream
538 """interpret a bundle2 stream
530
539
531 This class is fed with a binary stream and yields parts through its
540 This class is fed with a binary stream and yields parts through its
532 `iterparts` methods."""
541 `iterparts` methods."""
533
542
534 def __init__(self, ui, fp, header=None):
543 def __init__(self, ui, fp):
535 """If header is specified, we do not read it out of the stream."""
544 """If header is specified, we do not read it out of the stream."""
536 self.ui = ui
545 self.ui = ui
537 super(unbundle20, self).__init__(fp)
546 super(unbundle20, self).__init__(fp)
538 if header is None:
539 header = self._readexact(4)
540 magic, version = header[0:2], header[2:4]
541 if magic != 'HG':
542 raise util.Abort(_('not a Mercurial bundle'))
543 if version != '2Y':
544 raise util.Abort(_('unknown bundle version %s') % version)
545 self.ui.debug('start processing of %s stream\n' % header)
546
547
547 @util.propertycache
548 @util.propertycache
548 def params(self):
549 def params(self):
549 """dictionary of stream level parameters"""
550 """dictionary of stream level parameters"""
550 self.ui.debug('reading bundle2 stream parameters\n')
551 self.ui.debug('reading bundle2 stream parameters\n')
551 params = {}
552 params = {}
552 paramssize = self._unpack(_fstreamparamsize)[0]
553 paramssize = self._unpack(_fstreamparamsize)[0]
553 if paramssize < 0:
554 if paramssize < 0:
554 raise error.BundleValueError('negative bundle param size: %i'
555 raise error.BundleValueError('negative bundle param size: %i'
555 % paramssize)
556 % paramssize)
556 if paramssize:
557 if paramssize:
557 for p in self._readexact(paramssize).split(' '):
558 for p in self._readexact(paramssize).split(' '):
558 p = p.split('=', 1)
559 p = p.split('=', 1)
559 p = [urllib.unquote(i) for i in p]
560 p = [urllib.unquote(i) for i in p]
560 if len(p) < 2:
561 if len(p) < 2:
561 p.append(None)
562 p.append(None)
562 self._processparam(*p)
563 self._processparam(*p)
563 params[p[0]] = p[1]
564 params[p[0]] = p[1]
564 return params
565 return params
565
566
566 def _processparam(self, name, value):
567 def _processparam(self, name, value):
567 """process a parameter, applying its effect if needed
568 """process a parameter, applying its effect if needed
568
569
569 Parameter starting with a lower case letter are advisory and will be
570 Parameter starting with a lower case letter are advisory and will be
570 ignored when unknown. Those starting with an upper case letter are
571 ignored when unknown. Those starting with an upper case letter are
571 mandatory and will this function will raise a KeyError when unknown.
572 mandatory and will this function will raise a KeyError when unknown.
572
573
573 Note: no option are currently supported. Any input will be either
574 Note: no option are currently supported. Any input will be either
574 ignored or failing.
575 ignored or failing.
575 """
576 """
576 if not name:
577 if not name:
577 raise ValueError('empty parameter name')
578 raise ValueError('empty parameter name')
578 if name[0] not in string.letters:
579 if name[0] not in string.letters:
579 raise ValueError('non letter first character: %r' % name)
580 raise ValueError('non letter first character: %r' % name)
580 # Some logic will be later added here to try to process the option for
581 # Some logic will be later added here to try to process the option for
581 # a dict of known parameter.
582 # a dict of known parameter.
582 if name[0].islower():
583 if name[0].islower():
583 self.ui.debug("ignoring unknown parameter %r\n" % name)
584 self.ui.debug("ignoring unknown parameter %r\n" % name)
584 else:
585 else:
585 raise error.UnsupportedPartError(params=(name,))
586 raise error.UnsupportedPartError(params=(name,))
586
587
587
588
588 def iterparts(self):
589 def iterparts(self):
589 """yield all parts contained in the stream"""
590 """yield all parts contained in the stream"""
590 # make sure param have been loaded
591 # make sure param have been loaded
591 self.params
592 self.params
592 self.ui.debug('start extraction of bundle2 parts\n')
593 self.ui.debug('start extraction of bundle2 parts\n')
593 headerblock = self._readpartheader()
594 headerblock = self._readpartheader()
594 while headerblock is not None:
595 while headerblock is not None:
595 part = unbundlepart(self.ui, headerblock, self._fp)
596 part = unbundlepart(self.ui, headerblock, self._fp)
596 yield part
597 yield part
597 part.seek(0, 2)
598 part.seek(0, 2)
598 headerblock = self._readpartheader()
599 headerblock = self._readpartheader()
599 self.ui.debug('end of bundle2 stream\n')
600 self.ui.debug('end of bundle2 stream\n')
600
601
601 def _readpartheader(self):
602 def _readpartheader(self):
602 """reads a part header size and return the bytes blob
603 """reads a part header size and return the bytes blob
603
604
604 returns None if empty"""
605 returns None if empty"""
605 headersize = self._unpack(_fpartheadersize)[0]
606 headersize = self._unpack(_fpartheadersize)[0]
606 if headersize < 0:
607 if headersize < 0:
607 raise error.BundleValueError('negative part header size: %i'
608 raise error.BundleValueError('negative part header size: %i'
608 % headersize)
609 % headersize)
609 self.ui.debug('part header size: %i\n' % headersize)
610 self.ui.debug('part header size: %i\n' % headersize)
610 if headersize:
611 if headersize:
611 return self._readexact(headersize)
612 return self._readexact(headersize)
612 return None
613 return None
613
614
614 def compressed(self):
615 def compressed(self):
615 return False
616 return False
616
617
617 class bundlepart(object):
618 class bundlepart(object):
618 """A bundle2 part contains application level payload
619 """A bundle2 part contains application level payload
619
620
620 The part `type` is used to route the part to the application level
621 The part `type` is used to route the part to the application level
621 handler.
622 handler.
622
623
623 The part payload is contained in ``part.data``. It could be raw bytes or a
624 The part payload is contained in ``part.data``. It could be raw bytes or a
624 generator of byte chunks.
625 generator of byte chunks.
625
626
626 You can add parameters to the part using the ``addparam`` method.
627 You can add parameters to the part using the ``addparam`` method.
627 Parameters can be either mandatory (default) or advisory. Remote side
628 Parameters can be either mandatory (default) or advisory. Remote side
628 should be able to safely ignore the advisory ones.
629 should be able to safely ignore the advisory ones.
629
630
630 Both data and parameters cannot be modified after the generation has begun.
631 Both data and parameters cannot be modified after the generation has begun.
631 """
632 """
632
633
633 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
634 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
634 data='', mandatory=True):
635 data='', mandatory=True):
635 validateparttype(parttype)
636 validateparttype(parttype)
636 self.id = None
637 self.id = None
637 self.type = parttype
638 self.type = parttype
638 self._data = data
639 self._data = data
639 self._mandatoryparams = list(mandatoryparams)
640 self._mandatoryparams = list(mandatoryparams)
640 self._advisoryparams = list(advisoryparams)
641 self._advisoryparams = list(advisoryparams)
641 # checking for duplicated entries
642 # checking for duplicated entries
642 self._seenparams = set()
643 self._seenparams = set()
643 for pname, __ in self._mandatoryparams + self._advisoryparams:
644 for pname, __ in self._mandatoryparams + self._advisoryparams:
644 if pname in self._seenparams:
645 if pname in self._seenparams:
645 raise RuntimeError('duplicated params: %s' % pname)
646 raise RuntimeError('duplicated params: %s' % pname)
646 self._seenparams.add(pname)
647 self._seenparams.add(pname)
647 # status of the part's generation:
648 # status of the part's generation:
648 # - None: not started,
649 # - None: not started,
649 # - False: currently generated,
650 # - False: currently generated,
650 # - True: generation done.
651 # - True: generation done.
651 self._generated = None
652 self._generated = None
652 self.mandatory = mandatory
653 self.mandatory = mandatory
653
654
654 # methods used to defines the part content
655 # methods used to defines the part content
655 def __setdata(self, data):
656 def __setdata(self, data):
656 if self._generated is not None:
657 if self._generated is not None:
657 raise error.ReadOnlyPartError('part is being generated')
658 raise error.ReadOnlyPartError('part is being generated')
658 self._data = data
659 self._data = data
659 def __getdata(self):
660 def __getdata(self):
660 return self._data
661 return self._data
661 data = property(__getdata, __setdata)
662 data = property(__getdata, __setdata)
662
663
663 @property
664 @property
664 def mandatoryparams(self):
665 def mandatoryparams(self):
665 # make it an immutable tuple to force people through ``addparam``
666 # make it an immutable tuple to force people through ``addparam``
666 return tuple(self._mandatoryparams)
667 return tuple(self._mandatoryparams)
667
668
668 @property
669 @property
669 def advisoryparams(self):
670 def advisoryparams(self):
670 # make it an immutable tuple to force people through ``addparam``
671 # make it an immutable tuple to force people through ``addparam``
671 return tuple(self._advisoryparams)
672 return tuple(self._advisoryparams)
672
673
673 def addparam(self, name, value='', mandatory=True):
674 def addparam(self, name, value='', mandatory=True):
674 if self._generated is not None:
675 if self._generated is not None:
675 raise error.ReadOnlyPartError('part is being generated')
676 raise error.ReadOnlyPartError('part is being generated')
676 if name in self._seenparams:
677 if name in self._seenparams:
677 raise ValueError('duplicated params: %s' % name)
678 raise ValueError('duplicated params: %s' % name)
678 self._seenparams.add(name)
679 self._seenparams.add(name)
679 params = self._advisoryparams
680 params = self._advisoryparams
680 if mandatory:
681 if mandatory:
681 params = self._mandatoryparams
682 params = self._mandatoryparams
682 params.append((name, value))
683 params.append((name, value))
683
684
684 # methods used to generates the bundle2 stream
685 # methods used to generates the bundle2 stream
685 def getchunks(self):
686 def getchunks(self):
686 if self._generated is not None:
687 if self._generated is not None:
687 raise RuntimeError('part can only be consumed once')
688 raise RuntimeError('part can only be consumed once')
688 self._generated = False
689 self._generated = False
689 #### header
690 #### header
690 if self.mandatory:
691 if self.mandatory:
691 parttype = self.type.upper()
692 parttype = self.type.upper()
692 else:
693 else:
693 parttype = self.type.lower()
694 parttype = self.type.lower()
694 ## parttype
695 ## parttype
695 header = [_pack(_fparttypesize, len(parttype)),
696 header = [_pack(_fparttypesize, len(parttype)),
696 parttype, _pack(_fpartid, self.id),
697 parttype, _pack(_fpartid, self.id),
697 ]
698 ]
698 ## parameters
699 ## parameters
699 # count
700 # count
700 manpar = self.mandatoryparams
701 manpar = self.mandatoryparams
701 advpar = self.advisoryparams
702 advpar = self.advisoryparams
702 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
703 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
703 # size
704 # size
704 parsizes = []
705 parsizes = []
705 for key, value in manpar:
706 for key, value in manpar:
706 parsizes.append(len(key))
707 parsizes.append(len(key))
707 parsizes.append(len(value))
708 parsizes.append(len(value))
708 for key, value in advpar:
709 for key, value in advpar:
709 parsizes.append(len(key))
710 parsizes.append(len(key))
710 parsizes.append(len(value))
711 parsizes.append(len(value))
711 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
712 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
712 header.append(paramsizes)
713 header.append(paramsizes)
713 # key, value
714 # key, value
714 for key, value in manpar:
715 for key, value in manpar:
715 header.append(key)
716 header.append(key)
716 header.append(value)
717 header.append(value)
717 for key, value in advpar:
718 for key, value in advpar:
718 header.append(key)
719 header.append(key)
719 header.append(value)
720 header.append(value)
720 ## finalize header
721 ## finalize header
721 headerchunk = ''.join(header)
722 headerchunk = ''.join(header)
722 yield _pack(_fpartheadersize, len(headerchunk))
723 yield _pack(_fpartheadersize, len(headerchunk))
723 yield headerchunk
724 yield headerchunk
724 ## payload
725 ## payload
725 try:
726 try:
726 for chunk in self._payloadchunks():
727 for chunk in self._payloadchunks():
727 yield _pack(_fpayloadsize, len(chunk))
728 yield _pack(_fpayloadsize, len(chunk))
728 yield chunk
729 yield chunk
729 except Exception, exc:
730 except Exception, exc:
730 # backup exception data for later
731 # backup exception data for later
731 exc_info = sys.exc_info()
732 exc_info = sys.exc_info()
732 msg = 'unexpected error: %s' % exc
733 msg = 'unexpected error: %s' % exc
733 interpart = bundlepart('b2x:error:abort', [('message', msg)],
734 interpart = bundlepart('b2x:error:abort', [('message', msg)],
734 mandatory=False)
735 mandatory=False)
735 interpart.id = 0
736 interpart.id = 0
736 yield _pack(_fpayloadsize, -1)
737 yield _pack(_fpayloadsize, -1)
737 for chunk in interpart.getchunks():
738 for chunk in interpart.getchunks():
738 yield chunk
739 yield chunk
739 # abort current part payload
740 # abort current part payload
740 yield _pack(_fpayloadsize, 0)
741 yield _pack(_fpayloadsize, 0)
741 raise exc_info[0], exc_info[1], exc_info[2]
742 raise exc_info[0], exc_info[1], exc_info[2]
742 # end of payload
743 # end of payload
743 yield _pack(_fpayloadsize, 0)
744 yield _pack(_fpayloadsize, 0)
744 self._generated = True
745 self._generated = True
745
746
746 def _payloadchunks(self):
747 def _payloadchunks(self):
747 """yield chunks of a the part payload
748 """yield chunks of a the part payload
748
749
749 Exists to handle the different methods to provide data to a part."""
750 Exists to handle the different methods to provide data to a part."""
750 # we only support fixed size data now.
751 # we only support fixed size data now.
751 # This will be improved in the future.
752 # This will be improved in the future.
752 if util.safehasattr(self.data, 'next'):
753 if util.safehasattr(self.data, 'next'):
753 buff = util.chunkbuffer(self.data)
754 buff = util.chunkbuffer(self.data)
754 chunk = buff.read(preferedchunksize)
755 chunk = buff.read(preferedchunksize)
755 while chunk:
756 while chunk:
756 yield chunk
757 yield chunk
757 chunk = buff.read(preferedchunksize)
758 chunk = buff.read(preferedchunksize)
758 elif len(self.data):
759 elif len(self.data):
759 yield self.data
760 yield self.data
760
761
761
762
762 flaginterrupt = -1
763 flaginterrupt = -1
763
764
764 class interrupthandler(unpackermixin):
765 class interrupthandler(unpackermixin):
765 """read one part and process it with restricted capability
766 """read one part and process it with restricted capability
766
767
767 This allows to transmit exception raised on the producer size during part
768 This allows to transmit exception raised on the producer size during part
768 iteration while the consumer is reading a part.
769 iteration while the consumer is reading a part.
769
770
770 Part processed in this manner only have access to a ui object,"""
771 Part processed in this manner only have access to a ui object,"""
771
772
772 def __init__(self, ui, fp):
773 def __init__(self, ui, fp):
773 super(interrupthandler, self).__init__(fp)
774 super(interrupthandler, self).__init__(fp)
774 self.ui = ui
775 self.ui = ui
775
776
776 def _readpartheader(self):
777 def _readpartheader(self):
777 """reads a part header size and return the bytes blob
778 """reads a part header size and return the bytes blob
778
779
779 returns None if empty"""
780 returns None if empty"""
780 headersize = self._unpack(_fpartheadersize)[0]
781 headersize = self._unpack(_fpartheadersize)[0]
781 if headersize < 0:
782 if headersize < 0:
782 raise error.BundleValueError('negative part header size: %i'
783 raise error.BundleValueError('negative part header size: %i'
783 % headersize)
784 % headersize)
784 self.ui.debug('part header size: %i\n' % headersize)
785 self.ui.debug('part header size: %i\n' % headersize)
785 if headersize:
786 if headersize:
786 return self._readexact(headersize)
787 return self._readexact(headersize)
787 return None
788 return None
788
789
789 def __call__(self):
790 def __call__(self):
790 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
791 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
791 headerblock = self._readpartheader()
792 headerblock = self._readpartheader()
792 if headerblock is None:
793 if headerblock is None:
793 self.ui.debug('no part found during interruption.\n')
794 self.ui.debug('no part found during interruption.\n')
794 return
795 return
795 part = unbundlepart(self.ui, headerblock, self._fp)
796 part = unbundlepart(self.ui, headerblock, self._fp)
796 op = interruptoperation(self.ui)
797 op = interruptoperation(self.ui)
797 _processpart(op, part)
798 _processpart(op, part)
798
799
799 class interruptoperation(object):
800 class interruptoperation(object):
800 """A limited operation to be use by part handler during interruption
801 """A limited operation to be use by part handler during interruption
801
802
802 It only have access to an ui object.
803 It only have access to an ui object.
803 """
804 """
804
805
805 def __init__(self, ui):
806 def __init__(self, ui):
806 self.ui = ui
807 self.ui = ui
807 self.reply = None
808 self.reply = None
808
809
809 @property
810 @property
810 def repo(self):
811 def repo(self):
811 raise RuntimeError('no repo access from stream interruption')
812 raise RuntimeError('no repo access from stream interruption')
812
813
813 def gettransaction(self):
814 def gettransaction(self):
814 raise TransactionUnavailable('no repo access from stream interruption')
815 raise TransactionUnavailable('no repo access from stream interruption')
815
816
816 class unbundlepart(unpackermixin):
817 class unbundlepart(unpackermixin):
817 """a bundle part read from a bundle"""
818 """a bundle part read from a bundle"""
818
819
819 def __init__(self, ui, header, fp):
820 def __init__(self, ui, header, fp):
820 super(unbundlepart, self).__init__(fp)
821 super(unbundlepart, self).__init__(fp)
821 self.ui = ui
822 self.ui = ui
822 # unbundle state attr
823 # unbundle state attr
823 self._headerdata = header
824 self._headerdata = header
824 self._headeroffset = 0
825 self._headeroffset = 0
825 self._initialized = False
826 self._initialized = False
826 self.consumed = False
827 self.consumed = False
827 # part data
828 # part data
828 self.id = None
829 self.id = None
829 self.type = None
830 self.type = None
830 self.mandatoryparams = None
831 self.mandatoryparams = None
831 self.advisoryparams = None
832 self.advisoryparams = None
832 self.params = None
833 self.params = None
833 self.mandatorykeys = ()
834 self.mandatorykeys = ()
834 self._payloadstream = None
835 self._payloadstream = None
835 self._readheader()
836 self._readheader()
836 self._mandatory = None
837 self._mandatory = None
837 self._chunkindex = [] #(payload, file) position tuples for chunk starts
838 self._chunkindex = [] #(payload, file) position tuples for chunk starts
838 self._pos = 0
839 self._pos = 0
839
840
840 def _fromheader(self, size):
841 def _fromheader(self, size):
841 """return the next <size> byte from the header"""
842 """return the next <size> byte from the header"""
842 offset = self._headeroffset
843 offset = self._headeroffset
843 data = self._headerdata[offset:(offset + size)]
844 data = self._headerdata[offset:(offset + size)]
844 self._headeroffset = offset + size
845 self._headeroffset = offset + size
845 return data
846 return data
846
847
847 def _unpackheader(self, format):
848 def _unpackheader(self, format):
848 """read given format from header
849 """read given format from header
849
850
850 This automatically compute the size of the format to read."""
851 This automatically compute the size of the format to read."""
851 data = self._fromheader(struct.calcsize(format))
852 data = self._fromheader(struct.calcsize(format))
852 return _unpack(format, data)
853 return _unpack(format, data)
853
854
854 def _initparams(self, mandatoryparams, advisoryparams):
855 def _initparams(self, mandatoryparams, advisoryparams):
855 """internal function to setup all logic related parameters"""
856 """internal function to setup all logic related parameters"""
856 # make it read only to prevent people touching it by mistake.
857 # make it read only to prevent people touching it by mistake.
857 self.mandatoryparams = tuple(mandatoryparams)
858 self.mandatoryparams = tuple(mandatoryparams)
858 self.advisoryparams = tuple(advisoryparams)
859 self.advisoryparams = tuple(advisoryparams)
859 # user friendly UI
860 # user friendly UI
860 self.params = dict(self.mandatoryparams)
861 self.params = dict(self.mandatoryparams)
861 self.params.update(dict(self.advisoryparams))
862 self.params.update(dict(self.advisoryparams))
862 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
863 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
863
864
864 def _payloadchunks(self, chunknum=0):
865 def _payloadchunks(self, chunknum=0):
865 '''seek to specified chunk and start yielding data'''
866 '''seek to specified chunk and start yielding data'''
866 if len(self._chunkindex) == 0:
867 if len(self._chunkindex) == 0:
867 assert chunknum == 0, 'Must start with chunk 0'
868 assert chunknum == 0, 'Must start with chunk 0'
868 self._chunkindex.append((0, super(unbundlepart, self).tell()))
869 self._chunkindex.append((0, super(unbundlepart, self).tell()))
869 else:
870 else:
870 assert chunknum < len(self._chunkindex), \
871 assert chunknum < len(self._chunkindex), \
871 'Unknown chunk %d' % chunknum
872 'Unknown chunk %d' % chunknum
872 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
873 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
873
874
874 pos = self._chunkindex[chunknum][0]
875 pos = self._chunkindex[chunknum][0]
875 payloadsize = self._unpack(_fpayloadsize)[0]
876 payloadsize = self._unpack(_fpayloadsize)[0]
876 self.ui.debug('payload chunk size: %i\n' % payloadsize)
877 self.ui.debug('payload chunk size: %i\n' % payloadsize)
877 while payloadsize:
878 while payloadsize:
878 if payloadsize == flaginterrupt:
879 if payloadsize == flaginterrupt:
879 # interruption detection, the handler will now read a
880 # interruption detection, the handler will now read a
880 # single part and process it.
881 # single part and process it.
881 interrupthandler(self.ui, self._fp)()
882 interrupthandler(self.ui, self._fp)()
882 elif payloadsize < 0:
883 elif payloadsize < 0:
883 msg = 'negative payload chunk size: %i' % payloadsize
884 msg = 'negative payload chunk size: %i' % payloadsize
884 raise error.BundleValueError(msg)
885 raise error.BundleValueError(msg)
885 else:
886 else:
886 result = self._readexact(payloadsize)
887 result = self._readexact(payloadsize)
887 chunknum += 1
888 chunknum += 1
888 pos += payloadsize
889 pos += payloadsize
889 if chunknum == len(self._chunkindex):
890 if chunknum == len(self._chunkindex):
890 self._chunkindex.append((pos,
891 self._chunkindex.append((pos,
891 super(unbundlepart, self).tell()))
892 super(unbundlepart, self).tell()))
892 yield result
893 yield result
893 payloadsize = self._unpack(_fpayloadsize)[0]
894 payloadsize = self._unpack(_fpayloadsize)[0]
894 self.ui.debug('payload chunk size: %i\n' % payloadsize)
895 self.ui.debug('payload chunk size: %i\n' % payloadsize)
895
896
896 def _findchunk(self, pos):
897 def _findchunk(self, pos):
897 '''for a given payload position, return a chunk number and offset'''
898 '''for a given payload position, return a chunk number and offset'''
898 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
899 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
899 if ppos == pos:
900 if ppos == pos:
900 return chunk, 0
901 return chunk, 0
901 elif ppos > pos:
902 elif ppos > pos:
902 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
903 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
903 raise ValueError('Unknown chunk')
904 raise ValueError('Unknown chunk')
904
905
905 def _readheader(self):
906 def _readheader(self):
906 """read the header and setup the object"""
907 """read the header and setup the object"""
907 typesize = self._unpackheader(_fparttypesize)[0]
908 typesize = self._unpackheader(_fparttypesize)[0]
908 self.type = self._fromheader(typesize)
909 self.type = self._fromheader(typesize)
909 self.ui.debug('part type: "%s"\n' % self.type)
910 self.ui.debug('part type: "%s"\n' % self.type)
910 self.id = self._unpackheader(_fpartid)[0]
911 self.id = self._unpackheader(_fpartid)[0]
911 self.ui.debug('part id: "%s"\n' % self.id)
912 self.ui.debug('part id: "%s"\n' % self.id)
912 # extract mandatory bit from type
913 # extract mandatory bit from type
913 self.mandatory = (self.type != self.type.lower())
914 self.mandatory = (self.type != self.type.lower())
914 self.type = self.type.lower()
915 self.type = self.type.lower()
915 ## reading parameters
916 ## reading parameters
916 # param count
917 # param count
917 mancount, advcount = self._unpackheader(_fpartparamcount)
918 mancount, advcount = self._unpackheader(_fpartparamcount)
918 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
919 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
919 # param size
920 # param size
920 fparamsizes = _makefpartparamsizes(mancount + advcount)
921 fparamsizes = _makefpartparamsizes(mancount + advcount)
921 paramsizes = self._unpackheader(fparamsizes)
922 paramsizes = self._unpackheader(fparamsizes)
922 # make it a list of couple again
923 # make it a list of couple again
923 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
924 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
924 # split mandatory from advisory
925 # split mandatory from advisory
925 mansizes = paramsizes[:mancount]
926 mansizes = paramsizes[:mancount]
926 advsizes = paramsizes[mancount:]
927 advsizes = paramsizes[mancount:]
927 # retrieve param value
928 # retrieve param value
928 manparams = []
929 manparams = []
929 for key, value in mansizes:
930 for key, value in mansizes:
930 manparams.append((self._fromheader(key), self._fromheader(value)))
931 manparams.append((self._fromheader(key), self._fromheader(value)))
931 advparams = []
932 advparams = []
932 for key, value in advsizes:
933 for key, value in advsizes:
933 advparams.append((self._fromheader(key), self._fromheader(value)))
934 advparams.append((self._fromheader(key), self._fromheader(value)))
934 self._initparams(manparams, advparams)
935 self._initparams(manparams, advparams)
935 ## part payload
936 ## part payload
936 self._payloadstream = util.chunkbuffer(self._payloadchunks())
937 self._payloadstream = util.chunkbuffer(self._payloadchunks())
937 # we read the data, tell it
938 # we read the data, tell it
938 self._initialized = True
939 self._initialized = True
939
940
940 def read(self, size=None):
941 def read(self, size=None):
941 """read payload data"""
942 """read payload data"""
942 if not self._initialized:
943 if not self._initialized:
943 self._readheader()
944 self._readheader()
944 if size is None:
945 if size is None:
945 data = self._payloadstream.read()
946 data = self._payloadstream.read()
946 else:
947 else:
947 data = self._payloadstream.read(size)
948 data = self._payloadstream.read(size)
948 if size is None or len(data) < size:
949 if size is None or len(data) < size:
949 self.consumed = True
950 self.consumed = True
950 self._pos += len(data)
951 self._pos += len(data)
951 return data
952 return data
952
953
953 def tell(self):
954 def tell(self):
954 return self._pos
955 return self._pos
955
956
956 def seek(self, offset, whence=0):
957 def seek(self, offset, whence=0):
957 if whence == 0:
958 if whence == 0:
958 newpos = offset
959 newpos = offset
959 elif whence == 1:
960 elif whence == 1:
960 newpos = self._pos + offset
961 newpos = self._pos + offset
961 elif whence == 2:
962 elif whence == 2:
962 if not self.consumed:
963 if not self.consumed:
963 self.read()
964 self.read()
964 newpos = self._chunkindex[-1][0] - offset
965 newpos = self._chunkindex[-1][0] - offset
965 else:
966 else:
966 raise ValueError('Unknown whence value: %r' % (whence,))
967 raise ValueError('Unknown whence value: %r' % (whence,))
967
968
968 if newpos > self._chunkindex[-1][0] and not self.consumed:
969 if newpos > self._chunkindex[-1][0] and not self.consumed:
969 self.read()
970 self.read()
970 if not 0 <= newpos <= self._chunkindex[-1][0]:
971 if not 0 <= newpos <= self._chunkindex[-1][0]:
971 raise ValueError('Offset out of range')
972 raise ValueError('Offset out of range')
972
973
973 if self._pos != newpos:
974 if self._pos != newpos:
974 chunk, internaloffset = self._findchunk(newpos)
975 chunk, internaloffset = self._findchunk(newpos)
975 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
976 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
976 adjust = self.read(internaloffset)
977 adjust = self.read(internaloffset)
977 if len(adjust) != internaloffset:
978 if len(adjust) != internaloffset:
978 raise util.Abort(_('Seek failed\n'))
979 raise util.Abort(_('Seek failed\n'))
979 self._pos = newpos
980 self._pos = newpos
980
981
981 capabilities = {'HG2Y': (),
982 capabilities = {'HG2Y': (),
982 'b2x:listkeys': (),
983 'b2x:listkeys': (),
983 'b2x:pushkey': (),
984 'b2x:pushkey': (),
984 'digests': tuple(sorted(util.DIGESTS.keys())),
985 'digests': tuple(sorted(util.DIGESTS.keys())),
985 'b2x:remote-changegroup': ('http', 'https'),
986 'b2x:remote-changegroup': ('http', 'https'),
986 }
987 }
987
988
988 def getrepocaps(repo, allowpushback=False):
989 def getrepocaps(repo, allowpushback=False):
989 """return the bundle2 capabilities for a given repo
990 """return the bundle2 capabilities for a given repo
990
991
991 Exists to allow extensions (like evolution) to mutate the capabilities.
992 Exists to allow extensions (like evolution) to mutate the capabilities.
992 """
993 """
993 caps = capabilities.copy()
994 caps = capabilities.copy()
994 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
995 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
995 if obsolete.isenabled(repo, obsolete.exchangeopt):
996 if obsolete.isenabled(repo, obsolete.exchangeopt):
996 supportedformat = tuple('V%i' % v for v in obsolete.formats)
997 supportedformat = tuple('V%i' % v for v in obsolete.formats)
997 caps['b2x:obsmarkers'] = supportedformat
998 caps['b2x:obsmarkers'] = supportedformat
998 if allowpushback:
999 if allowpushback:
999 caps['b2x:pushback'] = ()
1000 caps['b2x:pushback'] = ()
1000 return caps
1001 return caps
1001
1002
1002 def bundle2caps(remote):
1003 def bundle2caps(remote):
1003 """return the bundle capabilities of a peer as dict"""
1004 """return the bundle capabilities of a peer as dict"""
1004 raw = remote.capable('bundle2-exp')
1005 raw = remote.capable('bundle2-exp')
1005 if not raw and raw != '':
1006 if not raw and raw != '':
1006 return {}
1007 return {}
1007 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1008 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
1008 return decodecaps(capsblob)
1009 return decodecaps(capsblob)
1009
1010
1010 def obsmarkersversion(caps):
1011 def obsmarkersversion(caps):
1011 """extract the list of supported obsmarkers versions from a bundle2caps dict
1012 """extract the list of supported obsmarkers versions from a bundle2caps dict
1012 """
1013 """
1013 obscaps = caps.get('b2x:obsmarkers', ())
1014 obscaps = caps.get('b2x:obsmarkers', ())
1014 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1015 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1015
1016
1016 @parthandler('b2x:changegroup', ('version',))
1017 @parthandler('b2x:changegroup', ('version',))
1017 def handlechangegroup(op, inpart):
1018 def handlechangegroup(op, inpart):
1018 """apply a changegroup part on the repo
1019 """apply a changegroup part on the repo
1019
1020
1020 This is a very early implementation that will massive rework before being
1021 This is a very early implementation that will massive rework before being
1021 inflicted to any end-user.
1022 inflicted to any end-user.
1022 """
1023 """
1023 # Make sure we trigger a transaction creation
1024 # Make sure we trigger a transaction creation
1024 #
1025 #
1025 # The addchangegroup function will get a transaction object by itself, but
1026 # The addchangegroup function will get a transaction object by itself, but
1026 # we need to make sure we trigger the creation of a transaction object used
1027 # we need to make sure we trigger the creation of a transaction object used
1027 # for the whole processing scope.
1028 # for the whole processing scope.
1028 op.gettransaction()
1029 op.gettransaction()
1029 unpackerversion = inpart.params.get('version', '01')
1030 unpackerversion = inpart.params.get('version', '01')
1030 # We should raise an appropriate exception here
1031 # We should raise an appropriate exception here
1031 unpacker = changegroup.packermap[unpackerversion][1]
1032 unpacker = changegroup.packermap[unpackerversion][1]
1032 cg = unpacker(inpart, 'UN')
1033 cg = unpacker(inpart, 'UN')
1033 # the source and url passed here are overwritten by the one contained in
1034 # the source and url passed here are overwritten by the one contained in
1034 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1035 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1035 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1036 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1036 op.records.add('changegroup', {'return': ret})
1037 op.records.add('changegroup', {'return': ret})
1037 if op.reply is not None:
1038 if op.reply is not None:
1038 # This is definitely not the final form of this
1039 # This is definitely not the final form of this
1039 # return. But one need to start somewhere.
1040 # return. But one need to start somewhere.
1040 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1041 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
1041 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1042 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1042 part.addparam('return', '%i' % ret, mandatory=False)
1043 part.addparam('return', '%i' % ret, mandatory=False)
1043 assert not inpart.read()
1044 assert not inpart.read()
1044
1045
1045 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1046 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1046 ['digest:%s' % k for k in util.DIGESTS.keys()])
1047 ['digest:%s' % k for k in util.DIGESTS.keys()])
1047 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1048 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
1048 def handleremotechangegroup(op, inpart):
1049 def handleremotechangegroup(op, inpart):
1049 """apply a bundle10 on the repo, given an url and validation information
1050 """apply a bundle10 on the repo, given an url and validation information
1050
1051
1051 All the information about the remote bundle to import are given as
1052 All the information about the remote bundle to import are given as
1052 parameters. The parameters include:
1053 parameters. The parameters include:
1053 - url: the url to the bundle10.
1054 - url: the url to the bundle10.
1054 - size: the bundle10 file size. It is used to validate what was
1055 - size: the bundle10 file size. It is used to validate what was
1055 retrieved by the client matches the server knowledge about the bundle.
1056 retrieved by the client matches the server knowledge about the bundle.
1056 - digests: a space separated list of the digest types provided as
1057 - digests: a space separated list of the digest types provided as
1057 parameters.
1058 parameters.
1058 - digest:<digest-type>: the hexadecimal representation of the digest with
1059 - digest:<digest-type>: the hexadecimal representation of the digest with
1059 that name. Like the size, it is used to validate what was retrieved by
1060 that name. Like the size, it is used to validate what was retrieved by
1060 the client matches what the server knows about the bundle.
1061 the client matches what the server knows about the bundle.
1061
1062
1062 When multiple digest types are given, all of them are checked.
1063 When multiple digest types are given, all of them are checked.
1063 """
1064 """
1064 try:
1065 try:
1065 raw_url = inpart.params['url']
1066 raw_url = inpart.params['url']
1066 except KeyError:
1067 except KeyError:
1067 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1068 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1068 parsed_url = util.url(raw_url)
1069 parsed_url = util.url(raw_url)
1069 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1070 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1070 raise util.Abort(_('remote-changegroup does not support %s urls') %
1071 raise util.Abort(_('remote-changegroup does not support %s urls') %
1071 parsed_url.scheme)
1072 parsed_url.scheme)
1072
1073
1073 try:
1074 try:
1074 size = int(inpart.params['size'])
1075 size = int(inpart.params['size'])
1075 except ValueError:
1076 except ValueError:
1076 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1077 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1077 % 'size')
1078 % 'size')
1078 except KeyError:
1079 except KeyError:
1079 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1080 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1080
1081
1081 digests = {}
1082 digests = {}
1082 for typ in inpart.params.get('digests', '').split():
1083 for typ in inpart.params.get('digests', '').split():
1083 param = 'digest:%s' % typ
1084 param = 'digest:%s' % typ
1084 try:
1085 try:
1085 value = inpart.params[param]
1086 value = inpart.params[param]
1086 except KeyError:
1087 except KeyError:
1087 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1088 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1088 param)
1089 param)
1089 digests[typ] = value
1090 digests[typ] = value
1090
1091
1091 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1092 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1092
1093
1093 # Make sure we trigger a transaction creation
1094 # Make sure we trigger a transaction creation
1094 #
1095 #
1095 # The addchangegroup function will get a transaction object by itself, but
1096 # The addchangegroup function will get a transaction object by itself, but
1096 # we need to make sure we trigger the creation of a transaction object used
1097 # we need to make sure we trigger the creation of a transaction object used
1097 # for the whole processing scope.
1098 # for the whole processing scope.
1098 op.gettransaction()
1099 op.gettransaction()
1099 import exchange
1100 import exchange
1100 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1101 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1101 if not isinstance(cg, changegroup.cg1unpacker):
1102 if not isinstance(cg, changegroup.cg1unpacker):
1102 raise util.Abort(_('%s: not a bundle version 1.0') %
1103 raise util.Abort(_('%s: not a bundle version 1.0') %
1103 util.hidepassword(raw_url))
1104 util.hidepassword(raw_url))
1104 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1105 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1105 op.records.add('changegroup', {'return': ret})
1106 op.records.add('changegroup', {'return': ret})
1106 if op.reply is not None:
1107 if op.reply is not None:
1107 # This is definitely not the final form of this
1108 # This is definitely not the final form of this
1108 # return. But one need to start somewhere.
1109 # return. But one need to start somewhere.
1109 part = op.reply.newpart('b2x:reply:changegroup')
1110 part = op.reply.newpart('b2x:reply:changegroup')
1110 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1111 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1111 part.addparam('return', '%i' % ret, mandatory=False)
1112 part.addparam('return', '%i' % ret, mandatory=False)
1112 try:
1113 try:
1113 real_part.validate()
1114 real_part.validate()
1114 except util.Abort, e:
1115 except util.Abort, e:
1115 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1116 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1116 (util.hidepassword(raw_url), str(e)))
1117 (util.hidepassword(raw_url), str(e)))
1117 assert not inpart.read()
1118 assert not inpart.read()
1118
1119
1119 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1120 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1120 def handlereplychangegroup(op, inpart):
1121 def handlereplychangegroup(op, inpart):
1121 ret = int(inpart.params['return'])
1122 ret = int(inpart.params['return'])
1122 replyto = int(inpart.params['in-reply-to'])
1123 replyto = int(inpart.params['in-reply-to'])
1123 op.records.add('changegroup', {'return': ret}, replyto)
1124 op.records.add('changegroup', {'return': ret}, replyto)
1124
1125
1125 @parthandler('b2x:check:heads')
1126 @parthandler('b2x:check:heads')
1126 def handlecheckheads(op, inpart):
1127 def handlecheckheads(op, inpart):
1127 """check that head of the repo did not change
1128 """check that head of the repo did not change
1128
1129
1129 This is used to detect a push race when using unbundle.
1130 This is used to detect a push race when using unbundle.
1130 This replaces the "heads" argument of unbundle."""
1131 This replaces the "heads" argument of unbundle."""
1131 h = inpart.read(20)
1132 h = inpart.read(20)
1132 heads = []
1133 heads = []
1133 while len(h) == 20:
1134 while len(h) == 20:
1134 heads.append(h)
1135 heads.append(h)
1135 h = inpart.read(20)
1136 h = inpart.read(20)
1136 assert not h
1137 assert not h
1137 if heads != op.repo.heads():
1138 if heads != op.repo.heads():
1138 raise error.PushRaced('repository changed while pushing - '
1139 raise error.PushRaced('repository changed while pushing - '
1139 'please try again')
1140 'please try again')
1140
1141
1141 @parthandler('b2x:output')
1142 @parthandler('b2x:output')
1142 def handleoutput(op, inpart):
1143 def handleoutput(op, inpart):
1143 """forward output captured on the server to the client"""
1144 """forward output captured on the server to the client"""
1144 for line in inpart.read().splitlines():
1145 for line in inpart.read().splitlines():
1145 op.ui.write(('remote: %s\n' % line))
1146 op.ui.write(('remote: %s\n' % line))
1146
1147
1147 @parthandler('b2x:replycaps')
1148 @parthandler('b2x:replycaps')
1148 def handlereplycaps(op, inpart):
1149 def handlereplycaps(op, inpart):
1149 """Notify that a reply bundle should be created
1150 """Notify that a reply bundle should be created
1150
1151
1151 The payload contains the capabilities information for the reply"""
1152 The payload contains the capabilities information for the reply"""
1152 caps = decodecaps(inpart.read())
1153 caps = decodecaps(inpart.read())
1153 if op.reply is None:
1154 if op.reply is None:
1154 op.reply = bundle20(op.ui, caps)
1155 op.reply = bundle20(op.ui, caps)
1155
1156
1156 @parthandler('b2x:error:abort', ('message', 'hint'))
1157 @parthandler('b2x:error:abort', ('message', 'hint'))
1157 def handlereplycaps(op, inpart):
1158 def handlereplycaps(op, inpart):
1158 """Used to transmit abort error over the wire"""
1159 """Used to transmit abort error over the wire"""
1159 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1160 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1160
1161
1161 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1162 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1162 def handlereplycaps(op, inpart):
1163 def handlereplycaps(op, inpart):
1163 """Used to transmit unknown content error over the wire"""
1164 """Used to transmit unknown content error over the wire"""
1164 kwargs = {}
1165 kwargs = {}
1165 parttype = inpart.params.get('parttype')
1166 parttype = inpart.params.get('parttype')
1166 if parttype is not None:
1167 if parttype is not None:
1167 kwargs['parttype'] = parttype
1168 kwargs['parttype'] = parttype
1168 params = inpart.params.get('params')
1169 params = inpart.params.get('params')
1169 if params is not None:
1170 if params is not None:
1170 kwargs['params'] = params.split('\0')
1171 kwargs['params'] = params.split('\0')
1171
1172
1172 raise error.UnsupportedPartError(**kwargs)
1173 raise error.UnsupportedPartError(**kwargs)
1173
1174
1174 @parthandler('b2x:error:pushraced', ('message',))
1175 @parthandler('b2x:error:pushraced', ('message',))
1175 def handlereplycaps(op, inpart):
1176 def handlereplycaps(op, inpart):
1176 """Used to transmit push race error over the wire"""
1177 """Used to transmit push race error over the wire"""
1177 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1178 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1178
1179
1179 @parthandler('b2x:listkeys', ('namespace',))
1180 @parthandler('b2x:listkeys', ('namespace',))
1180 def handlelistkeys(op, inpart):
1181 def handlelistkeys(op, inpart):
1181 """retrieve pushkey namespace content stored in a bundle2"""
1182 """retrieve pushkey namespace content stored in a bundle2"""
1182 namespace = inpart.params['namespace']
1183 namespace = inpart.params['namespace']
1183 r = pushkey.decodekeys(inpart.read())
1184 r = pushkey.decodekeys(inpart.read())
1184 op.records.add('listkeys', (namespace, r))
1185 op.records.add('listkeys', (namespace, r))
1185
1186
1186 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1187 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1187 def handlepushkey(op, inpart):
1188 def handlepushkey(op, inpart):
1188 """process a pushkey request"""
1189 """process a pushkey request"""
1189 dec = pushkey.decode
1190 dec = pushkey.decode
1190 namespace = dec(inpart.params['namespace'])
1191 namespace = dec(inpart.params['namespace'])
1191 key = dec(inpart.params['key'])
1192 key = dec(inpart.params['key'])
1192 old = dec(inpart.params['old'])
1193 old = dec(inpart.params['old'])
1193 new = dec(inpart.params['new'])
1194 new = dec(inpart.params['new'])
1194 ret = op.repo.pushkey(namespace, key, old, new)
1195 ret = op.repo.pushkey(namespace, key, old, new)
1195 record = {'namespace': namespace,
1196 record = {'namespace': namespace,
1196 'key': key,
1197 'key': key,
1197 'old': old,
1198 'old': old,
1198 'new': new}
1199 'new': new}
1199 op.records.add('pushkey', record)
1200 op.records.add('pushkey', record)
1200 if op.reply is not None:
1201 if op.reply is not None:
1201 rpart = op.reply.newpart('b2x:reply:pushkey')
1202 rpart = op.reply.newpart('b2x:reply:pushkey')
1202 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1203 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1203 rpart.addparam('return', '%i' % ret, mandatory=False)
1204 rpart.addparam('return', '%i' % ret, mandatory=False)
1204
1205
1205 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1206 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1206 def handlepushkeyreply(op, inpart):
1207 def handlepushkeyreply(op, inpart):
1207 """retrieve the result of a pushkey request"""
1208 """retrieve the result of a pushkey request"""
1208 ret = int(inpart.params['return'])
1209 ret = int(inpart.params['return'])
1209 partid = int(inpart.params['in-reply-to'])
1210 partid = int(inpart.params['in-reply-to'])
1210 op.records.add('pushkey', {'return': ret}, partid)
1211 op.records.add('pushkey', {'return': ret}, partid)
1211
1212
1212 @parthandler('b2x:obsmarkers')
1213 @parthandler('b2x:obsmarkers')
1213 def handleobsmarker(op, inpart):
1214 def handleobsmarker(op, inpart):
1214 """add a stream of obsmarkers to the repo"""
1215 """add a stream of obsmarkers to the repo"""
1215 tr = op.gettransaction()
1216 tr = op.gettransaction()
1216 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1217 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1217 if new:
1218 if new:
1218 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1219 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1219 op.records.add('obsmarkers', {'new': new})
1220 op.records.add('obsmarkers', {'new': new})
1220 if op.reply is not None:
1221 if op.reply is not None:
1221 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1222 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1222 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1223 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1223 rpart.addparam('new', '%i' % new, mandatory=False)
1224 rpart.addparam('new', '%i' % new, mandatory=False)
1224
1225
1225
1226
1226 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1227 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1227 def handlepushkeyreply(op, inpart):
1228 def handlepushkeyreply(op, inpart):
1228 """retrieve the result of a pushkey request"""
1229 """retrieve the result of a pushkey request"""
1229 ret = int(inpart.params['new'])
1230 ret = int(inpart.params['new'])
1230 partid = int(inpart.params['in-reply-to'])
1231 partid = int(inpart.params['in-reply-to'])
1231 op.records.add('obsmarkers', {'new': ret}, partid)
1232 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now