##// END OF EJS Templates
bundle2: fix parttype enforcement...
Matt Mackall -
r23916:a3f7c781 default
parent child Browse files
Show More
@@ -1,1141 +1,1141 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to ^a-zA-Z0-9_:-])
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import sys
148 import sys
149 import util
149 import util
150 import struct
150 import struct
151 import urllib
151 import urllib
152 import string
152 import string
153 import obsolete
153 import obsolete
154 import pushkey
154 import pushkey
155 import url
155 import url
156 import re
156 import re
157
157
158 import changegroup, error
158 import changegroup, error
159 from i18n import _
159 from i18n import _
160
160
161 _pack = struct.pack
161 _pack = struct.pack
162 _unpack = struct.unpack
162 _unpack = struct.unpack
163
163
164 _magicstring = 'HG2Y'
164 _magicstring = 'HG2Y'
165
165
166 _fstreamparamsize = '>i'
166 _fstreamparamsize = '>i'
167 _fpartheadersize = '>i'
167 _fpartheadersize = '>i'
168 _fparttypesize = '>B'
168 _fparttypesize = '>B'
169 _fpartid = '>I'
169 _fpartid = '>I'
170 _fpayloadsize = '>i'
170 _fpayloadsize = '>i'
171 _fpartparamcount = '>BB'
171 _fpartparamcount = '>BB'
172
172
173 preferedchunksize = 4096
173 preferedchunksize = 4096
174
174
175 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
176
176
177 def validateparttype(parttype):
177 def validateparttype(parttype):
178 """raise ValueError if a parttype contains invalid character"""
178 """raise ValueError if a parttype contains invalid character"""
179 if _parttypeforbidden.match(parttype):
179 if _parttypeforbidden.search(parttype):
180 raise ValueError(parttype)
180 raise ValueError(parttype)
181
181
182 def _makefpartparamsizes(nbparams):
182 def _makefpartparamsizes(nbparams):
183 """return a struct format to read part parameter sizes
183 """return a struct format to read part parameter sizes
184
184
185 The number parameters is variable so we need to build that format
185 The number parameters is variable so we need to build that format
186 dynamically.
186 dynamically.
187 """
187 """
188 return '>'+('BB'*nbparams)
188 return '>'+('BB'*nbparams)
189
189
190 parthandlermapping = {}
190 parthandlermapping = {}
191
191
192 def parthandler(parttype, params=()):
192 def parthandler(parttype, params=()):
193 """decorator that register a function as a bundle2 part handler
193 """decorator that register a function as a bundle2 part handler
194
194
195 eg::
195 eg::
196
196
197 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
198 def myparttypehandler(...):
198 def myparttypehandler(...):
199 '''process a part of type "my part".'''
199 '''process a part of type "my part".'''
200 ...
200 ...
201 """
201 """
202 validateparttype(parttype)
202 validateparttype(parttype)
203 def _decorator(func):
203 def _decorator(func):
204 lparttype = parttype.lower() # enforce lower case matching.
204 lparttype = parttype.lower() # enforce lower case matching.
205 assert lparttype not in parthandlermapping
205 assert lparttype not in parthandlermapping
206 parthandlermapping[lparttype] = func
206 parthandlermapping[lparttype] = func
207 func.params = frozenset(params)
207 func.params = frozenset(params)
208 return func
208 return func
209 return _decorator
209 return _decorator
210
210
211 class unbundlerecords(object):
211 class unbundlerecords(object):
212 """keep record of what happens during and unbundle
212 """keep record of what happens during and unbundle
213
213
214 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 New records are added using `records.add('cat', obj)`. Where 'cat' is a
215 category of record and obj is an arbitrary object.
215 category of record and obj is an arbitrary object.
216
216
217 `records['cat']` will return all entries of this category 'cat'.
217 `records['cat']` will return all entries of this category 'cat'.
218
218
219 Iterating on the object itself will yield `('category', obj)` tuples
219 Iterating on the object itself will yield `('category', obj)` tuples
220 for all entries.
220 for all entries.
221
221
222 All iterations happens in chronological order.
222 All iterations happens in chronological order.
223 """
223 """
224
224
225 def __init__(self):
225 def __init__(self):
226 self._categories = {}
226 self._categories = {}
227 self._sequences = []
227 self._sequences = []
228 self._replies = {}
228 self._replies = {}
229
229
230 def add(self, category, entry, inreplyto=None):
230 def add(self, category, entry, inreplyto=None):
231 """add a new record of a given category.
231 """add a new record of a given category.
232
232
233 The entry can then be retrieved in the list returned by
233 The entry can then be retrieved in the list returned by
234 self['category']."""
234 self['category']."""
235 self._categories.setdefault(category, []).append(entry)
235 self._categories.setdefault(category, []).append(entry)
236 self._sequences.append((category, entry))
236 self._sequences.append((category, entry))
237 if inreplyto is not None:
237 if inreplyto is not None:
238 self.getreplies(inreplyto).add(category, entry)
238 self.getreplies(inreplyto).add(category, entry)
239
239
240 def getreplies(self, partid):
240 def getreplies(self, partid):
241 """get the records that are replies to a specific part"""
241 """get the records that are replies to a specific part"""
242 return self._replies.setdefault(partid, unbundlerecords())
242 return self._replies.setdefault(partid, unbundlerecords())
243
243
244 def __getitem__(self, cat):
244 def __getitem__(self, cat):
245 return tuple(self._categories.get(cat, ()))
245 return tuple(self._categories.get(cat, ()))
246
246
247 def __iter__(self):
247 def __iter__(self):
248 return iter(self._sequences)
248 return iter(self._sequences)
249
249
250 def __len__(self):
250 def __len__(self):
251 return len(self._sequences)
251 return len(self._sequences)
252
252
253 def __nonzero__(self):
253 def __nonzero__(self):
254 return bool(self._sequences)
254 return bool(self._sequences)
255
255
256 class bundleoperation(object):
256 class bundleoperation(object):
257 """an object that represents a single bundling process
257 """an object that represents a single bundling process
258
258
259 Its purpose is to carry unbundle-related objects and states.
259 Its purpose is to carry unbundle-related objects and states.
260
260
261 A new object should be created at the beginning of each bundle processing.
261 A new object should be created at the beginning of each bundle processing.
262 The object is to be returned by the processing function.
262 The object is to be returned by the processing function.
263
263
264 The object has very little content now it will ultimately contain:
264 The object has very little content now it will ultimately contain:
265 * an access to the repo the bundle is applied to,
265 * an access to the repo the bundle is applied to,
266 * a ui object,
266 * a ui object,
267 * a way to retrieve a transaction to add changes to the repo,
267 * a way to retrieve a transaction to add changes to the repo,
268 * a way to record the result of processing each part,
268 * a way to record the result of processing each part,
269 * a way to construct a bundle response when applicable.
269 * a way to construct a bundle response when applicable.
270 """
270 """
271
271
272 def __init__(self, repo, transactiongetter):
272 def __init__(self, repo, transactiongetter):
273 self.repo = repo
273 self.repo = repo
274 self.ui = repo.ui
274 self.ui = repo.ui
275 self.records = unbundlerecords()
275 self.records = unbundlerecords()
276 self.gettransaction = transactiongetter
276 self.gettransaction = transactiongetter
277 self.reply = None
277 self.reply = None
278
278
279 class TransactionUnavailable(RuntimeError):
279 class TransactionUnavailable(RuntimeError):
280 pass
280 pass
281
281
282 def _notransaction():
282 def _notransaction():
283 """default method to get a transaction while processing a bundle
283 """default method to get a transaction while processing a bundle
284
284
285 Raise an exception to highlight the fact that no transaction was expected
285 Raise an exception to highlight the fact that no transaction was expected
286 to be created"""
286 to be created"""
287 raise TransactionUnavailable()
287 raise TransactionUnavailable()
288
288
289 def processbundle(repo, unbundler, transactiongetter=None):
289 def processbundle(repo, unbundler, transactiongetter=None):
290 """This function process a bundle, apply effect to/from a repo
290 """This function process a bundle, apply effect to/from a repo
291
291
292 It iterates over each part then searches for and uses the proper handling
292 It iterates over each part then searches for and uses the proper handling
293 code to process the part. Parts are processed in order.
293 code to process the part. Parts are processed in order.
294
294
295 This is very early version of this function that will be strongly reworked
295 This is very early version of this function that will be strongly reworked
296 before final usage.
296 before final usage.
297
297
298 Unknown Mandatory part will abort the process.
298 Unknown Mandatory part will abort the process.
299 """
299 """
300 if transactiongetter is None:
300 if transactiongetter is None:
301 transactiongetter = _notransaction
301 transactiongetter = _notransaction
302 op = bundleoperation(repo, transactiongetter)
302 op = bundleoperation(repo, transactiongetter)
303 # todo:
303 # todo:
304 # - replace this is a init function soon.
304 # - replace this is a init function soon.
305 # - exception catching
305 # - exception catching
306 unbundler.params
306 unbundler.params
307 iterparts = unbundler.iterparts()
307 iterparts = unbundler.iterparts()
308 part = None
308 part = None
309 try:
309 try:
310 for part in iterparts:
310 for part in iterparts:
311 _processpart(op, part)
311 _processpart(op, part)
312 except Exception, exc:
312 except Exception, exc:
313 for part in iterparts:
313 for part in iterparts:
314 # consume the bundle content
314 # consume the bundle content
315 part.read()
315 part.read()
316 # Small hack to let caller code distinguish exceptions from bundle2
316 # Small hack to let caller code distinguish exceptions from bundle2
317 # processing from processing the old format. This is mostly
317 # processing from processing the old format. This is mostly
318 # needed to handle different return codes to unbundle according to the
318 # needed to handle different return codes to unbundle according to the
319 # type of bundle. We should probably clean up or drop this return code
319 # type of bundle. We should probably clean up or drop this return code
320 # craziness in a future version.
320 # craziness in a future version.
321 exc.duringunbundle2 = True
321 exc.duringunbundle2 = True
322 raise
322 raise
323 return op
323 return op
324
324
325 def _processpart(op, part):
325 def _processpart(op, part):
326 """process a single part from a bundle
326 """process a single part from a bundle
327
327
328 The part is guaranteed to have been fully consumed when the function exits
328 The part is guaranteed to have been fully consumed when the function exits
329 (even if an exception is raised)."""
329 (even if an exception is raised)."""
330 try:
330 try:
331 try:
331 try:
332 handler = parthandlermapping.get(part.type)
332 handler = parthandlermapping.get(part.type)
333 if handler is None:
333 if handler is None:
334 raise error.UnsupportedPartError(parttype=part.type)
334 raise error.UnsupportedPartError(parttype=part.type)
335 op.ui.debug('found a handler for part %r\n' % part.type)
335 op.ui.debug('found a handler for part %r\n' % part.type)
336 unknownparams = part.mandatorykeys - handler.params
336 unknownparams = part.mandatorykeys - handler.params
337 if unknownparams:
337 if unknownparams:
338 unknownparams = list(unknownparams)
338 unknownparams = list(unknownparams)
339 unknownparams.sort()
339 unknownparams.sort()
340 raise error.UnsupportedPartError(parttype=part.type,
340 raise error.UnsupportedPartError(parttype=part.type,
341 params=unknownparams)
341 params=unknownparams)
342 except error.UnsupportedPartError, exc:
342 except error.UnsupportedPartError, exc:
343 if part.mandatory: # mandatory parts
343 if part.mandatory: # mandatory parts
344 raise
344 raise
345 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
346 return # skip to part processing
346 return # skip to part processing
347
347
348 # handler is called outside the above try block so that we don't
348 # handler is called outside the above try block so that we don't
349 # risk catching KeyErrors from anything other than the
349 # risk catching KeyErrors from anything other than the
350 # parthandlermapping lookup (any KeyError raised by handler()
350 # parthandlermapping lookup (any KeyError raised by handler()
351 # itself represents a defect of a different variety).
351 # itself represents a defect of a different variety).
352 output = None
352 output = None
353 if op.reply is not None:
353 if op.reply is not None:
354 op.ui.pushbuffer(error=True)
354 op.ui.pushbuffer(error=True)
355 output = ''
355 output = ''
356 try:
356 try:
357 handler(op, part)
357 handler(op, part)
358 finally:
358 finally:
359 if output is not None:
359 if output is not None:
360 output = op.ui.popbuffer()
360 output = op.ui.popbuffer()
361 if output:
361 if output:
362 outpart = op.reply.newpart('b2x:output', data=output,
362 outpart = op.reply.newpart('b2x:output', data=output,
363 mandatory=False)
363 mandatory=False)
364 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
365 finally:
365 finally:
366 # consume the part content to not corrupt the stream.
366 # consume the part content to not corrupt the stream.
367 part.read()
367 part.read()
368
368
369
369
370 def decodecaps(blob):
370 def decodecaps(blob):
371 """decode a bundle2 caps bytes blob into a dictionary
371 """decode a bundle2 caps bytes blob into a dictionary
372
372
373 The blob is a list of capabilities (one per line)
373 The blob is a list of capabilities (one per line)
374 Capabilities may have values using a line of the form::
374 Capabilities may have values using a line of the form::
375
375
376 capability=value1,value2,value3
376 capability=value1,value2,value3
377
377
378 The values are always a list."""
378 The values are always a list."""
379 caps = {}
379 caps = {}
380 for line in blob.splitlines():
380 for line in blob.splitlines():
381 if not line:
381 if not line:
382 continue
382 continue
383 if '=' not in line:
383 if '=' not in line:
384 key, vals = line, ()
384 key, vals = line, ()
385 else:
385 else:
386 key, vals = line.split('=', 1)
386 key, vals = line.split('=', 1)
387 vals = vals.split(',')
387 vals = vals.split(',')
388 key = urllib.unquote(key)
388 key = urllib.unquote(key)
389 vals = [urllib.unquote(v) for v in vals]
389 vals = [urllib.unquote(v) for v in vals]
390 caps[key] = vals
390 caps[key] = vals
391 return caps
391 return caps
392
392
393 def encodecaps(caps):
393 def encodecaps(caps):
394 """encode a bundle2 caps dictionary into a bytes blob"""
394 """encode a bundle2 caps dictionary into a bytes blob"""
395 chunks = []
395 chunks = []
396 for ca in sorted(caps):
396 for ca in sorted(caps):
397 vals = caps[ca]
397 vals = caps[ca]
398 ca = urllib.quote(ca)
398 ca = urllib.quote(ca)
399 vals = [urllib.quote(v) for v in vals]
399 vals = [urllib.quote(v) for v in vals]
400 if vals:
400 if vals:
401 ca = "%s=%s" % (ca, ','.join(vals))
401 ca = "%s=%s" % (ca, ','.join(vals))
402 chunks.append(ca)
402 chunks.append(ca)
403 return '\n'.join(chunks)
403 return '\n'.join(chunks)
404
404
405 class bundle20(object):
405 class bundle20(object):
406 """represent an outgoing bundle2 container
406 """represent an outgoing bundle2 container
407
407
408 Use the `addparam` method to add stream level parameter. and `newpart` to
408 Use the `addparam` method to add stream level parameter. and `newpart` to
409 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 populate it. Then call `getchunks` to retrieve all the binary chunks of
410 data that compose the bundle2 container."""
410 data that compose the bundle2 container."""
411
411
412 def __init__(self, ui, capabilities=()):
412 def __init__(self, ui, capabilities=()):
413 self.ui = ui
413 self.ui = ui
414 self._params = []
414 self._params = []
415 self._parts = []
415 self._parts = []
416 self.capabilities = dict(capabilities)
416 self.capabilities = dict(capabilities)
417
417
418 @property
418 @property
419 def nbparts(self):
419 def nbparts(self):
420 """total number of parts added to the bundler"""
420 """total number of parts added to the bundler"""
421 return len(self._parts)
421 return len(self._parts)
422
422
423 # methods used to defines the bundle2 content
423 # methods used to defines the bundle2 content
424 def addparam(self, name, value=None):
424 def addparam(self, name, value=None):
425 """add a stream level parameter"""
425 """add a stream level parameter"""
426 if not name:
426 if not name:
427 raise ValueError('empty parameter name')
427 raise ValueError('empty parameter name')
428 if name[0] not in string.letters:
428 if name[0] not in string.letters:
429 raise ValueError('non letter first character: %r' % name)
429 raise ValueError('non letter first character: %r' % name)
430 self._params.append((name, value))
430 self._params.append((name, value))
431
431
432 def addpart(self, part):
432 def addpart(self, part):
433 """add a new part to the bundle2 container
433 """add a new part to the bundle2 container
434
434
435 Parts contains the actual applicative payload."""
435 Parts contains the actual applicative payload."""
436 assert part.id is None
436 assert part.id is None
437 part.id = len(self._parts) # very cheap counter
437 part.id = len(self._parts) # very cheap counter
438 self._parts.append(part)
438 self._parts.append(part)
439
439
440 def newpart(self, typeid, *args, **kwargs):
440 def newpart(self, typeid, *args, **kwargs):
441 """create a new part and add it to the containers
441 """create a new part and add it to the containers
442
442
443 As the part is directly added to the containers. For now, this means
443 As the part is directly added to the containers. For now, this means
444 that any failure to properly initialize the part after calling
444 that any failure to properly initialize the part after calling
445 ``newpart`` should result in a failure of the whole bundling process.
445 ``newpart`` should result in a failure of the whole bundling process.
446
446
447 You can still fall back to manually create and add if you need better
447 You can still fall back to manually create and add if you need better
448 control."""
448 control."""
449 part = bundlepart(typeid, *args, **kwargs)
449 part = bundlepart(typeid, *args, **kwargs)
450 self.addpart(part)
450 self.addpart(part)
451 return part
451 return part
452
452
453 # methods used to generate the bundle2 stream
453 # methods used to generate the bundle2 stream
454 def getchunks(self):
454 def getchunks(self):
455 self.ui.debug('start emission of %s stream\n' % _magicstring)
455 self.ui.debug('start emission of %s stream\n' % _magicstring)
456 yield _magicstring
456 yield _magicstring
457 param = self._paramchunk()
457 param = self._paramchunk()
458 self.ui.debug('bundle parameter: %s\n' % param)
458 self.ui.debug('bundle parameter: %s\n' % param)
459 yield _pack(_fstreamparamsize, len(param))
459 yield _pack(_fstreamparamsize, len(param))
460 if param:
460 if param:
461 yield param
461 yield param
462
462
463 self.ui.debug('start of parts\n')
463 self.ui.debug('start of parts\n')
464 for part in self._parts:
464 for part in self._parts:
465 self.ui.debug('bundle part: "%s"\n' % part.type)
465 self.ui.debug('bundle part: "%s"\n' % part.type)
466 for chunk in part.getchunks():
466 for chunk in part.getchunks():
467 yield chunk
467 yield chunk
468 self.ui.debug('end of bundle\n')
468 self.ui.debug('end of bundle\n')
469 yield _pack(_fpartheadersize, 0)
469 yield _pack(_fpartheadersize, 0)
470
470
471 def _paramchunk(self):
471 def _paramchunk(self):
472 """return a encoded version of all stream parameters"""
472 """return a encoded version of all stream parameters"""
473 blocks = []
473 blocks = []
474 for par, value in self._params:
474 for par, value in self._params:
475 par = urllib.quote(par)
475 par = urllib.quote(par)
476 if value is not None:
476 if value is not None:
477 value = urllib.quote(value)
477 value = urllib.quote(value)
478 par = '%s=%s' % (par, value)
478 par = '%s=%s' % (par, value)
479 blocks.append(par)
479 blocks.append(par)
480 return ' '.join(blocks)
480 return ' '.join(blocks)
481
481
482 class unpackermixin(object):
482 class unpackermixin(object):
483 """A mixin to extract bytes and struct data from a stream"""
483 """A mixin to extract bytes and struct data from a stream"""
484
484
485 def __init__(self, fp):
485 def __init__(self, fp):
486 self._fp = fp
486 self._fp = fp
487
487
488 def _unpack(self, format):
488 def _unpack(self, format):
489 """unpack this struct format from the stream"""
489 """unpack this struct format from the stream"""
490 data = self._readexact(struct.calcsize(format))
490 data = self._readexact(struct.calcsize(format))
491 return _unpack(format, data)
491 return _unpack(format, data)
492
492
493 def _readexact(self, size):
493 def _readexact(self, size):
494 """read exactly <size> bytes from the stream"""
494 """read exactly <size> bytes from the stream"""
495 return changegroup.readexactly(self._fp, size)
495 return changegroup.readexactly(self._fp, size)
496
496
497
497
498 class unbundle20(unpackermixin):
498 class unbundle20(unpackermixin):
499 """interpret a bundle2 stream
499 """interpret a bundle2 stream
500
500
501 This class is fed with a binary stream and yields parts through its
501 This class is fed with a binary stream and yields parts through its
502 `iterparts` methods."""
502 `iterparts` methods."""
503
503
504 def __init__(self, ui, fp, header=None):
504 def __init__(self, ui, fp, header=None):
505 """If header is specified, we do not read it out of the stream."""
505 """If header is specified, we do not read it out of the stream."""
506 self.ui = ui
506 self.ui = ui
507 super(unbundle20, self).__init__(fp)
507 super(unbundle20, self).__init__(fp)
508 if header is None:
508 if header is None:
509 header = self._readexact(4)
509 header = self._readexact(4)
510 magic, version = header[0:2], header[2:4]
510 magic, version = header[0:2], header[2:4]
511 if magic != 'HG':
511 if magic != 'HG':
512 raise util.Abort(_('not a Mercurial bundle'))
512 raise util.Abort(_('not a Mercurial bundle'))
513 if version != '2Y':
513 if version != '2Y':
514 raise util.Abort(_('unknown bundle version %s') % version)
514 raise util.Abort(_('unknown bundle version %s') % version)
515 self.ui.debug('start processing of %s stream\n' % header)
515 self.ui.debug('start processing of %s stream\n' % header)
516
516
517 @util.propertycache
517 @util.propertycache
518 def params(self):
518 def params(self):
519 """dictionary of stream level parameters"""
519 """dictionary of stream level parameters"""
520 self.ui.debug('reading bundle2 stream parameters\n')
520 self.ui.debug('reading bundle2 stream parameters\n')
521 params = {}
521 params = {}
522 paramssize = self._unpack(_fstreamparamsize)[0]
522 paramssize = self._unpack(_fstreamparamsize)[0]
523 if paramssize < 0:
523 if paramssize < 0:
524 raise error.BundleValueError('negative bundle param size: %i'
524 raise error.BundleValueError('negative bundle param size: %i'
525 % paramssize)
525 % paramssize)
526 if paramssize:
526 if paramssize:
527 for p in self._readexact(paramssize).split(' '):
527 for p in self._readexact(paramssize).split(' '):
528 p = p.split('=', 1)
528 p = p.split('=', 1)
529 p = [urllib.unquote(i) for i in p]
529 p = [urllib.unquote(i) for i in p]
530 if len(p) < 2:
530 if len(p) < 2:
531 p.append(None)
531 p.append(None)
532 self._processparam(*p)
532 self._processparam(*p)
533 params[p[0]] = p[1]
533 params[p[0]] = p[1]
534 return params
534 return params
535
535
536 def _processparam(self, name, value):
536 def _processparam(self, name, value):
537 """process a parameter, applying its effect if needed
537 """process a parameter, applying its effect if needed
538
538
539 Parameter starting with a lower case letter are advisory and will be
539 Parameter starting with a lower case letter are advisory and will be
540 ignored when unknown. Those starting with an upper case letter are
540 ignored when unknown. Those starting with an upper case letter are
541 mandatory and will this function will raise a KeyError when unknown.
541 mandatory and will this function will raise a KeyError when unknown.
542
542
543 Note: no option are currently supported. Any input will be either
543 Note: no option are currently supported. Any input will be either
544 ignored or failing.
544 ignored or failing.
545 """
545 """
546 if not name:
546 if not name:
547 raise ValueError('empty parameter name')
547 raise ValueError('empty parameter name')
548 if name[0] not in string.letters:
548 if name[0] not in string.letters:
549 raise ValueError('non letter first character: %r' % name)
549 raise ValueError('non letter first character: %r' % name)
550 # Some logic will be later added here to try to process the option for
550 # Some logic will be later added here to try to process the option for
551 # a dict of known parameter.
551 # a dict of known parameter.
552 if name[0].islower():
552 if name[0].islower():
553 self.ui.debug("ignoring unknown parameter %r\n" % name)
553 self.ui.debug("ignoring unknown parameter %r\n" % name)
554 else:
554 else:
555 raise error.UnsupportedPartError(params=(name,))
555 raise error.UnsupportedPartError(params=(name,))
556
556
557
557
558 def iterparts(self):
558 def iterparts(self):
559 """yield all parts contained in the stream"""
559 """yield all parts contained in the stream"""
560 # make sure param have been loaded
560 # make sure param have been loaded
561 self.params
561 self.params
562 self.ui.debug('start extraction of bundle2 parts\n')
562 self.ui.debug('start extraction of bundle2 parts\n')
563 headerblock = self._readpartheader()
563 headerblock = self._readpartheader()
564 while headerblock is not None:
564 while headerblock is not None:
565 part = unbundlepart(self.ui, headerblock, self._fp)
565 part = unbundlepart(self.ui, headerblock, self._fp)
566 yield part
566 yield part
567 headerblock = self._readpartheader()
567 headerblock = self._readpartheader()
568 self.ui.debug('end of bundle2 stream\n')
568 self.ui.debug('end of bundle2 stream\n')
569
569
570 def _readpartheader(self):
570 def _readpartheader(self):
571 """reads a part header size and return the bytes blob
571 """reads a part header size and return the bytes blob
572
572
573 returns None if empty"""
573 returns None if empty"""
574 headersize = self._unpack(_fpartheadersize)[0]
574 headersize = self._unpack(_fpartheadersize)[0]
575 if headersize < 0:
575 if headersize < 0:
576 raise error.BundleValueError('negative part header size: %i'
576 raise error.BundleValueError('negative part header size: %i'
577 % headersize)
577 % headersize)
578 self.ui.debug('part header size: %i\n' % headersize)
578 self.ui.debug('part header size: %i\n' % headersize)
579 if headersize:
579 if headersize:
580 return self._readexact(headersize)
580 return self._readexact(headersize)
581 return None
581 return None
582
582
583
583
584 class bundlepart(object):
584 class bundlepart(object):
585 """A bundle2 part contains application level payload
585 """A bundle2 part contains application level payload
586
586
587 The part `type` is used to route the part to the application level
587 The part `type` is used to route the part to the application level
588 handler.
588 handler.
589
589
590 The part payload is contained in ``part.data``. It could be raw bytes or a
590 The part payload is contained in ``part.data``. It could be raw bytes or a
591 generator of byte chunks.
591 generator of byte chunks.
592
592
593 You can add parameters to the part using the ``addparam`` method.
593 You can add parameters to the part using the ``addparam`` method.
594 Parameters can be either mandatory (default) or advisory. Remote side
594 Parameters can be either mandatory (default) or advisory. Remote side
595 should be able to safely ignore the advisory ones.
595 should be able to safely ignore the advisory ones.
596
596
597 Both data and parameters cannot be modified after the generation has begun.
597 Both data and parameters cannot be modified after the generation has begun.
598 """
598 """
599
599
600 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
600 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
601 data='', mandatory=True):
601 data='', mandatory=True):
602 validateparttype(parttype)
602 validateparttype(parttype)
603 self.id = None
603 self.id = None
604 self.type = parttype
604 self.type = parttype
605 self._data = data
605 self._data = data
606 self._mandatoryparams = list(mandatoryparams)
606 self._mandatoryparams = list(mandatoryparams)
607 self._advisoryparams = list(advisoryparams)
607 self._advisoryparams = list(advisoryparams)
608 # checking for duplicated entries
608 # checking for duplicated entries
609 self._seenparams = set()
609 self._seenparams = set()
610 for pname, __ in self._mandatoryparams + self._advisoryparams:
610 for pname, __ in self._mandatoryparams + self._advisoryparams:
611 if pname in self._seenparams:
611 if pname in self._seenparams:
612 raise RuntimeError('duplicated params: %s' % pname)
612 raise RuntimeError('duplicated params: %s' % pname)
613 self._seenparams.add(pname)
613 self._seenparams.add(pname)
614 # status of the part's generation:
614 # status of the part's generation:
615 # - None: not started,
615 # - None: not started,
616 # - False: currently generated,
616 # - False: currently generated,
617 # - True: generation done.
617 # - True: generation done.
618 self._generated = None
618 self._generated = None
619 self.mandatory = mandatory
619 self.mandatory = mandatory
620
620
621 # methods used to defines the part content
621 # methods used to defines the part content
622 def __setdata(self, data):
622 def __setdata(self, data):
623 if self._generated is not None:
623 if self._generated is not None:
624 raise error.ReadOnlyPartError('part is being generated')
624 raise error.ReadOnlyPartError('part is being generated')
625 self._data = data
625 self._data = data
626 def __getdata(self):
626 def __getdata(self):
627 return self._data
627 return self._data
628 data = property(__getdata, __setdata)
628 data = property(__getdata, __setdata)
629
629
630 @property
630 @property
631 def mandatoryparams(self):
631 def mandatoryparams(self):
632 # make it an immutable tuple to force people through ``addparam``
632 # make it an immutable tuple to force people through ``addparam``
633 return tuple(self._mandatoryparams)
633 return tuple(self._mandatoryparams)
634
634
635 @property
635 @property
636 def advisoryparams(self):
636 def advisoryparams(self):
637 # make it an immutable tuple to force people through ``addparam``
637 # make it an immutable tuple to force people through ``addparam``
638 return tuple(self._advisoryparams)
638 return tuple(self._advisoryparams)
639
639
640 def addparam(self, name, value='', mandatory=True):
640 def addparam(self, name, value='', mandatory=True):
641 if self._generated is not None:
641 if self._generated is not None:
642 raise error.ReadOnlyPartError('part is being generated')
642 raise error.ReadOnlyPartError('part is being generated')
643 if name in self._seenparams:
643 if name in self._seenparams:
644 raise ValueError('duplicated params: %s' % name)
644 raise ValueError('duplicated params: %s' % name)
645 self._seenparams.add(name)
645 self._seenparams.add(name)
646 params = self._advisoryparams
646 params = self._advisoryparams
647 if mandatory:
647 if mandatory:
648 params = self._mandatoryparams
648 params = self._mandatoryparams
649 params.append((name, value))
649 params.append((name, value))
650
650
651 # methods used to generates the bundle2 stream
651 # methods used to generates the bundle2 stream
652 def getchunks(self):
652 def getchunks(self):
653 if self._generated is not None:
653 if self._generated is not None:
654 raise RuntimeError('part can only be consumed once')
654 raise RuntimeError('part can only be consumed once')
655 self._generated = False
655 self._generated = False
656 #### header
656 #### header
657 if self.mandatory:
657 if self.mandatory:
658 parttype = self.type.upper()
658 parttype = self.type.upper()
659 else:
659 else:
660 parttype = self.type.lower()
660 parttype = self.type.lower()
661 ## parttype
661 ## parttype
662 header = [_pack(_fparttypesize, len(parttype)),
662 header = [_pack(_fparttypesize, len(parttype)),
663 parttype, _pack(_fpartid, self.id),
663 parttype, _pack(_fpartid, self.id),
664 ]
664 ]
665 ## parameters
665 ## parameters
666 # count
666 # count
667 manpar = self.mandatoryparams
667 manpar = self.mandatoryparams
668 advpar = self.advisoryparams
668 advpar = self.advisoryparams
669 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
669 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
670 # size
670 # size
671 parsizes = []
671 parsizes = []
672 for key, value in manpar:
672 for key, value in manpar:
673 parsizes.append(len(key))
673 parsizes.append(len(key))
674 parsizes.append(len(value))
674 parsizes.append(len(value))
675 for key, value in advpar:
675 for key, value in advpar:
676 parsizes.append(len(key))
676 parsizes.append(len(key))
677 parsizes.append(len(value))
677 parsizes.append(len(value))
678 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
678 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
679 header.append(paramsizes)
679 header.append(paramsizes)
680 # key, value
680 # key, value
681 for key, value in manpar:
681 for key, value in manpar:
682 header.append(key)
682 header.append(key)
683 header.append(value)
683 header.append(value)
684 for key, value in advpar:
684 for key, value in advpar:
685 header.append(key)
685 header.append(key)
686 header.append(value)
686 header.append(value)
687 ## finalize header
687 ## finalize header
688 headerchunk = ''.join(header)
688 headerchunk = ''.join(header)
689 yield _pack(_fpartheadersize, len(headerchunk))
689 yield _pack(_fpartheadersize, len(headerchunk))
690 yield headerchunk
690 yield headerchunk
691 ## payload
691 ## payload
692 try:
692 try:
693 for chunk in self._payloadchunks():
693 for chunk in self._payloadchunks():
694 yield _pack(_fpayloadsize, len(chunk))
694 yield _pack(_fpayloadsize, len(chunk))
695 yield chunk
695 yield chunk
696 except Exception, exc:
696 except Exception, exc:
697 # backup exception data for later
697 # backup exception data for later
698 exc_info = sys.exc_info()
698 exc_info = sys.exc_info()
699 msg = 'unexpected error: %s' % exc
699 msg = 'unexpected error: %s' % exc
700 interpart = bundlepart('b2x:error:abort', [('message', msg)],
700 interpart = bundlepart('b2x:error:abort', [('message', msg)],
701 mandatory=False)
701 mandatory=False)
702 interpart.id = 0
702 interpart.id = 0
703 yield _pack(_fpayloadsize, -1)
703 yield _pack(_fpayloadsize, -1)
704 for chunk in interpart.getchunks():
704 for chunk in interpart.getchunks():
705 yield chunk
705 yield chunk
706 # abort current part payload
706 # abort current part payload
707 yield _pack(_fpayloadsize, 0)
707 yield _pack(_fpayloadsize, 0)
708 raise exc_info[0], exc_info[1], exc_info[2]
708 raise exc_info[0], exc_info[1], exc_info[2]
709 # end of payload
709 # end of payload
710 yield _pack(_fpayloadsize, 0)
710 yield _pack(_fpayloadsize, 0)
711 self._generated = True
711 self._generated = True
712
712
713 def _payloadchunks(self):
713 def _payloadchunks(self):
714 """yield chunks of a the part payload
714 """yield chunks of a the part payload
715
715
716 Exists to handle the different methods to provide data to a part."""
716 Exists to handle the different methods to provide data to a part."""
717 # we only support fixed size data now.
717 # we only support fixed size data now.
718 # This will be improved in the future.
718 # This will be improved in the future.
719 if util.safehasattr(self.data, 'next'):
719 if util.safehasattr(self.data, 'next'):
720 buff = util.chunkbuffer(self.data)
720 buff = util.chunkbuffer(self.data)
721 chunk = buff.read(preferedchunksize)
721 chunk = buff.read(preferedchunksize)
722 while chunk:
722 while chunk:
723 yield chunk
723 yield chunk
724 chunk = buff.read(preferedchunksize)
724 chunk = buff.read(preferedchunksize)
725 elif len(self.data):
725 elif len(self.data):
726 yield self.data
726 yield self.data
727
727
728
728
729 flaginterrupt = -1
729 flaginterrupt = -1
730
730
731 class interrupthandler(unpackermixin):
731 class interrupthandler(unpackermixin):
732 """read one part and process it with restricted capability
732 """read one part and process it with restricted capability
733
733
734 This allows to transmit exception raised on the producer size during part
734 This allows to transmit exception raised on the producer size during part
735 iteration while the consumer is reading a part.
735 iteration while the consumer is reading a part.
736
736
737 Part processed in this manner only have access to a ui object,"""
737 Part processed in this manner only have access to a ui object,"""
738
738
739 def __init__(self, ui, fp):
739 def __init__(self, ui, fp):
740 super(interrupthandler, self).__init__(fp)
740 super(interrupthandler, self).__init__(fp)
741 self.ui = ui
741 self.ui = ui
742
742
743 def _readpartheader(self):
743 def _readpartheader(self):
744 """reads a part header size and return the bytes blob
744 """reads a part header size and return the bytes blob
745
745
746 returns None if empty"""
746 returns None if empty"""
747 headersize = self._unpack(_fpartheadersize)[0]
747 headersize = self._unpack(_fpartheadersize)[0]
748 if headersize < 0:
748 if headersize < 0:
749 raise error.BundleValueError('negative part header size: %i'
749 raise error.BundleValueError('negative part header size: %i'
750 % headersize)
750 % headersize)
751 self.ui.debug('part header size: %i\n' % headersize)
751 self.ui.debug('part header size: %i\n' % headersize)
752 if headersize:
752 if headersize:
753 return self._readexact(headersize)
753 return self._readexact(headersize)
754 return None
754 return None
755
755
756 def __call__(self):
756 def __call__(self):
757 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
757 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
758 headerblock = self._readpartheader()
758 headerblock = self._readpartheader()
759 if headerblock is None:
759 if headerblock is None:
760 self.ui.debug('no part found during interruption.\n')
760 self.ui.debug('no part found during interruption.\n')
761 return
761 return
762 part = unbundlepart(self.ui, headerblock, self._fp)
762 part = unbundlepart(self.ui, headerblock, self._fp)
763 op = interruptoperation(self.ui)
763 op = interruptoperation(self.ui)
764 _processpart(op, part)
764 _processpart(op, part)
765
765
766 class interruptoperation(object):
766 class interruptoperation(object):
767 """A limited operation to be use by part handler during interruption
767 """A limited operation to be use by part handler during interruption
768
768
769 It only have access to an ui object.
769 It only have access to an ui object.
770 """
770 """
771
771
772 def __init__(self, ui):
772 def __init__(self, ui):
773 self.ui = ui
773 self.ui = ui
774 self.reply = None
774 self.reply = None
775
775
776 @property
776 @property
777 def repo(self):
777 def repo(self):
778 raise RuntimeError('no repo access from stream interruption')
778 raise RuntimeError('no repo access from stream interruption')
779
779
780 def gettransaction(self):
780 def gettransaction(self):
781 raise TransactionUnavailable('no repo access from stream interruption')
781 raise TransactionUnavailable('no repo access from stream interruption')
782
782
783 class unbundlepart(unpackermixin):
783 class unbundlepart(unpackermixin):
784 """a bundle part read from a bundle"""
784 """a bundle part read from a bundle"""
785
785
786 def __init__(self, ui, header, fp):
786 def __init__(self, ui, header, fp):
787 super(unbundlepart, self).__init__(fp)
787 super(unbundlepart, self).__init__(fp)
788 self.ui = ui
788 self.ui = ui
789 # unbundle state attr
789 # unbundle state attr
790 self._headerdata = header
790 self._headerdata = header
791 self._headeroffset = 0
791 self._headeroffset = 0
792 self._initialized = False
792 self._initialized = False
793 self.consumed = False
793 self.consumed = False
794 # part data
794 # part data
795 self.id = None
795 self.id = None
796 self.type = None
796 self.type = None
797 self.mandatoryparams = None
797 self.mandatoryparams = None
798 self.advisoryparams = None
798 self.advisoryparams = None
799 self.params = None
799 self.params = None
800 self.mandatorykeys = ()
800 self.mandatorykeys = ()
801 self._payloadstream = None
801 self._payloadstream = None
802 self._readheader()
802 self._readheader()
803 self._mandatory = None
803 self._mandatory = None
804
804
805 def _fromheader(self, size):
805 def _fromheader(self, size):
806 """return the next <size> byte from the header"""
806 """return the next <size> byte from the header"""
807 offset = self._headeroffset
807 offset = self._headeroffset
808 data = self._headerdata[offset:(offset + size)]
808 data = self._headerdata[offset:(offset + size)]
809 self._headeroffset = offset + size
809 self._headeroffset = offset + size
810 return data
810 return data
811
811
812 def _unpackheader(self, format):
812 def _unpackheader(self, format):
813 """read given format from header
813 """read given format from header
814
814
815 This automatically compute the size of the format to read."""
815 This automatically compute the size of the format to read."""
816 data = self._fromheader(struct.calcsize(format))
816 data = self._fromheader(struct.calcsize(format))
817 return _unpack(format, data)
817 return _unpack(format, data)
818
818
819 def _initparams(self, mandatoryparams, advisoryparams):
819 def _initparams(self, mandatoryparams, advisoryparams):
820 """internal function to setup all logic related parameters"""
820 """internal function to setup all logic related parameters"""
821 # make it read only to prevent people touching it by mistake.
821 # make it read only to prevent people touching it by mistake.
822 self.mandatoryparams = tuple(mandatoryparams)
822 self.mandatoryparams = tuple(mandatoryparams)
823 self.advisoryparams = tuple(advisoryparams)
823 self.advisoryparams = tuple(advisoryparams)
824 # user friendly UI
824 # user friendly UI
825 self.params = dict(self.mandatoryparams)
825 self.params = dict(self.mandatoryparams)
826 self.params.update(dict(self.advisoryparams))
826 self.params.update(dict(self.advisoryparams))
827 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
827 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
828
828
829 def _readheader(self):
829 def _readheader(self):
830 """read the header and setup the object"""
830 """read the header and setup the object"""
831 typesize = self._unpackheader(_fparttypesize)[0]
831 typesize = self._unpackheader(_fparttypesize)[0]
832 self.type = self._fromheader(typesize)
832 self.type = self._fromheader(typesize)
833 self.ui.debug('part type: "%s"\n' % self.type)
833 self.ui.debug('part type: "%s"\n' % self.type)
834 self.id = self._unpackheader(_fpartid)[0]
834 self.id = self._unpackheader(_fpartid)[0]
835 self.ui.debug('part id: "%s"\n' % self.id)
835 self.ui.debug('part id: "%s"\n' % self.id)
836 # extract mandatory bit from type
836 # extract mandatory bit from type
837 self.mandatory = (self.type != self.type.lower())
837 self.mandatory = (self.type != self.type.lower())
838 self.type = self.type.lower()
838 self.type = self.type.lower()
839 ## reading parameters
839 ## reading parameters
840 # param count
840 # param count
841 mancount, advcount = self._unpackheader(_fpartparamcount)
841 mancount, advcount = self._unpackheader(_fpartparamcount)
842 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
842 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
843 # param size
843 # param size
844 fparamsizes = _makefpartparamsizes(mancount + advcount)
844 fparamsizes = _makefpartparamsizes(mancount + advcount)
845 paramsizes = self._unpackheader(fparamsizes)
845 paramsizes = self._unpackheader(fparamsizes)
846 # make it a list of couple again
846 # make it a list of couple again
847 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
847 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
848 # split mandatory from advisory
848 # split mandatory from advisory
849 mansizes = paramsizes[:mancount]
849 mansizes = paramsizes[:mancount]
850 advsizes = paramsizes[mancount:]
850 advsizes = paramsizes[mancount:]
851 # retrieve param value
851 # retrieve param value
852 manparams = []
852 manparams = []
853 for key, value in mansizes:
853 for key, value in mansizes:
854 manparams.append((self._fromheader(key), self._fromheader(value)))
854 manparams.append((self._fromheader(key), self._fromheader(value)))
855 advparams = []
855 advparams = []
856 for key, value in advsizes:
856 for key, value in advsizes:
857 advparams.append((self._fromheader(key), self._fromheader(value)))
857 advparams.append((self._fromheader(key), self._fromheader(value)))
858 self._initparams(manparams, advparams)
858 self._initparams(manparams, advparams)
859 ## part payload
859 ## part payload
860 def payloadchunks():
860 def payloadchunks():
861 payloadsize = self._unpack(_fpayloadsize)[0]
861 payloadsize = self._unpack(_fpayloadsize)[0]
862 self.ui.debug('payload chunk size: %i\n' % payloadsize)
862 self.ui.debug('payload chunk size: %i\n' % payloadsize)
863 while payloadsize:
863 while payloadsize:
864 if payloadsize == flaginterrupt:
864 if payloadsize == flaginterrupt:
865 # interruption detection, the handler will now read a
865 # interruption detection, the handler will now read a
866 # single part and process it.
866 # single part and process it.
867 interrupthandler(self.ui, self._fp)()
867 interrupthandler(self.ui, self._fp)()
868 elif payloadsize < 0:
868 elif payloadsize < 0:
869 msg = 'negative payload chunk size: %i' % payloadsize
869 msg = 'negative payload chunk size: %i' % payloadsize
870 raise error.BundleValueError(msg)
870 raise error.BundleValueError(msg)
871 else:
871 else:
872 yield self._readexact(payloadsize)
872 yield self._readexact(payloadsize)
873 payloadsize = self._unpack(_fpayloadsize)[0]
873 payloadsize = self._unpack(_fpayloadsize)[0]
874 self.ui.debug('payload chunk size: %i\n' % payloadsize)
874 self.ui.debug('payload chunk size: %i\n' % payloadsize)
875 self._payloadstream = util.chunkbuffer(payloadchunks())
875 self._payloadstream = util.chunkbuffer(payloadchunks())
876 # we read the data, tell it
876 # we read the data, tell it
877 self._initialized = True
877 self._initialized = True
878
878
879 def read(self, size=None):
879 def read(self, size=None):
880 """read payload data"""
880 """read payload data"""
881 if not self._initialized:
881 if not self._initialized:
882 self._readheader()
882 self._readheader()
883 if size is None:
883 if size is None:
884 data = self._payloadstream.read()
884 data = self._payloadstream.read()
885 else:
885 else:
886 data = self._payloadstream.read(size)
886 data = self._payloadstream.read(size)
887 if size is None or len(data) < size:
887 if size is None or len(data) < size:
888 self.consumed = True
888 self.consumed = True
889 return data
889 return data
890
890
891 capabilities = {'HG2Y': (),
891 capabilities = {'HG2Y': (),
892 'b2x:listkeys': (),
892 'b2x:listkeys': (),
893 'b2x:pushkey': (),
893 'b2x:pushkey': (),
894 'digests': tuple(sorted(util.DIGESTS.keys())),
894 'digests': tuple(sorted(util.DIGESTS.keys())),
895 'b2x:remote-changegroup': ('http', 'https'),
895 'b2x:remote-changegroup': ('http', 'https'),
896 }
896 }
897
897
898 def getrepocaps(repo, allowpushback=False):
898 def getrepocaps(repo, allowpushback=False):
899 """return the bundle2 capabilities for a given repo
899 """return the bundle2 capabilities for a given repo
900
900
901 Exists to allow extensions (like evolution) to mutate the capabilities.
901 Exists to allow extensions (like evolution) to mutate the capabilities.
902 """
902 """
903 caps = capabilities.copy()
903 caps = capabilities.copy()
904 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
904 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
905 if obsolete.isenabled(repo, obsolete.exchangeopt):
905 if obsolete.isenabled(repo, obsolete.exchangeopt):
906 supportedformat = tuple('V%i' % v for v in obsolete.formats)
906 supportedformat = tuple('V%i' % v for v in obsolete.formats)
907 caps['b2x:obsmarkers'] = supportedformat
907 caps['b2x:obsmarkers'] = supportedformat
908 if allowpushback:
908 if allowpushback:
909 caps['b2x:pushback'] = ()
909 caps['b2x:pushback'] = ()
910 return caps
910 return caps
911
911
912 def bundle2caps(remote):
912 def bundle2caps(remote):
913 """return the bundle capabilities of a peer as dict"""
913 """return the bundle capabilities of a peer as dict"""
914 raw = remote.capable('bundle2-exp')
914 raw = remote.capable('bundle2-exp')
915 if not raw and raw != '':
915 if not raw and raw != '':
916 return {}
916 return {}
917 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
917 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
918 return decodecaps(capsblob)
918 return decodecaps(capsblob)
919
919
920 def obsmarkersversion(caps):
920 def obsmarkersversion(caps):
921 """extract the list of supported obsmarkers versions from a bundle2caps dict
921 """extract the list of supported obsmarkers versions from a bundle2caps dict
922 """
922 """
923 obscaps = caps.get('b2x:obsmarkers', ())
923 obscaps = caps.get('b2x:obsmarkers', ())
924 return [int(c[1:]) for c in obscaps if c.startswith('V')]
924 return [int(c[1:]) for c in obscaps if c.startswith('V')]
925
925
926 @parthandler('b2x:changegroup', ('version',))
926 @parthandler('b2x:changegroup', ('version',))
927 def handlechangegroup(op, inpart):
927 def handlechangegroup(op, inpart):
928 """apply a changegroup part on the repo
928 """apply a changegroup part on the repo
929
929
930 This is a very early implementation that will massive rework before being
930 This is a very early implementation that will massive rework before being
931 inflicted to any end-user.
931 inflicted to any end-user.
932 """
932 """
933 # Make sure we trigger a transaction creation
933 # Make sure we trigger a transaction creation
934 #
934 #
935 # The addchangegroup function will get a transaction object by itself, but
935 # The addchangegroup function will get a transaction object by itself, but
936 # we need to make sure we trigger the creation of a transaction object used
936 # we need to make sure we trigger the creation of a transaction object used
937 # for the whole processing scope.
937 # for the whole processing scope.
938 op.gettransaction()
938 op.gettransaction()
939 unpackerversion = inpart.params.get('version', '01')
939 unpackerversion = inpart.params.get('version', '01')
940 # We should raise an appropriate exception here
940 # We should raise an appropriate exception here
941 unpacker = changegroup.packermap[unpackerversion][1]
941 unpacker = changegroup.packermap[unpackerversion][1]
942 cg = unpacker(inpart, 'UN')
942 cg = unpacker(inpart, 'UN')
943 # the source and url passed here are overwritten by the one contained in
943 # the source and url passed here are overwritten by the one contained in
944 # the transaction.hookargs argument. So 'bundle2' is a placeholder
944 # the transaction.hookargs argument. So 'bundle2' is a placeholder
945 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
945 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
946 op.records.add('changegroup', {'return': ret})
946 op.records.add('changegroup', {'return': ret})
947 if op.reply is not None:
947 if op.reply is not None:
948 # This is definitely not the final form of this
948 # This is definitely not the final form of this
949 # return. But one need to start somewhere.
949 # return. But one need to start somewhere.
950 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
950 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
951 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
951 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
952 part.addparam('return', '%i' % ret, mandatory=False)
952 part.addparam('return', '%i' % ret, mandatory=False)
953 assert not inpart.read()
953 assert not inpart.read()
954
954
955 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
955 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
956 ['digest:%s' % k for k in util.DIGESTS.keys()])
956 ['digest:%s' % k for k in util.DIGESTS.keys()])
957 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
957 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
958 def handleremotechangegroup(op, inpart):
958 def handleremotechangegroup(op, inpart):
959 """apply a bundle10 on the repo, given an url and validation information
959 """apply a bundle10 on the repo, given an url and validation information
960
960
961 All the information about the remote bundle to import are given as
961 All the information about the remote bundle to import are given as
962 parameters. The parameters include:
962 parameters. The parameters include:
963 - url: the url to the bundle10.
963 - url: the url to the bundle10.
964 - size: the bundle10 file size. It is used to validate what was
964 - size: the bundle10 file size. It is used to validate what was
965 retrieved by the client matches the server knowledge about the bundle.
965 retrieved by the client matches the server knowledge about the bundle.
966 - digests: a space separated list of the digest types provided as
966 - digests: a space separated list of the digest types provided as
967 parameters.
967 parameters.
968 - digest:<digest-type>: the hexadecimal representation of the digest with
968 - digest:<digest-type>: the hexadecimal representation of the digest with
969 that name. Like the size, it is used to validate what was retrieved by
969 that name. Like the size, it is used to validate what was retrieved by
970 the client matches what the server knows about the bundle.
970 the client matches what the server knows about the bundle.
971
971
972 When multiple digest types are given, all of them are checked.
972 When multiple digest types are given, all of them are checked.
973 """
973 """
974 try:
974 try:
975 raw_url = inpart.params['url']
975 raw_url = inpart.params['url']
976 except KeyError:
976 except KeyError:
977 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
977 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
978 parsed_url = util.url(raw_url)
978 parsed_url = util.url(raw_url)
979 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
979 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
980 raise util.Abort(_('remote-changegroup does not support %s urls') %
980 raise util.Abort(_('remote-changegroup does not support %s urls') %
981 parsed_url.scheme)
981 parsed_url.scheme)
982
982
983 try:
983 try:
984 size = int(inpart.params['size'])
984 size = int(inpart.params['size'])
985 except ValueError:
985 except ValueError:
986 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
986 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
987 % 'size')
987 % 'size')
988 except KeyError:
988 except KeyError:
989 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
989 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
990
990
991 digests = {}
991 digests = {}
992 for typ in inpart.params.get('digests', '').split():
992 for typ in inpart.params.get('digests', '').split():
993 param = 'digest:%s' % typ
993 param = 'digest:%s' % typ
994 try:
994 try:
995 value = inpart.params[param]
995 value = inpart.params[param]
996 except KeyError:
996 except KeyError:
997 raise util.Abort(_('remote-changegroup: missing "%s" param') %
997 raise util.Abort(_('remote-changegroup: missing "%s" param') %
998 param)
998 param)
999 digests[typ] = value
999 digests[typ] = value
1000
1000
1001 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1001 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1002
1002
1003 # Make sure we trigger a transaction creation
1003 # Make sure we trigger a transaction creation
1004 #
1004 #
1005 # The addchangegroup function will get a transaction object by itself, but
1005 # The addchangegroup function will get a transaction object by itself, but
1006 # we need to make sure we trigger the creation of a transaction object used
1006 # we need to make sure we trigger the creation of a transaction object used
1007 # for the whole processing scope.
1007 # for the whole processing scope.
1008 op.gettransaction()
1008 op.gettransaction()
1009 import exchange
1009 import exchange
1010 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1010 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1011 if not isinstance(cg, changegroup.cg1unpacker):
1011 if not isinstance(cg, changegroup.cg1unpacker):
1012 raise util.Abort(_('%s: not a bundle version 1.0') %
1012 raise util.Abort(_('%s: not a bundle version 1.0') %
1013 util.hidepassword(raw_url))
1013 util.hidepassword(raw_url))
1014 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1014 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1015 op.records.add('changegroup', {'return': ret})
1015 op.records.add('changegroup', {'return': ret})
1016 if op.reply is not None:
1016 if op.reply is not None:
1017 # This is definitely not the final form of this
1017 # This is definitely not the final form of this
1018 # return. But one need to start somewhere.
1018 # return. But one need to start somewhere.
1019 part = op.reply.newpart('b2x:reply:changegroup')
1019 part = op.reply.newpart('b2x:reply:changegroup')
1020 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1020 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1021 part.addparam('return', '%i' % ret, mandatory=False)
1021 part.addparam('return', '%i' % ret, mandatory=False)
1022 try:
1022 try:
1023 real_part.validate()
1023 real_part.validate()
1024 except util.Abort, e:
1024 except util.Abort, e:
1025 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1025 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1026 (util.hidepassword(raw_url), str(e)))
1026 (util.hidepassword(raw_url), str(e)))
1027 assert not inpart.read()
1027 assert not inpart.read()
1028
1028
1029 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1029 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1030 def handlereplychangegroup(op, inpart):
1030 def handlereplychangegroup(op, inpart):
1031 ret = int(inpart.params['return'])
1031 ret = int(inpart.params['return'])
1032 replyto = int(inpart.params['in-reply-to'])
1032 replyto = int(inpart.params['in-reply-to'])
1033 op.records.add('changegroup', {'return': ret}, replyto)
1033 op.records.add('changegroup', {'return': ret}, replyto)
1034
1034
1035 @parthandler('b2x:check:heads')
1035 @parthandler('b2x:check:heads')
1036 def handlecheckheads(op, inpart):
1036 def handlecheckheads(op, inpart):
1037 """check that head of the repo did not change
1037 """check that head of the repo did not change
1038
1038
1039 This is used to detect a push race when using unbundle.
1039 This is used to detect a push race when using unbundle.
1040 This replaces the "heads" argument of unbundle."""
1040 This replaces the "heads" argument of unbundle."""
1041 h = inpart.read(20)
1041 h = inpart.read(20)
1042 heads = []
1042 heads = []
1043 while len(h) == 20:
1043 while len(h) == 20:
1044 heads.append(h)
1044 heads.append(h)
1045 h = inpart.read(20)
1045 h = inpart.read(20)
1046 assert not h
1046 assert not h
1047 if heads != op.repo.heads():
1047 if heads != op.repo.heads():
1048 raise error.PushRaced('repository changed while pushing - '
1048 raise error.PushRaced('repository changed while pushing - '
1049 'please try again')
1049 'please try again')
1050
1050
1051 @parthandler('b2x:output')
1051 @parthandler('b2x:output')
1052 def handleoutput(op, inpart):
1052 def handleoutput(op, inpart):
1053 """forward output captured on the server to the client"""
1053 """forward output captured on the server to the client"""
1054 for line in inpart.read().splitlines():
1054 for line in inpart.read().splitlines():
1055 op.ui.write(('remote: %s\n' % line))
1055 op.ui.write(('remote: %s\n' % line))
1056
1056
1057 @parthandler('b2x:replycaps')
1057 @parthandler('b2x:replycaps')
1058 def handlereplycaps(op, inpart):
1058 def handlereplycaps(op, inpart):
1059 """Notify that a reply bundle should be created
1059 """Notify that a reply bundle should be created
1060
1060
1061 The payload contains the capabilities information for the reply"""
1061 The payload contains the capabilities information for the reply"""
1062 caps = decodecaps(inpart.read())
1062 caps = decodecaps(inpart.read())
1063 if op.reply is None:
1063 if op.reply is None:
1064 op.reply = bundle20(op.ui, caps)
1064 op.reply = bundle20(op.ui, caps)
1065
1065
1066 @parthandler('b2x:error:abort', ('message', 'hint'))
1066 @parthandler('b2x:error:abort', ('message', 'hint'))
1067 def handlereplycaps(op, inpart):
1067 def handlereplycaps(op, inpart):
1068 """Used to transmit abort error over the wire"""
1068 """Used to transmit abort error over the wire"""
1069 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1069 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1070
1070
1071 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1071 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1072 def handlereplycaps(op, inpart):
1072 def handlereplycaps(op, inpart):
1073 """Used to transmit unknown content error over the wire"""
1073 """Used to transmit unknown content error over the wire"""
1074 kwargs = {}
1074 kwargs = {}
1075 parttype = inpart.params.get('parttype')
1075 parttype = inpart.params.get('parttype')
1076 if parttype is not None:
1076 if parttype is not None:
1077 kwargs['parttype'] = parttype
1077 kwargs['parttype'] = parttype
1078 params = inpart.params.get('params')
1078 params = inpart.params.get('params')
1079 if params is not None:
1079 if params is not None:
1080 kwargs['params'] = params.split('\0')
1080 kwargs['params'] = params.split('\0')
1081
1081
1082 raise error.UnsupportedPartError(**kwargs)
1082 raise error.UnsupportedPartError(**kwargs)
1083
1083
1084 @parthandler('b2x:error:pushraced', ('message',))
1084 @parthandler('b2x:error:pushraced', ('message',))
1085 def handlereplycaps(op, inpart):
1085 def handlereplycaps(op, inpart):
1086 """Used to transmit push race error over the wire"""
1086 """Used to transmit push race error over the wire"""
1087 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1087 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1088
1088
1089 @parthandler('b2x:listkeys', ('namespace',))
1089 @parthandler('b2x:listkeys', ('namespace',))
1090 def handlelistkeys(op, inpart):
1090 def handlelistkeys(op, inpart):
1091 """retrieve pushkey namespace content stored in a bundle2"""
1091 """retrieve pushkey namespace content stored in a bundle2"""
1092 namespace = inpart.params['namespace']
1092 namespace = inpart.params['namespace']
1093 r = pushkey.decodekeys(inpart.read())
1093 r = pushkey.decodekeys(inpart.read())
1094 op.records.add('listkeys', (namespace, r))
1094 op.records.add('listkeys', (namespace, r))
1095
1095
1096 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1096 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1097 def handlepushkey(op, inpart):
1097 def handlepushkey(op, inpart):
1098 """process a pushkey request"""
1098 """process a pushkey request"""
1099 dec = pushkey.decode
1099 dec = pushkey.decode
1100 namespace = dec(inpart.params['namespace'])
1100 namespace = dec(inpart.params['namespace'])
1101 key = dec(inpart.params['key'])
1101 key = dec(inpart.params['key'])
1102 old = dec(inpart.params['old'])
1102 old = dec(inpart.params['old'])
1103 new = dec(inpart.params['new'])
1103 new = dec(inpart.params['new'])
1104 ret = op.repo.pushkey(namespace, key, old, new)
1104 ret = op.repo.pushkey(namespace, key, old, new)
1105 record = {'namespace': namespace,
1105 record = {'namespace': namespace,
1106 'key': key,
1106 'key': key,
1107 'old': old,
1107 'old': old,
1108 'new': new}
1108 'new': new}
1109 op.records.add('pushkey', record)
1109 op.records.add('pushkey', record)
1110 if op.reply is not None:
1110 if op.reply is not None:
1111 rpart = op.reply.newpart('b2x:reply:pushkey')
1111 rpart = op.reply.newpart('b2x:reply:pushkey')
1112 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1112 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1113 rpart.addparam('return', '%i' % ret, mandatory=False)
1113 rpart.addparam('return', '%i' % ret, mandatory=False)
1114
1114
1115 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1115 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1116 def handlepushkeyreply(op, inpart):
1116 def handlepushkeyreply(op, inpart):
1117 """retrieve the result of a pushkey request"""
1117 """retrieve the result of a pushkey request"""
1118 ret = int(inpart.params['return'])
1118 ret = int(inpart.params['return'])
1119 partid = int(inpart.params['in-reply-to'])
1119 partid = int(inpart.params['in-reply-to'])
1120 op.records.add('pushkey', {'return': ret}, partid)
1120 op.records.add('pushkey', {'return': ret}, partid)
1121
1121
1122 @parthandler('b2x:obsmarkers')
1122 @parthandler('b2x:obsmarkers')
1123 def handleobsmarker(op, inpart):
1123 def handleobsmarker(op, inpart):
1124 """add a stream of obsmarkers to the repo"""
1124 """add a stream of obsmarkers to the repo"""
1125 tr = op.gettransaction()
1125 tr = op.gettransaction()
1126 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1126 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1127 if new:
1127 if new:
1128 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1128 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1129 op.records.add('obsmarkers', {'new': new})
1129 op.records.add('obsmarkers', {'new': new})
1130 if op.reply is not None:
1130 if op.reply is not None:
1131 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1131 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1132 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1132 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1133 rpart.addparam('new', '%i' % new, mandatory=False)
1133 rpart.addparam('new', '%i' % new, mandatory=False)
1134
1134
1135
1135
1136 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1136 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1137 def handlepushkeyreply(op, inpart):
1137 def handlepushkeyreply(op, inpart):
1138 """retrieve the result of a pushkey request"""
1138 """retrieve the result of a pushkey request"""
1139 ret = int(inpart.params['new'])
1139 ret = int(inpart.params['new'])
1140 partid = int(inpart.params['in-reply-to'])
1140 partid = int(inpart.params['in-reply-to'])
1141 op.records.add('obsmarkers', {'new': ret}, partid)
1141 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now