##// END OF EJS Templates
bundle2.unbundlepart: decouple mandatory from parttype...
Eric Sumner -
r23585:94b25d71 default
parent child Browse files
Show More
@@ -1,1123 +1,1127 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import sys
148 import sys
149 import util
149 import util
150 import struct
150 import struct
151 import urllib
151 import urllib
152 import string
152 import string
153 import obsolete
153 import obsolete
154 import pushkey
154 import pushkey
155 import url
155 import url
156
156
157 import changegroup, error
157 import changegroup, error
158 from i18n import _
158 from i18n import _
159
159
160 _pack = struct.pack
160 _pack = struct.pack
161 _unpack = struct.unpack
161 _unpack = struct.unpack
162
162
163 _magicstring = 'HG2Y'
163 _magicstring = 'HG2Y'
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 def _makefpartparamsizes(nbparams):
174 def _makefpartparamsizes(nbparams):
175 """return a struct format to read part parameter sizes
175 """return a struct format to read part parameter sizes
176
176
177 The number parameters is variable so we need to build that format
177 The number parameters is variable so we need to build that format
178 dynamically.
178 dynamically.
179 """
179 """
180 return '>'+('BB'*nbparams)
180 return '>'+('BB'*nbparams)
181
181
182 parthandlermapping = {}
182 parthandlermapping = {}
183
183
184 def parthandler(parttype, params=()):
184 def parthandler(parttype, params=()):
185 """decorator that register a function as a bundle2 part handler
185 """decorator that register a function as a bundle2 part handler
186
186
187 eg::
187 eg::
188
188
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
190 def myparttypehandler(...):
190 def myparttypehandler(...):
191 '''process a part of type "my part".'''
191 '''process a part of type "my part".'''
192 ...
192 ...
193 """
193 """
194 def _decorator(func):
194 def _decorator(func):
195 lparttype = parttype.lower() # enforce lower case matching.
195 lparttype = parttype.lower() # enforce lower case matching.
196 assert lparttype not in parthandlermapping
196 assert lparttype not in parthandlermapping
197 parthandlermapping[lparttype] = func
197 parthandlermapping[lparttype] = func
198 func.params = frozenset(params)
198 func.params = frozenset(params)
199 return func
199 return func
200 return _decorator
200 return _decorator
201
201
202 class unbundlerecords(object):
202 class unbundlerecords(object):
203 """keep record of what happens during and unbundle
203 """keep record of what happens during and unbundle
204
204
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 category of record and obj is an arbitrary object.
206 category of record and obj is an arbitrary object.
207
207
208 `records['cat']` will return all entries of this category 'cat'.
208 `records['cat']` will return all entries of this category 'cat'.
209
209
210 Iterating on the object itself will yield `('category', obj)` tuples
210 Iterating on the object itself will yield `('category', obj)` tuples
211 for all entries.
211 for all entries.
212
212
213 All iterations happens in chronological order.
213 All iterations happens in chronological order.
214 """
214 """
215
215
216 def __init__(self):
216 def __init__(self):
217 self._categories = {}
217 self._categories = {}
218 self._sequences = []
218 self._sequences = []
219 self._replies = {}
219 self._replies = {}
220
220
221 def add(self, category, entry, inreplyto=None):
221 def add(self, category, entry, inreplyto=None):
222 """add a new record of a given category.
222 """add a new record of a given category.
223
223
224 The entry can then be retrieved in the list returned by
224 The entry can then be retrieved in the list returned by
225 self['category']."""
225 self['category']."""
226 self._categories.setdefault(category, []).append(entry)
226 self._categories.setdefault(category, []).append(entry)
227 self._sequences.append((category, entry))
227 self._sequences.append((category, entry))
228 if inreplyto is not None:
228 if inreplyto is not None:
229 self.getreplies(inreplyto).add(category, entry)
229 self.getreplies(inreplyto).add(category, entry)
230
230
231 def getreplies(self, partid):
231 def getreplies(self, partid):
232 """get the records that are replies to a specific part"""
232 """get the records that are replies to a specific part"""
233 return self._replies.setdefault(partid, unbundlerecords())
233 return self._replies.setdefault(partid, unbundlerecords())
234
234
235 def __getitem__(self, cat):
235 def __getitem__(self, cat):
236 return tuple(self._categories.get(cat, ()))
236 return tuple(self._categories.get(cat, ()))
237
237
238 def __iter__(self):
238 def __iter__(self):
239 return iter(self._sequences)
239 return iter(self._sequences)
240
240
241 def __len__(self):
241 def __len__(self):
242 return len(self._sequences)
242 return len(self._sequences)
243
243
244 def __nonzero__(self):
244 def __nonzero__(self):
245 return bool(self._sequences)
245 return bool(self._sequences)
246
246
247 class bundleoperation(object):
247 class bundleoperation(object):
248 """an object that represents a single bundling process
248 """an object that represents a single bundling process
249
249
250 Its purpose is to carry unbundle-related objects and states.
250 Its purpose is to carry unbundle-related objects and states.
251
251
252 A new object should be created at the beginning of each bundle processing.
252 A new object should be created at the beginning of each bundle processing.
253 The object is to be returned by the processing function.
253 The object is to be returned by the processing function.
254
254
255 The object has very little content now it will ultimately contain:
255 The object has very little content now it will ultimately contain:
256 * an access to the repo the bundle is applied to,
256 * an access to the repo the bundle is applied to,
257 * a ui object,
257 * a ui object,
258 * a way to retrieve a transaction to add changes to the repo,
258 * a way to retrieve a transaction to add changes to the repo,
259 * a way to record the result of processing each part,
259 * a way to record the result of processing each part,
260 * a way to construct a bundle response when applicable.
260 * a way to construct a bundle response when applicable.
261 """
261 """
262
262
263 def __init__(self, repo, transactiongetter):
263 def __init__(self, repo, transactiongetter):
264 self.repo = repo
264 self.repo = repo
265 self.ui = repo.ui
265 self.ui = repo.ui
266 self.records = unbundlerecords()
266 self.records = unbundlerecords()
267 self.gettransaction = transactiongetter
267 self.gettransaction = transactiongetter
268 self.reply = None
268 self.reply = None
269
269
270 class TransactionUnavailable(RuntimeError):
270 class TransactionUnavailable(RuntimeError):
271 pass
271 pass
272
272
273 def _notransaction():
273 def _notransaction():
274 """default method to get a transaction while processing a bundle
274 """default method to get a transaction while processing a bundle
275
275
276 Raise an exception to highlight the fact that no transaction was expected
276 Raise an exception to highlight the fact that no transaction was expected
277 to be created"""
277 to be created"""
278 raise TransactionUnavailable()
278 raise TransactionUnavailable()
279
279
280 def processbundle(repo, unbundler, transactiongetter=None):
280 def processbundle(repo, unbundler, transactiongetter=None):
281 """This function process a bundle, apply effect to/from a repo
281 """This function process a bundle, apply effect to/from a repo
282
282
283 It iterates over each part then searches for and uses the proper handling
283 It iterates over each part then searches for and uses the proper handling
284 code to process the part. Parts are processed in order.
284 code to process the part. Parts are processed in order.
285
285
286 This is very early version of this function that will be strongly reworked
286 This is very early version of this function that will be strongly reworked
287 before final usage.
287 before final usage.
288
288
289 Unknown Mandatory part will abort the process.
289 Unknown Mandatory part will abort the process.
290 """
290 """
291 if transactiongetter is None:
291 if transactiongetter is None:
292 transactiongetter = _notransaction
292 transactiongetter = _notransaction
293 op = bundleoperation(repo, transactiongetter)
293 op = bundleoperation(repo, transactiongetter)
294 # todo:
294 # todo:
295 # - replace this is a init function soon.
295 # - replace this is a init function soon.
296 # - exception catching
296 # - exception catching
297 unbundler.params
297 unbundler.params
298 iterparts = unbundler.iterparts()
298 iterparts = unbundler.iterparts()
299 part = None
299 part = None
300 try:
300 try:
301 for part in iterparts:
301 for part in iterparts:
302 _processpart(op, part)
302 _processpart(op, part)
303 except Exception, exc:
303 except Exception, exc:
304 for part in iterparts:
304 for part in iterparts:
305 # consume the bundle content
305 # consume the bundle content
306 part.read()
306 part.read()
307 # Small hack to let caller code distinguish exceptions from bundle2
307 # Small hack to let caller code distinguish exceptions from bundle2
308 # processing from processing the old format. This is mostly
308 # processing from processing the old format. This is mostly
309 # needed to handle different return codes to unbundle according to the
309 # needed to handle different return codes to unbundle according to the
310 # type of bundle. We should probably clean up or drop this return code
310 # type of bundle. We should probably clean up or drop this return code
311 # craziness in a future version.
311 # craziness in a future version.
312 exc.duringunbundle2 = True
312 exc.duringunbundle2 = True
313 raise
313 raise
314 return op
314 return op
315
315
316 def _processpart(op, part):
316 def _processpart(op, part):
317 """process a single part from a bundle
317 """process a single part from a bundle
318
318
319 The part is guaranteed to have been fully consumed when the function exits
319 The part is guaranteed to have been fully consumed when the function exits
320 (even if an exception is raised)."""
320 (even if an exception is raised)."""
321 try:
321 try:
322 parttype = part.type
322 parttype = part.type
323 # part key are matched lower case
323 # part key are matched lower case
324 key = parttype.lower()
324 key = parttype.lower()
325 try:
325 try:
326 handler = parthandlermapping.get(key)
326 handler = parthandlermapping.get(key)
327 if handler is None:
327 if handler is None:
328 raise error.UnsupportedPartError(parttype=key)
328 raise error.UnsupportedPartError(parttype=key)
329 op.ui.debug('found a handler for part %r\n' % parttype)
329 op.ui.debug('found a handler for part %r\n' % parttype)
330 unknownparams = part.mandatorykeys - handler.params
330 unknownparams = part.mandatorykeys - handler.params
331 if unknownparams:
331 if unknownparams:
332 unknownparams = list(unknownparams)
332 unknownparams = list(unknownparams)
333 unknownparams.sort()
333 unknownparams.sort()
334 raise error.UnsupportedPartError(parttype=key,
334 raise error.UnsupportedPartError(parttype=key,
335 params=unknownparams)
335 params=unknownparams)
336 except error.UnsupportedPartError, exc:
336 except error.UnsupportedPartError, exc:
337 if key != parttype: # mandatory parts
337 if part.mandatory: # mandatory parts
338 raise
338 raise
339 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
339 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
340 return # skip to part processing
340 return # skip to part processing
341
341
342 # handler is called outside the above try block so that we don't
342 # handler is called outside the above try block so that we don't
343 # risk catching KeyErrors from anything other than the
343 # risk catching KeyErrors from anything other than the
344 # parthandlermapping lookup (any KeyError raised by handler()
344 # parthandlermapping lookup (any KeyError raised by handler()
345 # itself represents a defect of a different variety).
345 # itself represents a defect of a different variety).
346 output = None
346 output = None
347 if op.reply is not None:
347 if op.reply is not None:
348 op.ui.pushbuffer(error=True)
348 op.ui.pushbuffer(error=True)
349 output = ''
349 output = ''
350 try:
350 try:
351 handler(op, part)
351 handler(op, part)
352 finally:
352 finally:
353 if output is not None:
353 if output is not None:
354 output = op.ui.popbuffer()
354 output = op.ui.popbuffer()
355 if output:
355 if output:
356 outpart = op.reply.newpart('b2x:output', data=output)
356 outpart = op.reply.newpart('b2x:output', data=output)
357 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
357 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
358 finally:
358 finally:
359 # consume the part content to not corrupt the stream.
359 # consume the part content to not corrupt the stream.
360 part.read()
360 part.read()
361
361
362
362
363 def decodecaps(blob):
363 def decodecaps(blob):
364 """decode a bundle2 caps bytes blob into a dictionary
364 """decode a bundle2 caps bytes blob into a dictionary
365
365
366 The blob is a list of capabilities (one per line)
366 The blob is a list of capabilities (one per line)
367 Capabilities may have values using a line of the form::
367 Capabilities may have values using a line of the form::
368
368
369 capability=value1,value2,value3
369 capability=value1,value2,value3
370
370
371 The values are always a list."""
371 The values are always a list."""
372 caps = {}
372 caps = {}
373 for line in blob.splitlines():
373 for line in blob.splitlines():
374 if not line:
374 if not line:
375 continue
375 continue
376 if '=' not in line:
376 if '=' not in line:
377 key, vals = line, ()
377 key, vals = line, ()
378 else:
378 else:
379 key, vals = line.split('=', 1)
379 key, vals = line.split('=', 1)
380 vals = vals.split(',')
380 vals = vals.split(',')
381 key = urllib.unquote(key)
381 key = urllib.unquote(key)
382 vals = [urllib.unquote(v) for v in vals]
382 vals = [urllib.unquote(v) for v in vals]
383 caps[key] = vals
383 caps[key] = vals
384 return caps
384 return caps
385
385
386 def encodecaps(caps):
386 def encodecaps(caps):
387 """encode a bundle2 caps dictionary into a bytes blob"""
387 """encode a bundle2 caps dictionary into a bytes blob"""
388 chunks = []
388 chunks = []
389 for ca in sorted(caps):
389 for ca in sorted(caps):
390 vals = caps[ca]
390 vals = caps[ca]
391 ca = urllib.quote(ca)
391 ca = urllib.quote(ca)
392 vals = [urllib.quote(v) for v in vals]
392 vals = [urllib.quote(v) for v in vals]
393 if vals:
393 if vals:
394 ca = "%s=%s" % (ca, ','.join(vals))
394 ca = "%s=%s" % (ca, ','.join(vals))
395 chunks.append(ca)
395 chunks.append(ca)
396 return '\n'.join(chunks)
396 return '\n'.join(chunks)
397
397
398 class bundle20(object):
398 class bundle20(object):
399 """represent an outgoing bundle2 container
399 """represent an outgoing bundle2 container
400
400
401 Use the `addparam` method to add stream level parameter. and `newpart` to
401 Use the `addparam` method to add stream level parameter. and `newpart` to
402 populate it. Then call `getchunks` to retrieve all the binary chunks of
402 populate it. Then call `getchunks` to retrieve all the binary chunks of
403 data that compose the bundle2 container."""
403 data that compose the bundle2 container."""
404
404
405 def __init__(self, ui, capabilities=()):
405 def __init__(self, ui, capabilities=()):
406 self.ui = ui
406 self.ui = ui
407 self._params = []
407 self._params = []
408 self._parts = []
408 self._parts = []
409 self.capabilities = dict(capabilities)
409 self.capabilities = dict(capabilities)
410
410
411 @property
411 @property
412 def nbparts(self):
412 def nbparts(self):
413 """total number of parts added to the bundler"""
413 """total number of parts added to the bundler"""
414 return len(self._parts)
414 return len(self._parts)
415
415
416 # methods used to defines the bundle2 content
416 # methods used to defines the bundle2 content
417 def addparam(self, name, value=None):
417 def addparam(self, name, value=None):
418 """add a stream level parameter"""
418 """add a stream level parameter"""
419 if not name:
419 if not name:
420 raise ValueError('empty parameter name')
420 raise ValueError('empty parameter name')
421 if name[0] not in string.letters:
421 if name[0] not in string.letters:
422 raise ValueError('non letter first character: %r' % name)
422 raise ValueError('non letter first character: %r' % name)
423 self._params.append((name, value))
423 self._params.append((name, value))
424
424
425 def addpart(self, part):
425 def addpart(self, part):
426 """add a new part to the bundle2 container
426 """add a new part to the bundle2 container
427
427
428 Parts contains the actual applicative payload."""
428 Parts contains the actual applicative payload."""
429 assert part.id is None
429 assert part.id is None
430 part.id = len(self._parts) # very cheap counter
430 part.id = len(self._parts) # very cheap counter
431 self._parts.append(part)
431 self._parts.append(part)
432
432
433 def newpart(self, typeid, *args, **kwargs):
433 def newpart(self, typeid, *args, **kwargs):
434 """create a new part and add it to the containers
434 """create a new part and add it to the containers
435
435
436 As the part is directly added to the containers. For now, this means
436 As the part is directly added to the containers. For now, this means
437 that any failure to properly initialize the part after calling
437 that any failure to properly initialize the part after calling
438 ``newpart`` should result in a failure of the whole bundling process.
438 ``newpart`` should result in a failure of the whole bundling process.
439
439
440 You can still fall back to manually create and add if you need better
440 You can still fall back to manually create and add if you need better
441 control."""
441 control."""
442 part = bundlepart(typeid, *args, **kwargs)
442 part = bundlepart(typeid, *args, **kwargs)
443 self.addpart(part)
443 self.addpart(part)
444 return part
444 return part
445
445
446 # methods used to generate the bundle2 stream
446 # methods used to generate the bundle2 stream
447 def getchunks(self):
447 def getchunks(self):
448 self.ui.debug('start emission of %s stream\n' % _magicstring)
448 self.ui.debug('start emission of %s stream\n' % _magicstring)
449 yield _magicstring
449 yield _magicstring
450 param = self._paramchunk()
450 param = self._paramchunk()
451 self.ui.debug('bundle parameter: %s\n' % param)
451 self.ui.debug('bundle parameter: %s\n' % param)
452 yield _pack(_fstreamparamsize, len(param))
452 yield _pack(_fstreamparamsize, len(param))
453 if param:
453 if param:
454 yield param
454 yield param
455
455
456 self.ui.debug('start of parts\n')
456 self.ui.debug('start of parts\n')
457 for part in self._parts:
457 for part in self._parts:
458 self.ui.debug('bundle part: "%s"\n' % part.type)
458 self.ui.debug('bundle part: "%s"\n' % part.type)
459 for chunk in part.getchunks():
459 for chunk in part.getchunks():
460 yield chunk
460 yield chunk
461 self.ui.debug('end of bundle\n')
461 self.ui.debug('end of bundle\n')
462 yield _pack(_fpartheadersize, 0)
462 yield _pack(_fpartheadersize, 0)
463
463
464 def _paramchunk(self):
464 def _paramchunk(self):
465 """return a encoded version of all stream parameters"""
465 """return a encoded version of all stream parameters"""
466 blocks = []
466 blocks = []
467 for par, value in self._params:
467 for par, value in self._params:
468 par = urllib.quote(par)
468 par = urllib.quote(par)
469 if value is not None:
469 if value is not None:
470 value = urllib.quote(value)
470 value = urllib.quote(value)
471 par = '%s=%s' % (par, value)
471 par = '%s=%s' % (par, value)
472 blocks.append(par)
472 blocks.append(par)
473 return ' '.join(blocks)
473 return ' '.join(blocks)
474
474
475 class unpackermixin(object):
475 class unpackermixin(object):
476 """A mixin to extract bytes and struct data from a stream"""
476 """A mixin to extract bytes and struct data from a stream"""
477
477
478 def __init__(self, fp):
478 def __init__(self, fp):
479 self._fp = fp
479 self._fp = fp
480
480
481 def _unpack(self, format):
481 def _unpack(self, format):
482 """unpack this struct format from the stream"""
482 """unpack this struct format from the stream"""
483 data = self._readexact(struct.calcsize(format))
483 data = self._readexact(struct.calcsize(format))
484 return _unpack(format, data)
484 return _unpack(format, data)
485
485
486 def _readexact(self, size):
486 def _readexact(self, size):
487 """read exactly <size> bytes from the stream"""
487 """read exactly <size> bytes from the stream"""
488 return changegroup.readexactly(self._fp, size)
488 return changegroup.readexactly(self._fp, size)
489
489
490
490
491 class unbundle20(unpackermixin):
491 class unbundle20(unpackermixin):
492 """interpret a bundle2 stream
492 """interpret a bundle2 stream
493
493
494 This class is fed with a binary stream and yields parts through its
494 This class is fed with a binary stream and yields parts through its
495 `iterparts` methods."""
495 `iterparts` methods."""
496
496
497 def __init__(self, ui, fp, header=None):
497 def __init__(self, ui, fp, header=None):
498 """If header is specified, we do not read it out of the stream."""
498 """If header is specified, we do not read it out of the stream."""
499 self.ui = ui
499 self.ui = ui
500 super(unbundle20, self).__init__(fp)
500 super(unbundle20, self).__init__(fp)
501 if header is None:
501 if header is None:
502 header = self._readexact(4)
502 header = self._readexact(4)
503 magic, version = header[0:2], header[2:4]
503 magic, version = header[0:2], header[2:4]
504 if magic != 'HG':
504 if magic != 'HG':
505 raise util.Abort(_('not a Mercurial bundle'))
505 raise util.Abort(_('not a Mercurial bundle'))
506 if version != '2Y':
506 if version != '2Y':
507 raise util.Abort(_('unknown bundle version %s') % version)
507 raise util.Abort(_('unknown bundle version %s') % version)
508 self.ui.debug('start processing of %s stream\n' % header)
508 self.ui.debug('start processing of %s stream\n' % header)
509
509
510 @util.propertycache
510 @util.propertycache
511 def params(self):
511 def params(self):
512 """dictionary of stream level parameters"""
512 """dictionary of stream level parameters"""
513 self.ui.debug('reading bundle2 stream parameters\n')
513 self.ui.debug('reading bundle2 stream parameters\n')
514 params = {}
514 params = {}
515 paramssize = self._unpack(_fstreamparamsize)[0]
515 paramssize = self._unpack(_fstreamparamsize)[0]
516 if paramssize < 0:
516 if paramssize < 0:
517 raise error.BundleValueError('negative bundle param size: %i'
517 raise error.BundleValueError('negative bundle param size: %i'
518 % paramssize)
518 % paramssize)
519 if paramssize:
519 if paramssize:
520 for p in self._readexact(paramssize).split(' '):
520 for p in self._readexact(paramssize).split(' '):
521 p = p.split('=', 1)
521 p = p.split('=', 1)
522 p = [urllib.unquote(i) for i in p]
522 p = [urllib.unquote(i) for i in p]
523 if len(p) < 2:
523 if len(p) < 2:
524 p.append(None)
524 p.append(None)
525 self._processparam(*p)
525 self._processparam(*p)
526 params[p[0]] = p[1]
526 params[p[0]] = p[1]
527 return params
527 return params
528
528
529 def _processparam(self, name, value):
529 def _processparam(self, name, value):
530 """process a parameter, applying its effect if needed
530 """process a parameter, applying its effect if needed
531
531
532 Parameter starting with a lower case letter are advisory and will be
532 Parameter starting with a lower case letter are advisory and will be
533 ignored when unknown. Those starting with an upper case letter are
533 ignored when unknown. Those starting with an upper case letter are
534 mandatory and will this function will raise a KeyError when unknown.
534 mandatory and will this function will raise a KeyError when unknown.
535
535
536 Note: no option are currently supported. Any input will be either
536 Note: no option are currently supported. Any input will be either
537 ignored or failing.
537 ignored or failing.
538 """
538 """
539 if not name:
539 if not name:
540 raise ValueError('empty parameter name')
540 raise ValueError('empty parameter name')
541 if name[0] not in string.letters:
541 if name[0] not in string.letters:
542 raise ValueError('non letter first character: %r' % name)
542 raise ValueError('non letter first character: %r' % name)
543 # Some logic will be later added here to try to process the option for
543 # Some logic will be later added here to try to process the option for
544 # a dict of known parameter.
544 # a dict of known parameter.
545 if name[0].islower():
545 if name[0].islower():
546 self.ui.debug("ignoring unknown parameter %r\n" % name)
546 self.ui.debug("ignoring unknown parameter %r\n" % name)
547 else:
547 else:
548 raise error.UnsupportedPartError(params=(name,))
548 raise error.UnsupportedPartError(params=(name,))
549
549
550
550
551 def iterparts(self):
551 def iterparts(self):
552 """yield all parts contained in the stream"""
552 """yield all parts contained in the stream"""
553 # make sure param have been loaded
553 # make sure param have been loaded
554 self.params
554 self.params
555 self.ui.debug('start extraction of bundle2 parts\n')
555 self.ui.debug('start extraction of bundle2 parts\n')
556 headerblock = self._readpartheader()
556 headerblock = self._readpartheader()
557 while headerblock is not None:
557 while headerblock is not None:
558 part = unbundlepart(self.ui, headerblock, self._fp)
558 part = unbundlepart(self.ui, headerblock, self._fp)
559 yield part
559 yield part
560 headerblock = self._readpartheader()
560 headerblock = self._readpartheader()
561 self.ui.debug('end of bundle2 stream\n')
561 self.ui.debug('end of bundle2 stream\n')
562
562
563 def _readpartheader(self):
563 def _readpartheader(self):
564 """reads a part header size and return the bytes blob
564 """reads a part header size and return the bytes blob
565
565
566 returns None if empty"""
566 returns None if empty"""
567 headersize = self._unpack(_fpartheadersize)[0]
567 headersize = self._unpack(_fpartheadersize)[0]
568 if headersize < 0:
568 if headersize < 0:
569 raise error.BundleValueError('negative part header size: %i'
569 raise error.BundleValueError('negative part header size: %i'
570 % headersize)
570 % headersize)
571 self.ui.debug('part header size: %i\n' % headersize)
571 self.ui.debug('part header size: %i\n' % headersize)
572 if headersize:
572 if headersize:
573 return self._readexact(headersize)
573 return self._readexact(headersize)
574 return None
574 return None
575
575
576
576
577 class bundlepart(object):
577 class bundlepart(object):
578 """A bundle2 part contains application level payload
578 """A bundle2 part contains application level payload
579
579
580 The part `type` is used to route the part to the application level
580 The part `type` is used to route the part to the application level
581 handler.
581 handler.
582
582
583 The part payload is contained in ``part.data``. It could be raw bytes or a
583 The part payload is contained in ``part.data``. It could be raw bytes or a
584 generator of byte chunks.
584 generator of byte chunks.
585
585
586 You can add parameters to the part using the ``addparam`` method.
586 You can add parameters to the part using the ``addparam`` method.
587 Parameters can be either mandatory (default) or advisory. Remote side
587 Parameters can be either mandatory (default) or advisory. Remote side
588 should be able to safely ignore the advisory ones.
588 should be able to safely ignore the advisory ones.
589
589
590 Both data and parameters cannot be modified after the generation has begun.
590 Both data and parameters cannot be modified after the generation has begun.
591 """
591 """
592
592
593 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
593 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
594 data=''):
594 data=''):
595 self.id = None
595 self.id = None
596 self.type = parttype
596 self.type = parttype
597 self._data = data
597 self._data = data
598 self._mandatoryparams = list(mandatoryparams)
598 self._mandatoryparams = list(mandatoryparams)
599 self._advisoryparams = list(advisoryparams)
599 self._advisoryparams = list(advisoryparams)
600 # checking for duplicated entries
600 # checking for duplicated entries
601 self._seenparams = set()
601 self._seenparams = set()
602 for pname, __ in self._mandatoryparams + self._advisoryparams:
602 for pname, __ in self._mandatoryparams + self._advisoryparams:
603 if pname in self._seenparams:
603 if pname in self._seenparams:
604 raise RuntimeError('duplicated params: %s' % pname)
604 raise RuntimeError('duplicated params: %s' % pname)
605 self._seenparams.add(pname)
605 self._seenparams.add(pname)
606 # status of the part's generation:
606 # status of the part's generation:
607 # - None: not started,
607 # - None: not started,
608 # - False: currently generated,
608 # - False: currently generated,
609 # - True: generation done.
609 # - True: generation done.
610 self._generated = None
610 self._generated = None
611
611
612 # methods used to defines the part content
612 # methods used to defines the part content
613 def __setdata(self, data):
613 def __setdata(self, data):
614 if self._generated is not None:
614 if self._generated is not None:
615 raise error.ReadOnlyPartError('part is being generated')
615 raise error.ReadOnlyPartError('part is being generated')
616 self._data = data
616 self._data = data
617 def __getdata(self):
617 def __getdata(self):
618 return self._data
618 return self._data
619 data = property(__getdata, __setdata)
619 data = property(__getdata, __setdata)
620
620
621 @property
621 @property
622 def mandatoryparams(self):
622 def mandatoryparams(self):
623 # make it an immutable tuple to force people through ``addparam``
623 # make it an immutable tuple to force people through ``addparam``
624 return tuple(self._mandatoryparams)
624 return tuple(self._mandatoryparams)
625
625
626 @property
626 @property
627 def advisoryparams(self):
627 def advisoryparams(self):
628 # make it an immutable tuple to force people through ``addparam``
628 # make it an immutable tuple to force people through ``addparam``
629 return tuple(self._advisoryparams)
629 return tuple(self._advisoryparams)
630
630
631 def addparam(self, name, value='', mandatory=True):
631 def addparam(self, name, value='', mandatory=True):
632 if self._generated is not None:
632 if self._generated is not None:
633 raise error.ReadOnlyPartError('part is being generated')
633 raise error.ReadOnlyPartError('part is being generated')
634 if name in self._seenparams:
634 if name in self._seenparams:
635 raise ValueError('duplicated params: %s' % name)
635 raise ValueError('duplicated params: %s' % name)
636 self._seenparams.add(name)
636 self._seenparams.add(name)
637 params = self._advisoryparams
637 params = self._advisoryparams
638 if mandatory:
638 if mandatory:
639 params = self._mandatoryparams
639 params = self._mandatoryparams
640 params.append((name, value))
640 params.append((name, value))
641
641
642 # methods used to generates the bundle2 stream
642 # methods used to generates the bundle2 stream
643 def getchunks(self):
643 def getchunks(self):
644 if self._generated is not None:
644 if self._generated is not None:
645 raise RuntimeError('part can only be consumed once')
645 raise RuntimeError('part can only be consumed once')
646 self._generated = False
646 self._generated = False
647 #### header
647 #### header
648 ## parttype
648 ## parttype
649 header = [_pack(_fparttypesize, len(self.type)),
649 header = [_pack(_fparttypesize, len(self.type)),
650 self.type, _pack(_fpartid, self.id),
650 self.type, _pack(_fpartid, self.id),
651 ]
651 ]
652 ## parameters
652 ## parameters
653 # count
653 # count
654 manpar = self.mandatoryparams
654 manpar = self.mandatoryparams
655 advpar = self.advisoryparams
655 advpar = self.advisoryparams
656 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
656 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
657 # size
657 # size
658 parsizes = []
658 parsizes = []
659 for key, value in manpar:
659 for key, value in manpar:
660 parsizes.append(len(key))
660 parsizes.append(len(key))
661 parsizes.append(len(value))
661 parsizes.append(len(value))
662 for key, value in advpar:
662 for key, value in advpar:
663 parsizes.append(len(key))
663 parsizes.append(len(key))
664 parsizes.append(len(value))
664 parsizes.append(len(value))
665 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
665 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
666 header.append(paramsizes)
666 header.append(paramsizes)
667 # key, value
667 # key, value
668 for key, value in manpar:
668 for key, value in manpar:
669 header.append(key)
669 header.append(key)
670 header.append(value)
670 header.append(value)
671 for key, value in advpar:
671 for key, value in advpar:
672 header.append(key)
672 header.append(key)
673 header.append(value)
673 header.append(value)
674 ## finalize header
674 ## finalize header
675 headerchunk = ''.join(header)
675 headerchunk = ''.join(header)
676 yield _pack(_fpartheadersize, len(headerchunk))
676 yield _pack(_fpartheadersize, len(headerchunk))
677 yield headerchunk
677 yield headerchunk
678 ## payload
678 ## payload
679 try:
679 try:
680 for chunk in self._payloadchunks():
680 for chunk in self._payloadchunks():
681 yield _pack(_fpayloadsize, len(chunk))
681 yield _pack(_fpayloadsize, len(chunk))
682 yield chunk
682 yield chunk
683 except Exception, exc:
683 except Exception, exc:
684 # backup exception data for later
684 # backup exception data for later
685 exc_info = sys.exc_info()
685 exc_info = sys.exc_info()
686 msg = 'unexpected error: %s' % exc
686 msg = 'unexpected error: %s' % exc
687 interpart = bundlepart('b2x:error:abort', [('message', msg)])
687 interpart = bundlepart('b2x:error:abort', [('message', msg)])
688 interpart.id = 0
688 interpart.id = 0
689 yield _pack(_fpayloadsize, -1)
689 yield _pack(_fpayloadsize, -1)
690 for chunk in interpart.getchunks():
690 for chunk in interpart.getchunks():
691 yield chunk
691 yield chunk
692 # abort current part payload
692 # abort current part payload
693 yield _pack(_fpayloadsize, 0)
693 yield _pack(_fpayloadsize, 0)
694 raise exc_info[0], exc_info[1], exc_info[2]
694 raise exc_info[0], exc_info[1], exc_info[2]
695 # end of payload
695 # end of payload
696 yield _pack(_fpayloadsize, 0)
696 yield _pack(_fpayloadsize, 0)
697 self._generated = True
697 self._generated = True
698
698
699 def _payloadchunks(self):
699 def _payloadchunks(self):
700 """yield chunks of a the part payload
700 """yield chunks of a the part payload
701
701
702 Exists to handle the different methods to provide data to a part."""
702 Exists to handle the different methods to provide data to a part."""
703 # we only support fixed size data now.
703 # we only support fixed size data now.
704 # This will be improved in the future.
704 # This will be improved in the future.
705 if util.safehasattr(self.data, 'next'):
705 if util.safehasattr(self.data, 'next'):
706 buff = util.chunkbuffer(self.data)
706 buff = util.chunkbuffer(self.data)
707 chunk = buff.read(preferedchunksize)
707 chunk = buff.read(preferedchunksize)
708 while chunk:
708 while chunk:
709 yield chunk
709 yield chunk
710 chunk = buff.read(preferedchunksize)
710 chunk = buff.read(preferedchunksize)
711 elif len(self.data):
711 elif len(self.data):
712 yield self.data
712 yield self.data
713
713
714
714
715 flaginterrupt = -1
715 flaginterrupt = -1
716
716
717 class interrupthandler(unpackermixin):
717 class interrupthandler(unpackermixin):
718 """read one part and process it with restricted capability
718 """read one part and process it with restricted capability
719
719
720 This allows to transmit exception raised on the producer size during part
720 This allows to transmit exception raised on the producer size during part
721 iteration while the consumer is reading a part.
721 iteration while the consumer is reading a part.
722
722
723 Part processed in this manner only have access to a ui object,"""
723 Part processed in this manner only have access to a ui object,"""
724
724
725 def __init__(self, ui, fp):
725 def __init__(self, ui, fp):
726 super(interrupthandler, self).__init__(fp)
726 super(interrupthandler, self).__init__(fp)
727 self.ui = ui
727 self.ui = ui
728
728
729 def _readpartheader(self):
729 def _readpartheader(self):
730 """reads a part header size and return the bytes blob
730 """reads a part header size and return the bytes blob
731
731
732 returns None if empty"""
732 returns None if empty"""
733 headersize = self._unpack(_fpartheadersize)[0]
733 headersize = self._unpack(_fpartheadersize)[0]
734 if headersize < 0:
734 if headersize < 0:
735 raise error.BundleValueError('negative part header size: %i'
735 raise error.BundleValueError('negative part header size: %i'
736 % headersize)
736 % headersize)
737 self.ui.debug('part header size: %i\n' % headersize)
737 self.ui.debug('part header size: %i\n' % headersize)
738 if headersize:
738 if headersize:
739 return self._readexact(headersize)
739 return self._readexact(headersize)
740 return None
740 return None
741
741
742 def __call__(self):
742 def __call__(self):
743 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
743 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
744 headerblock = self._readpartheader()
744 headerblock = self._readpartheader()
745 if headerblock is None:
745 if headerblock is None:
746 self.ui.debug('no part found during interruption.\n')
746 self.ui.debug('no part found during interruption.\n')
747 return
747 return
748 part = unbundlepart(self.ui, headerblock, self._fp)
748 part = unbundlepart(self.ui, headerblock, self._fp)
749 op = interruptoperation(self.ui)
749 op = interruptoperation(self.ui)
750 _processpart(op, part)
750 _processpart(op, part)
751
751
752 class interruptoperation(object):
752 class interruptoperation(object):
753 """A limited operation to be use by part handler during interruption
753 """A limited operation to be use by part handler during interruption
754
754
755 It only have access to an ui object.
755 It only have access to an ui object.
756 """
756 """
757
757
758 def __init__(self, ui):
758 def __init__(self, ui):
759 self.ui = ui
759 self.ui = ui
760 self.reply = None
760 self.reply = None
761
761
762 @property
762 @property
763 def repo(self):
763 def repo(self):
764 raise RuntimeError('no repo access from stream interruption')
764 raise RuntimeError('no repo access from stream interruption')
765
765
766 def gettransaction(self):
766 def gettransaction(self):
767 raise TransactionUnavailable('no repo access from stream interruption')
767 raise TransactionUnavailable('no repo access from stream interruption')
768
768
769 class unbundlepart(unpackermixin):
769 class unbundlepart(unpackermixin):
770 """a bundle part read from a bundle"""
770 """a bundle part read from a bundle"""
771
771
772 def __init__(self, ui, header, fp):
772 def __init__(self, ui, header, fp):
773 super(unbundlepart, self).__init__(fp)
773 super(unbundlepart, self).__init__(fp)
774 self.ui = ui
774 self.ui = ui
775 # unbundle state attr
775 # unbundle state attr
776 self._headerdata = header
776 self._headerdata = header
777 self._headeroffset = 0
777 self._headeroffset = 0
778 self._initialized = False
778 self._initialized = False
779 self.consumed = False
779 self.consumed = False
780 # part data
780 # part data
781 self.id = None
781 self.id = None
782 self.type = None
782 self.type = None
783 self.mandatoryparams = None
783 self.mandatoryparams = None
784 self.advisoryparams = None
784 self.advisoryparams = None
785 self.params = None
785 self.params = None
786 self.mandatorykeys = ()
786 self.mandatorykeys = ()
787 self._payloadstream = None
787 self._payloadstream = None
788 self._readheader()
788 self._readheader()
789 self._mandatory = None
789
790
790 def _fromheader(self, size):
791 def _fromheader(self, size):
791 """return the next <size> byte from the header"""
792 """return the next <size> byte from the header"""
792 offset = self._headeroffset
793 offset = self._headeroffset
793 data = self._headerdata[offset:(offset + size)]
794 data = self._headerdata[offset:(offset + size)]
794 self._headeroffset = offset + size
795 self._headeroffset = offset + size
795 return data
796 return data
796
797
797 def _unpackheader(self, format):
798 def _unpackheader(self, format):
798 """read given format from header
799 """read given format from header
799
800
800 This automatically compute the size of the format to read."""
801 This automatically compute the size of the format to read."""
801 data = self._fromheader(struct.calcsize(format))
802 data = self._fromheader(struct.calcsize(format))
802 return _unpack(format, data)
803 return _unpack(format, data)
803
804
804 def _initparams(self, mandatoryparams, advisoryparams):
805 def _initparams(self, mandatoryparams, advisoryparams):
805 """internal function to setup all logic related parameters"""
806 """internal function to setup all logic related parameters"""
806 # make it read only to prevent people touching it by mistake.
807 # make it read only to prevent people touching it by mistake.
807 self.mandatoryparams = tuple(mandatoryparams)
808 self.mandatoryparams = tuple(mandatoryparams)
808 self.advisoryparams = tuple(advisoryparams)
809 self.advisoryparams = tuple(advisoryparams)
809 # user friendly UI
810 # user friendly UI
810 self.params = dict(self.mandatoryparams)
811 self.params = dict(self.mandatoryparams)
811 self.params.update(dict(self.advisoryparams))
812 self.params.update(dict(self.advisoryparams))
812 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
813 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
813
814
814 def _readheader(self):
815 def _readheader(self):
815 """read the header and setup the object"""
816 """read the header and setup the object"""
816 typesize = self._unpackheader(_fparttypesize)[0]
817 typesize = self._unpackheader(_fparttypesize)[0]
817 self.type = self._fromheader(typesize)
818 self.type = self._fromheader(typesize)
818 self.ui.debug('part type: "%s"\n' % self.type)
819 self.ui.debug('part type: "%s"\n' % self.type)
819 self.id = self._unpackheader(_fpartid)[0]
820 self.id = self._unpackheader(_fpartid)[0]
820 self.ui.debug('part id: "%s"\n' % self.id)
821 self.ui.debug('part id: "%s"\n' % self.id)
822 # extract mandatory bit from type
823 self.mandatory = (self.type != self.type.lower())
824 self.type = self.type.lower()
821 ## reading parameters
825 ## reading parameters
822 # param count
826 # param count
823 mancount, advcount = self._unpackheader(_fpartparamcount)
827 mancount, advcount = self._unpackheader(_fpartparamcount)
824 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
828 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
825 # param size
829 # param size
826 fparamsizes = _makefpartparamsizes(mancount + advcount)
830 fparamsizes = _makefpartparamsizes(mancount + advcount)
827 paramsizes = self._unpackheader(fparamsizes)
831 paramsizes = self._unpackheader(fparamsizes)
828 # make it a list of couple again
832 # make it a list of couple again
829 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
833 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
830 # split mandatory from advisory
834 # split mandatory from advisory
831 mansizes = paramsizes[:mancount]
835 mansizes = paramsizes[:mancount]
832 advsizes = paramsizes[mancount:]
836 advsizes = paramsizes[mancount:]
833 # retrieve param value
837 # retrieve param value
834 manparams = []
838 manparams = []
835 for key, value in mansizes:
839 for key, value in mansizes:
836 manparams.append((self._fromheader(key), self._fromheader(value)))
840 manparams.append((self._fromheader(key), self._fromheader(value)))
837 advparams = []
841 advparams = []
838 for key, value in advsizes:
842 for key, value in advsizes:
839 advparams.append((self._fromheader(key), self._fromheader(value)))
843 advparams.append((self._fromheader(key), self._fromheader(value)))
840 self._initparams(manparams, advparams)
844 self._initparams(manparams, advparams)
841 ## part payload
845 ## part payload
842 def payloadchunks():
846 def payloadchunks():
843 payloadsize = self._unpack(_fpayloadsize)[0]
847 payloadsize = self._unpack(_fpayloadsize)[0]
844 self.ui.debug('payload chunk size: %i\n' % payloadsize)
848 self.ui.debug('payload chunk size: %i\n' % payloadsize)
845 while payloadsize:
849 while payloadsize:
846 if payloadsize == flaginterrupt:
850 if payloadsize == flaginterrupt:
847 # interruption detection, the handler will now read a
851 # interruption detection, the handler will now read a
848 # single part and process it.
852 # single part and process it.
849 interrupthandler(self.ui, self._fp)()
853 interrupthandler(self.ui, self._fp)()
850 elif payloadsize < 0:
854 elif payloadsize < 0:
851 msg = 'negative payload chunk size: %i' % payloadsize
855 msg = 'negative payload chunk size: %i' % payloadsize
852 raise error.BundleValueError(msg)
856 raise error.BundleValueError(msg)
853 else:
857 else:
854 yield self._readexact(payloadsize)
858 yield self._readexact(payloadsize)
855 payloadsize = self._unpack(_fpayloadsize)[0]
859 payloadsize = self._unpack(_fpayloadsize)[0]
856 self.ui.debug('payload chunk size: %i\n' % payloadsize)
860 self.ui.debug('payload chunk size: %i\n' % payloadsize)
857 self._payloadstream = util.chunkbuffer(payloadchunks())
861 self._payloadstream = util.chunkbuffer(payloadchunks())
858 # we read the data, tell it
862 # we read the data, tell it
859 self._initialized = True
863 self._initialized = True
860
864
861 def read(self, size=None):
865 def read(self, size=None):
862 """read payload data"""
866 """read payload data"""
863 if not self._initialized:
867 if not self._initialized:
864 self._readheader()
868 self._readheader()
865 if size is None:
869 if size is None:
866 data = self._payloadstream.read()
870 data = self._payloadstream.read()
867 else:
871 else:
868 data = self._payloadstream.read(size)
872 data = self._payloadstream.read(size)
869 if size is None or len(data) < size:
873 if size is None or len(data) < size:
870 self.consumed = True
874 self.consumed = True
871 return data
875 return data
872
876
873 capabilities = {'HG2Y': (),
877 capabilities = {'HG2Y': (),
874 'b2x:listkeys': (),
878 'b2x:listkeys': (),
875 'b2x:pushkey': (),
879 'b2x:pushkey': (),
876 'digests': tuple(sorted(util.DIGESTS.keys())),
880 'digests': tuple(sorted(util.DIGESTS.keys())),
877 'b2x:remote-changegroup': ('http', 'https'),
881 'b2x:remote-changegroup': ('http', 'https'),
878 }
882 }
879
883
880 def getrepocaps(repo, allowpushback=False):
884 def getrepocaps(repo, allowpushback=False):
881 """return the bundle2 capabilities for a given repo
885 """return the bundle2 capabilities for a given repo
882
886
883 Exists to allow extensions (like evolution) to mutate the capabilities.
887 Exists to allow extensions (like evolution) to mutate the capabilities.
884 """
888 """
885 caps = capabilities.copy()
889 caps = capabilities.copy()
886 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
890 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
887 if obsolete.isenabled(repo, obsolete.exchangeopt):
891 if obsolete.isenabled(repo, obsolete.exchangeopt):
888 supportedformat = tuple('V%i' % v for v in obsolete.formats)
892 supportedformat = tuple('V%i' % v for v in obsolete.formats)
889 caps['b2x:obsmarkers'] = supportedformat
893 caps['b2x:obsmarkers'] = supportedformat
890 if allowpushback:
894 if allowpushback:
891 caps['b2x:pushback'] = ()
895 caps['b2x:pushback'] = ()
892 return caps
896 return caps
893
897
894 def bundle2caps(remote):
898 def bundle2caps(remote):
895 """return the bundle capabilities of a peer as dict"""
899 """return the bundle capabilities of a peer as dict"""
896 raw = remote.capable('bundle2-exp')
900 raw = remote.capable('bundle2-exp')
897 if not raw and raw != '':
901 if not raw and raw != '':
898 return {}
902 return {}
899 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
903 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
900 return decodecaps(capsblob)
904 return decodecaps(capsblob)
901
905
902 def obsmarkersversion(caps):
906 def obsmarkersversion(caps):
903 """extract the list of supported obsmarkers versions from a bundle2caps dict
907 """extract the list of supported obsmarkers versions from a bundle2caps dict
904 """
908 """
905 obscaps = caps.get('b2x:obsmarkers', ())
909 obscaps = caps.get('b2x:obsmarkers', ())
906 return [int(c[1:]) for c in obscaps if c.startswith('V')]
910 return [int(c[1:]) for c in obscaps if c.startswith('V')]
907
911
908 @parthandler('b2x:changegroup', ('version',))
912 @parthandler('b2x:changegroup', ('version',))
909 def handlechangegroup(op, inpart):
913 def handlechangegroup(op, inpart):
910 """apply a changegroup part on the repo
914 """apply a changegroup part on the repo
911
915
912 This is a very early implementation that will massive rework before being
916 This is a very early implementation that will massive rework before being
913 inflicted to any end-user.
917 inflicted to any end-user.
914 """
918 """
915 # Make sure we trigger a transaction creation
919 # Make sure we trigger a transaction creation
916 #
920 #
917 # The addchangegroup function will get a transaction object by itself, but
921 # The addchangegroup function will get a transaction object by itself, but
918 # we need to make sure we trigger the creation of a transaction object used
922 # we need to make sure we trigger the creation of a transaction object used
919 # for the whole processing scope.
923 # for the whole processing scope.
920 op.gettransaction()
924 op.gettransaction()
921 unpackerversion = inpart.params.get('version', '01')
925 unpackerversion = inpart.params.get('version', '01')
922 # We should raise an appropriate exception here
926 # We should raise an appropriate exception here
923 unpacker = changegroup.packermap[unpackerversion][1]
927 unpacker = changegroup.packermap[unpackerversion][1]
924 cg = unpacker(inpart, 'UN')
928 cg = unpacker(inpart, 'UN')
925 # the source and url passed here are overwritten by the one contained in
929 # the source and url passed here are overwritten by the one contained in
926 # the transaction.hookargs argument. So 'bundle2' is a placeholder
930 # the transaction.hookargs argument. So 'bundle2' is a placeholder
927 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
931 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
928 op.records.add('changegroup', {'return': ret})
932 op.records.add('changegroup', {'return': ret})
929 if op.reply is not None:
933 if op.reply is not None:
930 # This is definitely not the final form of this
934 # This is definitely not the final form of this
931 # return. But one need to start somewhere.
935 # return. But one need to start somewhere.
932 part = op.reply.newpart('b2x:reply:changegroup')
936 part = op.reply.newpart('b2x:reply:changegroup')
933 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
937 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
934 part.addparam('return', '%i' % ret, mandatory=False)
938 part.addparam('return', '%i' % ret, mandatory=False)
935 assert not inpart.read()
939 assert not inpart.read()
936
940
937 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
941 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
938 ['digest:%s' % k for k in util.DIGESTS.keys()])
942 ['digest:%s' % k for k in util.DIGESTS.keys()])
939 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
943 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
940 def handleremotechangegroup(op, inpart):
944 def handleremotechangegroup(op, inpart):
941 """apply a bundle10 on the repo, given an url and validation information
945 """apply a bundle10 on the repo, given an url and validation information
942
946
943 All the information about the remote bundle to import are given as
947 All the information about the remote bundle to import are given as
944 parameters. The parameters include:
948 parameters. The parameters include:
945 - url: the url to the bundle10.
949 - url: the url to the bundle10.
946 - size: the bundle10 file size. It is used to validate what was
950 - size: the bundle10 file size. It is used to validate what was
947 retrieved by the client matches the server knowledge about the bundle.
951 retrieved by the client matches the server knowledge about the bundle.
948 - digests: a space separated list of the digest types provided as
952 - digests: a space separated list of the digest types provided as
949 parameters.
953 parameters.
950 - digest:<digest-type>: the hexadecimal representation of the digest with
954 - digest:<digest-type>: the hexadecimal representation of the digest with
951 that name. Like the size, it is used to validate what was retrieved by
955 that name. Like the size, it is used to validate what was retrieved by
952 the client matches what the server knows about the bundle.
956 the client matches what the server knows about the bundle.
953
957
954 When multiple digest types are given, all of them are checked.
958 When multiple digest types are given, all of them are checked.
955 """
959 """
956 try:
960 try:
957 raw_url = inpart.params['url']
961 raw_url = inpart.params['url']
958 except KeyError:
962 except KeyError:
959 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
963 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
960 parsed_url = util.url(raw_url)
964 parsed_url = util.url(raw_url)
961 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
965 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
962 raise util.Abort(_('remote-changegroup does not support %s urls') %
966 raise util.Abort(_('remote-changegroup does not support %s urls') %
963 parsed_url.scheme)
967 parsed_url.scheme)
964
968
965 try:
969 try:
966 size = int(inpart.params['size'])
970 size = int(inpart.params['size'])
967 except ValueError:
971 except ValueError:
968 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
972 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
969 % 'size')
973 % 'size')
970 except KeyError:
974 except KeyError:
971 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
975 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
972
976
973 digests = {}
977 digests = {}
974 for typ in inpart.params.get('digests', '').split():
978 for typ in inpart.params.get('digests', '').split():
975 param = 'digest:%s' % typ
979 param = 'digest:%s' % typ
976 try:
980 try:
977 value = inpart.params[param]
981 value = inpart.params[param]
978 except KeyError:
982 except KeyError:
979 raise util.Abort(_('remote-changegroup: missing "%s" param') %
983 raise util.Abort(_('remote-changegroup: missing "%s" param') %
980 param)
984 param)
981 digests[typ] = value
985 digests[typ] = value
982
986
983 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
987 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
984
988
985 # Make sure we trigger a transaction creation
989 # Make sure we trigger a transaction creation
986 #
990 #
987 # The addchangegroup function will get a transaction object by itself, but
991 # The addchangegroup function will get a transaction object by itself, but
988 # we need to make sure we trigger the creation of a transaction object used
992 # we need to make sure we trigger the creation of a transaction object used
989 # for the whole processing scope.
993 # for the whole processing scope.
990 op.gettransaction()
994 op.gettransaction()
991 import exchange
995 import exchange
992 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
996 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
993 if not isinstance(cg, changegroup.cg1unpacker):
997 if not isinstance(cg, changegroup.cg1unpacker):
994 raise util.Abort(_('%s: not a bundle version 1.0') %
998 raise util.Abort(_('%s: not a bundle version 1.0') %
995 util.hidepassword(raw_url))
999 util.hidepassword(raw_url))
996 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1000 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
997 op.records.add('changegroup', {'return': ret})
1001 op.records.add('changegroup', {'return': ret})
998 if op.reply is not None:
1002 if op.reply is not None:
999 # This is definitely not the final form of this
1003 # This is definitely not the final form of this
1000 # return. But one need to start somewhere.
1004 # return. But one need to start somewhere.
1001 part = op.reply.newpart('b2x:reply:changegroup')
1005 part = op.reply.newpart('b2x:reply:changegroup')
1002 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1006 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1003 part.addparam('return', '%i' % ret, mandatory=False)
1007 part.addparam('return', '%i' % ret, mandatory=False)
1004 try:
1008 try:
1005 real_part.validate()
1009 real_part.validate()
1006 except util.Abort, e:
1010 except util.Abort, e:
1007 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1011 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1008 (util.hidepassword(raw_url), str(e)))
1012 (util.hidepassword(raw_url), str(e)))
1009 assert not inpart.read()
1013 assert not inpart.read()
1010
1014
1011 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1015 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1012 def handlereplychangegroup(op, inpart):
1016 def handlereplychangegroup(op, inpart):
1013 ret = int(inpart.params['return'])
1017 ret = int(inpart.params['return'])
1014 replyto = int(inpart.params['in-reply-to'])
1018 replyto = int(inpart.params['in-reply-to'])
1015 op.records.add('changegroup', {'return': ret}, replyto)
1019 op.records.add('changegroup', {'return': ret}, replyto)
1016
1020
1017 @parthandler('b2x:check:heads')
1021 @parthandler('b2x:check:heads')
1018 def handlecheckheads(op, inpart):
1022 def handlecheckheads(op, inpart):
1019 """check that head of the repo did not change
1023 """check that head of the repo did not change
1020
1024
1021 This is used to detect a push race when using unbundle.
1025 This is used to detect a push race when using unbundle.
1022 This replaces the "heads" argument of unbundle."""
1026 This replaces the "heads" argument of unbundle."""
1023 h = inpart.read(20)
1027 h = inpart.read(20)
1024 heads = []
1028 heads = []
1025 while len(h) == 20:
1029 while len(h) == 20:
1026 heads.append(h)
1030 heads.append(h)
1027 h = inpart.read(20)
1031 h = inpart.read(20)
1028 assert not h
1032 assert not h
1029 if heads != op.repo.heads():
1033 if heads != op.repo.heads():
1030 raise error.PushRaced('repository changed while pushing - '
1034 raise error.PushRaced('repository changed while pushing - '
1031 'please try again')
1035 'please try again')
1032
1036
1033 @parthandler('b2x:output')
1037 @parthandler('b2x:output')
1034 def handleoutput(op, inpart):
1038 def handleoutput(op, inpart):
1035 """forward output captured on the server to the client"""
1039 """forward output captured on the server to the client"""
1036 for line in inpart.read().splitlines():
1040 for line in inpart.read().splitlines():
1037 op.ui.write(('remote: %s\n' % line))
1041 op.ui.write(('remote: %s\n' % line))
1038
1042
1039 @parthandler('b2x:replycaps')
1043 @parthandler('b2x:replycaps')
1040 def handlereplycaps(op, inpart):
1044 def handlereplycaps(op, inpart):
1041 """Notify that a reply bundle should be created
1045 """Notify that a reply bundle should be created
1042
1046
1043 The payload contains the capabilities information for the reply"""
1047 The payload contains the capabilities information for the reply"""
1044 caps = decodecaps(inpart.read())
1048 caps = decodecaps(inpart.read())
1045 if op.reply is None:
1049 if op.reply is None:
1046 op.reply = bundle20(op.ui, caps)
1050 op.reply = bundle20(op.ui, caps)
1047
1051
1048 @parthandler('b2x:error:abort', ('message', 'hint'))
1052 @parthandler('b2x:error:abort', ('message', 'hint'))
1049 def handlereplycaps(op, inpart):
1053 def handlereplycaps(op, inpart):
1050 """Used to transmit abort error over the wire"""
1054 """Used to transmit abort error over the wire"""
1051 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1055 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1052
1056
1053 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1057 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1054 def handlereplycaps(op, inpart):
1058 def handlereplycaps(op, inpart):
1055 """Used to transmit unknown content error over the wire"""
1059 """Used to transmit unknown content error over the wire"""
1056 kwargs = {}
1060 kwargs = {}
1057 parttype = inpart.params.get('parttype')
1061 parttype = inpart.params.get('parttype')
1058 if parttype is not None:
1062 if parttype is not None:
1059 kwargs['parttype'] = parttype
1063 kwargs['parttype'] = parttype
1060 params = inpart.params.get('params')
1064 params = inpart.params.get('params')
1061 if params is not None:
1065 if params is not None:
1062 kwargs['params'] = params.split('\0')
1066 kwargs['params'] = params.split('\0')
1063
1067
1064 raise error.UnsupportedPartError(**kwargs)
1068 raise error.UnsupportedPartError(**kwargs)
1065
1069
1066 @parthandler('b2x:error:pushraced', ('message',))
1070 @parthandler('b2x:error:pushraced', ('message',))
1067 def handlereplycaps(op, inpart):
1071 def handlereplycaps(op, inpart):
1068 """Used to transmit push race error over the wire"""
1072 """Used to transmit push race error over the wire"""
1069 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1073 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1070
1074
1071 @parthandler('b2x:listkeys', ('namespace',))
1075 @parthandler('b2x:listkeys', ('namespace',))
1072 def handlelistkeys(op, inpart):
1076 def handlelistkeys(op, inpart):
1073 """retrieve pushkey namespace content stored in a bundle2"""
1077 """retrieve pushkey namespace content stored in a bundle2"""
1074 namespace = inpart.params['namespace']
1078 namespace = inpart.params['namespace']
1075 r = pushkey.decodekeys(inpart.read())
1079 r = pushkey.decodekeys(inpart.read())
1076 op.records.add('listkeys', (namespace, r))
1080 op.records.add('listkeys', (namespace, r))
1077
1081
1078 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1082 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1079 def handlepushkey(op, inpart):
1083 def handlepushkey(op, inpart):
1080 """process a pushkey request"""
1084 """process a pushkey request"""
1081 dec = pushkey.decode
1085 dec = pushkey.decode
1082 namespace = dec(inpart.params['namespace'])
1086 namespace = dec(inpart.params['namespace'])
1083 key = dec(inpart.params['key'])
1087 key = dec(inpart.params['key'])
1084 old = dec(inpart.params['old'])
1088 old = dec(inpart.params['old'])
1085 new = dec(inpart.params['new'])
1089 new = dec(inpart.params['new'])
1086 ret = op.repo.pushkey(namespace, key, old, new)
1090 ret = op.repo.pushkey(namespace, key, old, new)
1087 record = {'namespace': namespace,
1091 record = {'namespace': namespace,
1088 'key': key,
1092 'key': key,
1089 'old': old,
1093 'old': old,
1090 'new': new}
1094 'new': new}
1091 op.records.add('pushkey', record)
1095 op.records.add('pushkey', record)
1092 if op.reply is not None:
1096 if op.reply is not None:
1093 rpart = op.reply.newpart('b2x:reply:pushkey')
1097 rpart = op.reply.newpart('b2x:reply:pushkey')
1094 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1098 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1095 rpart.addparam('return', '%i' % ret, mandatory=False)
1099 rpart.addparam('return', '%i' % ret, mandatory=False)
1096
1100
1097 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1101 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1098 def handlepushkeyreply(op, inpart):
1102 def handlepushkeyreply(op, inpart):
1099 """retrieve the result of a pushkey request"""
1103 """retrieve the result of a pushkey request"""
1100 ret = int(inpart.params['return'])
1104 ret = int(inpart.params['return'])
1101 partid = int(inpart.params['in-reply-to'])
1105 partid = int(inpart.params['in-reply-to'])
1102 op.records.add('pushkey', {'return': ret}, partid)
1106 op.records.add('pushkey', {'return': ret}, partid)
1103
1107
1104 @parthandler('b2x:obsmarkers')
1108 @parthandler('b2x:obsmarkers')
1105 def handleobsmarker(op, inpart):
1109 def handleobsmarker(op, inpart):
1106 """add a stream of obsmarkers to the repo"""
1110 """add a stream of obsmarkers to the repo"""
1107 tr = op.gettransaction()
1111 tr = op.gettransaction()
1108 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1112 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1109 if new:
1113 if new:
1110 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1114 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1111 op.records.add('obsmarkers', {'new': new})
1115 op.records.add('obsmarkers', {'new': new})
1112 if op.reply is not None:
1116 if op.reply is not None:
1113 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1117 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1114 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1118 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1115 rpart.addparam('new', '%i' % new, mandatory=False)
1119 rpart.addparam('new', '%i' % new, mandatory=False)
1116
1120
1117
1121
1118 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1122 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1119 def handlepushkeyreply(op, inpart):
1123 def handlepushkeyreply(op, inpart):
1120 """retrieve the result of a pushkey request"""
1124 """retrieve the result of a pushkey request"""
1121 ret = int(inpart.params['new'])
1125 ret = int(inpart.params['new'])
1122 partid = int(inpart.params['in-reply-to'])
1126 partid = int(inpart.params['in-reply-to'])
1123 op.records.add('obsmarkers', {'new': ret}, partid)
1127 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now