##// END OF EJS Templates
bundle2.processbundle: let callers request default behavior...
Eric Sumner -
r23438:6e0ecb9a default
parent child Browse files
Show More
@@ -1,1119 +1,1121 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import sys
148 import sys
149 import util
149 import util
150 import struct
150 import struct
151 import urllib
151 import urllib
152 import string
152 import string
153 import obsolete
153 import obsolete
154 import pushkey
154 import pushkey
155 import url
155 import url
156
156
157 import changegroup, error
157 import changegroup, error
158 from i18n import _
158 from i18n import _
159
159
160 _pack = struct.pack
160 _pack = struct.pack
161 _unpack = struct.unpack
161 _unpack = struct.unpack
162
162
163 _magicstring = 'HG2Y'
163 _magicstring = 'HG2Y'
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 def _makefpartparamsizes(nbparams):
174 def _makefpartparamsizes(nbparams):
175 """return a struct format to read part parameter sizes
175 """return a struct format to read part parameter sizes
176
176
177 The number parameters is variable so we need to build that format
177 The number parameters is variable so we need to build that format
178 dynamically.
178 dynamically.
179 """
179 """
180 return '>'+('BB'*nbparams)
180 return '>'+('BB'*nbparams)
181
181
182 parthandlermapping = {}
182 parthandlermapping = {}
183
183
184 def parthandler(parttype, params=()):
184 def parthandler(parttype, params=()):
185 """decorator that register a function as a bundle2 part handler
185 """decorator that register a function as a bundle2 part handler
186
186
187 eg::
187 eg::
188
188
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
190 def myparttypehandler(...):
190 def myparttypehandler(...):
191 '''process a part of type "my part".'''
191 '''process a part of type "my part".'''
192 ...
192 ...
193 """
193 """
194 def _decorator(func):
194 def _decorator(func):
195 lparttype = parttype.lower() # enforce lower case matching.
195 lparttype = parttype.lower() # enforce lower case matching.
196 assert lparttype not in parthandlermapping
196 assert lparttype not in parthandlermapping
197 parthandlermapping[lparttype] = func
197 parthandlermapping[lparttype] = func
198 func.params = frozenset(params)
198 func.params = frozenset(params)
199 return func
199 return func
200 return _decorator
200 return _decorator
201
201
202 class unbundlerecords(object):
202 class unbundlerecords(object):
203 """keep record of what happens during and unbundle
203 """keep record of what happens during and unbundle
204
204
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 category of record and obj is an arbitrary object.
206 category of record and obj is an arbitrary object.
207
207
208 `records['cat']` will return all entries of this category 'cat'.
208 `records['cat']` will return all entries of this category 'cat'.
209
209
210 Iterating on the object itself will yield `('category', obj)` tuples
210 Iterating on the object itself will yield `('category', obj)` tuples
211 for all entries.
211 for all entries.
212
212
213 All iterations happens in chronological order.
213 All iterations happens in chronological order.
214 """
214 """
215
215
216 def __init__(self):
216 def __init__(self):
217 self._categories = {}
217 self._categories = {}
218 self._sequences = []
218 self._sequences = []
219 self._replies = {}
219 self._replies = {}
220
220
221 def add(self, category, entry, inreplyto=None):
221 def add(self, category, entry, inreplyto=None):
222 """add a new record of a given category.
222 """add a new record of a given category.
223
223
224 The entry can then be retrieved in the list returned by
224 The entry can then be retrieved in the list returned by
225 self['category']."""
225 self['category']."""
226 self._categories.setdefault(category, []).append(entry)
226 self._categories.setdefault(category, []).append(entry)
227 self._sequences.append((category, entry))
227 self._sequences.append((category, entry))
228 if inreplyto is not None:
228 if inreplyto is not None:
229 self.getreplies(inreplyto).add(category, entry)
229 self.getreplies(inreplyto).add(category, entry)
230
230
231 def getreplies(self, partid):
231 def getreplies(self, partid):
232 """get the records that are replies to a specific part"""
232 """get the records that are replies to a specific part"""
233 return self._replies.setdefault(partid, unbundlerecords())
233 return self._replies.setdefault(partid, unbundlerecords())
234
234
235 def __getitem__(self, cat):
235 def __getitem__(self, cat):
236 return tuple(self._categories.get(cat, ()))
236 return tuple(self._categories.get(cat, ()))
237
237
238 def __iter__(self):
238 def __iter__(self):
239 return iter(self._sequences)
239 return iter(self._sequences)
240
240
241 def __len__(self):
241 def __len__(self):
242 return len(self._sequences)
242 return len(self._sequences)
243
243
244 def __nonzero__(self):
244 def __nonzero__(self):
245 return bool(self._sequences)
245 return bool(self._sequences)
246
246
247 class bundleoperation(object):
247 class bundleoperation(object):
248 """an object that represents a single bundling process
248 """an object that represents a single bundling process
249
249
250 Its purpose is to carry unbundle-related objects and states.
250 Its purpose is to carry unbundle-related objects and states.
251
251
252 A new object should be created at the beginning of each bundle processing.
252 A new object should be created at the beginning of each bundle processing.
253 The object is to be returned by the processing function.
253 The object is to be returned by the processing function.
254
254
255 The object has very little content now it will ultimately contain:
255 The object has very little content now it will ultimately contain:
256 * an access to the repo the bundle is applied to,
256 * an access to the repo the bundle is applied to,
257 * a ui object,
257 * a ui object,
258 * a way to retrieve a transaction to add changes to the repo,
258 * a way to retrieve a transaction to add changes to the repo,
259 * a way to record the result of processing each part,
259 * a way to record the result of processing each part,
260 * a way to construct a bundle response when applicable.
260 * a way to construct a bundle response when applicable.
261 """
261 """
262
262
263 def __init__(self, repo, transactiongetter):
263 def __init__(self, repo, transactiongetter):
264 self.repo = repo
264 self.repo = repo
265 self.ui = repo.ui
265 self.ui = repo.ui
266 self.records = unbundlerecords()
266 self.records = unbundlerecords()
267 self.gettransaction = transactiongetter
267 self.gettransaction = transactiongetter
268 self.reply = None
268 self.reply = None
269
269
270 class TransactionUnavailable(RuntimeError):
270 class TransactionUnavailable(RuntimeError):
271 pass
271 pass
272
272
273 def _notransaction():
273 def _notransaction():
274 """default method to get a transaction while processing a bundle
274 """default method to get a transaction while processing a bundle
275
275
276 Raise an exception to highlight the fact that no transaction was expected
276 Raise an exception to highlight the fact that no transaction was expected
277 to be created"""
277 to be created"""
278 raise TransactionUnavailable()
278 raise TransactionUnavailable()
279
279
280 def processbundle(repo, unbundler, transactiongetter=_notransaction):
280 def processbundle(repo, unbundler, transactiongetter=None):
281 """This function process a bundle, apply effect to/from a repo
281 """This function process a bundle, apply effect to/from a repo
282
282
283 It iterates over each part then searches for and uses the proper handling
283 It iterates over each part then searches for and uses the proper handling
284 code to process the part. Parts are processed in order.
284 code to process the part. Parts are processed in order.
285
285
286 This is very early version of this function that will be strongly reworked
286 This is very early version of this function that will be strongly reworked
287 before final usage.
287 before final usage.
288
288
289 Unknown Mandatory part will abort the process.
289 Unknown Mandatory part will abort the process.
290 """
290 """
291 if transactiongetter is None:
292 transactiongetter = _notransaction
291 op = bundleoperation(repo, transactiongetter)
293 op = bundleoperation(repo, transactiongetter)
292 # todo:
294 # todo:
293 # - replace this is a init function soon.
295 # - replace this is a init function soon.
294 # - exception catching
296 # - exception catching
295 unbundler.params
297 unbundler.params
296 iterparts = unbundler.iterparts()
298 iterparts = unbundler.iterparts()
297 part = None
299 part = None
298 try:
300 try:
299 for part in iterparts:
301 for part in iterparts:
300 _processpart(op, part)
302 _processpart(op, part)
301 except Exception, exc:
303 except Exception, exc:
302 for part in iterparts:
304 for part in iterparts:
303 # consume the bundle content
305 # consume the bundle content
304 part.read()
306 part.read()
305 # Small hack to let caller code distinguish exceptions from bundle2
307 # Small hack to let caller code distinguish exceptions from bundle2
306 # processing from processing the old format. This is mostly
308 # processing from processing the old format. This is mostly
307 # needed to handle different return codes to unbundle according to the
309 # needed to handle different return codes to unbundle according to the
308 # type of bundle. We should probably clean up or drop this return code
310 # type of bundle. We should probably clean up or drop this return code
309 # craziness in a future version.
311 # craziness in a future version.
310 exc.duringunbundle2 = True
312 exc.duringunbundle2 = True
311 raise
313 raise
312 return op
314 return op
313
315
314 def _processpart(op, part):
316 def _processpart(op, part):
315 """process a single part from a bundle
317 """process a single part from a bundle
316
318
317 The part is guaranteed to have been fully consumed when the function exits
319 The part is guaranteed to have been fully consumed when the function exits
318 (even if an exception is raised)."""
320 (even if an exception is raised)."""
319 try:
321 try:
320 parttype = part.type
322 parttype = part.type
321 # part key are matched lower case
323 # part key are matched lower case
322 key = parttype.lower()
324 key = parttype.lower()
323 try:
325 try:
324 handler = parthandlermapping.get(key)
326 handler = parthandlermapping.get(key)
325 if handler is None:
327 if handler is None:
326 raise error.UnsupportedPartError(parttype=key)
328 raise error.UnsupportedPartError(parttype=key)
327 op.ui.debug('found a handler for part %r\n' % parttype)
329 op.ui.debug('found a handler for part %r\n' % parttype)
328 unknownparams = part.mandatorykeys - handler.params
330 unknownparams = part.mandatorykeys - handler.params
329 if unknownparams:
331 if unknownparams:
330 unknownparams = list(unknownparams)
332 unknownparams = list(unknownparams)
331 unknownparams.sort()
333 unknownparams.sort()
332 raise error.UnsupportedPartError(parttype=key,
334 raise error.UnsupportedPartError(parttype=key,
333 params=unknownparams)
335 params=unknownparams)
334 except error.UnsupportedPartError, exc:
336 except error.UnsupportedPartError, exc:
335 if key != parttype: # mandatory parts
337 if key != parttype: # mandatory parts
336 raise
338 raise
337 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
339 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
338 return # skip to part processing
340 return # skip to part processing
339
341
340 # handler is called outside the above try block so that we don't
342 # handler is called outside the above try block so that we don't
341 # risk catching KeyErrors from anything other than the
343 # risk catching KeyErrors from anything other than the
342 # parthandlermapping lookup (any KeyError raised by handler()
344 # parthandlermapping lookup (any KeyError raised by handler()
343 # itself represents a defect of a different variety).
345 # itself represents a defect of a different variety).
344 output = None
346 output = None
345 if op.reply is not None:
347 if op.reply is not None:
346 op.ui.pushbuffer(error=True)
348 op.ui.pushbuffer(error=True)
347 output = ''
349 output = ''
348 try:
350 try:
349 handler(op, part)
351 handler(op, part)
350 finally:
352 finally:
351 if output is not None:
353 if output is not None:
352 output = op.ui.popbuffer()
354 output = op.ui.popbuffer()
353 if output:
355 if output:
354 outpart = op.reply.newpart('b2x:output', data=output)
356 outpart = op.reply.newpart('b2x:output', data=output)
355 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
357 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
356 finally:
358 finally:
357 # consume the part content to not corrupt the stream.
359 # consume the part content to not corrupt the stream.
358 part.read()
360 part.read()
359
361
360
362
361 def decodecaps(blob):
363 def decodecaps(blob):
362 """decode a bundle2 caps bytes blob into a dictionary
364 """decode a bundle2 caps bytes blob into a dictionary
363
365
364 The blob is a list of capabilities (one per line)
366 The blob is a list of capabilities (one per line)
365 Capabilities may have values using a line of the form::
367 Capabilities may have values using a line of the form::
366
368
367 capability=value1,value2,value3
369 capability=value1,value2,value3
368
370
369 The values are always a list."""
371 The values are always a list."""
370 caps = {}
372 caps = {}
371 for line in blob.splitlines():
373 for line in blob.splitlines():
372 if not line:
374 if not line:
373 continue
375 continue
374 if '=' not in line:
376 if '=' not in line:
375 key, vals = line, ()
377 key, vals = line, ()
376 else:
378 else:
377 key, vals = line.split('=', 1)
379 key, vals = line.split('=', 1)
378 vals = vals.split(',')
380 vals = vals.split(',')
379 key = urllib.unquote(key)
381 key = urllib.unquote(key)
380 vals = [urllib.unquote(v) for v in vals]
382 vals = [urllib.unquote(v) for v in vals]
381 caps[key] = vals
383 caps[key] = vals
382 return caps
384 return caps
383
385
384 def encodecaps(caps):
386 def encodecaps(caps):
385 """encode a bundle2 caps dictionary into a bytes blob"""
387 """encode a bundle2 caps dictionary into a bytes blob"""
386 chunks = []
388 chunks = []
387 for ca in sorted(caps):
389 for ca in sorted(caps):
388 vals = caps[ca]
390 vals = caps[ca]
389 ca = urllib.quote(ca)
391 ca = urllib.quote(ca)
390 vals = [urllib.quote(v) for v in vals]
392 vals = [urllib.quote(v) for v in vals]
391 if vals:
393 if vals:
392 ca = "%s=%s" % (ca, ','.join(vals))
394 ca = "%s=%s" % (ca, ','.join(vals))
393 chunks.append(ca)
395 chunks.append(ca)
394 return '\n'.join(chunks)
396 return '\n'.join(chunks)
395
397
396 class bundle20(object):
398 class bundle20(object):
397 """represent an outgoing bundle2 container
399 """represent an outgoing bundle2 container
398
400
399 Use the `addparam` method to add stream level parameter. and `newpart` to
401 Use the `addparam` method to add stream level parameter. and `newpart` to
400 populate it. Then call `getchunks` to retrieve all the binary chunks of
402 populate it. Then call `getchunks` to retrieve all the binary chunks of
401 data that compose the bundle2 container."""
403 data that compose the bundle2 container."""
402
404
403 def __init__(self, ui, capabilities=()):
405 def __init__(self, ui, capabilities=()):
404 self.ui = ui
406 self.ui = ui
405 self._params = []
407 self._params = []
406 self._parts = []
408 self._parts = []
407 self.capabilities = dict(capabilities)
409 self.capabilities = dict(capabilities)
408
410
409 @property
411 @property
410 def nbparts(self):
412 def nbparts(self):
411 """total number of parts added to the bundler"""
413 """total number of parts added to the bundler"""
412 return len(self._parts)
414 return len(self._parts)
413
415
414 # methods used to defines the bundle2 content
416 # methods used to defines the bundle2 content
415 def addparam(self, name, value=None):
417 def addparam(self, name, value=None):
416 """add a stream level parameter"""
418 """add a stream level parameter"""
417 if not name:
419 if not name:
418 raise ValueError('empty parameter name')
420 raise ValueError('empty parameter name')
419 if name[0] not in string.letters:
421 if name[0] not in string.letters:
420 raise ValueError('non letter first character: %r' % name)
422 raise ValueError('non letter first character: %r' % name)
421 self._params.append((name, value))
423 self._params.append((name, value))
422
424
423 def addpart(self, part):
425 def addpart(self, part):
424 """add a new part to the bundle2 container
426 """add a new part to the bundle2 container
425
427
426 Parts contains the actual applicative payload."""
428 Parts contains the actual applicative payload."""
427 assert part.id is None
429 assert part.id is None
428 part.id = len(self._parts) # very cheap counter
430 part.id = len(self._parts) # very cheap counter
429 self._parts.append(part)
431 self._parts.append(part)
430
432
431 def newpart(self, typeid, *args, **kwargs):
433 def newpart(self, typeid, *args, **kwargs):
432 """create a new part and add it to the containers
434 """create a new part and add it to the containers
433
435
434 As the part is directly added to the containers. For now, this means
436 As the part is directly added to the containers. For now, this means
435 that any failure to properly initialize the part after calling
437 that any failure to properly initialize the part after calling
436 ``newpart`` should result in a failure of the whole bundling process.
438 ``newpart`` should result in a failure of the whole bundling process.
437
439
438 You can still fall back to manually create and add if you need better
440 You can still fall back to manually create and add if you need better
439 control."""
441 control."""
440 part = bundlepart(typeid, *args, **kwargs)
442 part = bundlepart(typeid, *args, **kwargs)
441 self.addpart(part)
443 self.addpart(part)
442 return part
444 return part
443
445
444 # methods used to generate the bundle2 stream
446 # methods used to generate the bundle2 stream
445 def getchunks(self):
447 def getchunks(self):
446 self.ui.debug('start emission of %s stream\n' % _magicstring)
448 self.ui.debug('start emission of %s stream\n' % _magicstring)
447 yield _magicstring
449 yield _magicstring
448 param = self._paramchunk()
450 param = self._paramchunk()
449 self.ui.debug('bundle parameter: %s\n' % param)
451 self.ui.debug('bundle parameter: %s\n' % param)
450 yield _pack(_fstreamparamsize, len(param))
452 yield _pack(_fstreamparamsize, len(param))
451 if param:
453 if param:
452 yield param
454 yield param
453
455
454 self.ui.debug('start of parts\n')
456 self.ui.debug('start of parts\n')
455 for part in self._parts:
457 for part in self._parts:
456 self.ui.debug('bundle part: "%s"\n' % part.type)
458 self.ui.debug('bundle part: "%s"\n' % part.type)
457 for chunk in part.getchunks():
459 for chunk in part.getchunks():
458 yield chunk
460 yield chunk
459 self.ui.debug('end of bundle\n')
461 self.ui.debug('end of bundle\n')
460 yield _pack(_fpartheadersize, 0)
462 yield _pack(_fpartheadersize, 0)
461
463
462 def _paramchunk(self):
464 def _paramchunk(self):
463 """return a encoded version of all stream parameters"""
465 """return a encoded version of all stream parameters"""
464 blocks = []
466 blocks = []
465 for par, value in self._params:
467 for par, value in self._params:
466 par = urllib.quote(par)
468 par = urllib.quote(par)
467 if value is not None:
469 if value is not None:
468 value = urllib.quote(value)
470 value = urllib.quote(value)
469 par = '%s=%s' % (par, value)
471 par = '%s=%s' % (par, value)
470 blocks.append(par)
472 blocks.append(par)
471 return ' '.join(blocks)
473 return ' '.join(blocks)
472
474
473 class unpackermixin(object):
475 class unpackermixin(object):
474 """A mixin to extract bytes and struct data from a stream"""
476 """A mixin to extract bytes and struct data from a stream"""
475
477
476 def __init__(self, fp):
478 def __init__(self, fp):
477 self._fp = fp
479 self._fp = fp
478
480
479 def _unpack(self, format):
481 def _unpack(self, format):
480 """unpack this struct format from the stream"""
482 """unpack this struct format from the stream"""
481 data = self._readexact(struct.calcsize(format))
483 data = self._readexact(struct.calcsize(format))
482 return _unpack(format, data)
484 return _unpack(format, data)
483
485
484 def _readexact(self, size):
486 def _readexact(self, size):
485 """read exactly <size> bytes from the stream"""
487 """read exactly <size> bytes from the stream"""
486 return changegroup.readexactly(self._fp, size)
488 return changegroup.readexactly(self._fp, size)
487
489
488
490
489 class unbundle20(unpackermixin):
491 class unbundle20(unpackermixin):
490 """interpret a bundle2 stream
492 """interpret a bundle2 stream
491
493
492 This class is fed with a binary stream and yields parts through its
494 This class is fed with a binary stream and yields parts through its
493 `iterparts` methods."""
495 `iterparts` methods."""
494
496
495 def __init__(self, ui, fp, header=None):
497 def __init__(self, ui, fp, header=None):
496 """If header is specified, we do not read it out of the stream."""
498 """If header is specified, we do not read it out of the stream."""
497 self.ui = ui
499 self.ui = ui
498 super(unbundle20, self).__init__(fp)
500 super(unbundle20, self).__init__(fp)
499 if header is None:
501 if header is None:
500 header = self._readexact(4)
502 header = self._readexact(4)
501 magic, version = header[0:2], header[2:4]
503 magic, version = header[0:2], header[2:4]
502 if magic != 'HG':
504 if magic != 'HG':
503 raise util.Abort(_('not a Mercurial bundle'))
505 raise util.Abort(_('not a Mercurial bundle'))
504 if version != '2Y':
506 if version != '2Y':
505 raise util.Abort(_('unknown bundle version %s') % version)
507 raise util.Abort(_('unknown bundle version %s') % version)
506 self.ui.debug('start processing of %s stream\n' % header)
508 self.ui.debug('start processing of %s stream\n' % header)
507
509
508 @util.propertycache
510 @util.propertycache
509 def params(self):
511 def params(self):
510 """dictionary of stream level parameters"""
512 """dictionary of stream level parameters"""
511 self.ui.debug('reading bundle2 stream parameters\n')
513 self.ui.debug('reading bundle2 stream parameters\n')
512 params = {}
514 params = {}
513 paramssize = self._unpack(_fstreamparamsize)[0]
515 paramssize = self._unpack(_fstreamparamsize)[0]
514 if paramssize < 0:
516 if paramssize < 0:
515 raise error.BundleValueError('negative bundle param size: %i'
517 raise error.BundleValueError('negative bundle param size: %i'
516 % paramssize)
518 % paramssize)
517 if paramssize:
519 if paramssize:
518 for p in self._readexact(paramssize).split(' '):
520 for p in self._readexact(paramssize).split(' '):
519 p = p.split('=', 1)
521 p = p.split('=', 1)
520 p = [urllib.unquote(i) for i in p]
522 p = [urllib.unquote(i) for i in p]
521 if len(p) < 2:
523 if len(p) < 2:
522 p.append(None)
524 p.append(None)
523 self._processparam(*p)
525 self._processparam(*p)
524 params[p[0]] = p[1]
526 params[p[0]] = p[1]
525 return params
527 return params
526
528
527 def _processparam(self, name, value):
529 def _processparam(self, name, value):
528 """process a parameter, applying its effect if needed
530 """process a parameter, applying its effect if needed
529
531
530 Parameter starting with a lower case letter are advisory and will be
532 Parameter starting with a lower case letter are advisory and will be
531 ignored when unknown. Those starting with an upper case letter are
533 ignored when unknown. Those starting with an upper case letter are
532 mandatory and will this function will raise a KeyError when unknown.
534 mandatory and will this function will raise a KeyError when unknown.
533
535
534 Note: no option are currently supported. Any input will be either
536 Note: no option are currently supported. Any input will be either
535 ignored or failing.
537 ignored or failing.
536 """
538 """
537 if not name:
539 if not name:
538 raise ValueError('empty parameter name')
540 raise ValueError('empty parameter name')
539 if name[0] not in string.letters:
541 if name[0] not in string.letters:
540 raise ValueError('non letter first character: %r' % name)
542 raise ValueError('non letter first character: %r' % name)
541 # Some logic will be later added here to try to process the option for
543 # Some logic will be later added here to try to process the option for
542 # a dict of known parameter.
544 # a dict of known parameter.
543 if name[0].islower():
545 if name[0].islower():
544 self.ui.debug("ignoring unknown parameter %r\n" % name)
546 self.ui.debug("ignoring unknown parameter %r\n" % name)
545 else:
547 else:
546 raise error.UnsupportedPartError(params=(name,))
548 raise error.UnsupportedPartError(params=(name,))
547
549
548
550
549 def iterparts(self):
551 def iterparts(self):
550 """yield all parts contained in the stream"""
552 """yield all parts contained in the stream"""
551 # make sure param have been loaded
553 # make sure param have been loaded
552 self.params
554 self.params
553 self.ui.debug('start extraction of bundle2 parts\n')
555 self.ui.debug('start extraction of bundle2 parts\n')
554 headerblock = self._readpartheader()
556 headerblock = self._readpartheader()
555 while headerblock is not None:
557 while headerblock is not None:
556 part = unbundlepart(self.ui, headerblock, self._fp)
558 part = unbundlepart(self.ui, headerblock, self._fp)
557 yield part
559 yield part
558 headerblock = self._readpartheader()
560 headerblock = self._readpartheader()
559 self.ui.debug('end of bundle2 stream\n')
561 self.ui.debug('end of bundle2 stream\n')
560
562
561 def _readpartheader(self):
563 def _readpartheader(self):
562 """reads a part header size and return the bytes blob
564 """reads a part header size and return the bytes blob
563
565
564 returns None if empty"""
566 returns None if empty"""
565 headersize = self._unpack(_fpartheadersize)[0]
567 headersize = self._unpack(_fpartheadersize)[0]
566 if headersize < 0:
568 if headersize < 0:
567 raise error.BundleValueError('negative part header size: %i'
569 raise error.BundleValueError('negative part header size: %i'
568 % headersize)
570 % headersize)
569 self.ui.debug('part header size: %i\n' % headersize)
571 self.ui.debug('part header size: %i\n' % headersize)
570 if headersize:
572 if headersize:
571 return self._readexact(headersize)
573 return self._readexact(headersize)
572 return None
574 return None
573
575
574
576
575 class bundlepart(object):
577 class bundlepart(object):
576 """A bundle2 part contains application level payload
578 """A bundle2 part contains application level payload
577
579
578 The part `type` is used to route the part to the application level
580 The part `type` is used to route the part to the application level
579 handler.
581 handler.
580
582
581 The part payload is contained in ``part.data``. It could be raw bytes or a
583 The part payload is contained in ``part.data``. It could be raw bytes or a
582 generator of byte chunks.
584 generator of byte chunks.
583
585
584 You can add parameters to the part using the ``addparam`` method.
586 You can add parameters to the part using the ``addparam`` method.
585 Parameters can be either mandatory (default) or advisory. Remote side
587 Parameters can be either mandatory (default) or advisory. Remote side
586 should be able to safely ignore the advisory ones.
588 should be able to safely ignore the advisory ones.
587
589
588 Both data and parameters cannot be modified after the generation has begun.
590 Both data and parameters cannot be modified after the generation has begun.
589 """
591 """
590
592
591 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
593 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
592 data=''):
594 data=''):
593 self.id = None
595 self.id = None
594 self.type = parttype
596 self.type = parttype
595 self._data = data
597 self._data = data
596 self._mandatoryparams = list(mandatoryparams)
598 self._mandatoryparams = list(mandatoryparams)
597 self._advisoryparams = list(advisoryparams)
599 self._advisoryparams = list(advisoryparams)
598 # checking for duplicated entries
600 # checking for duplicated entries
599 self._seenparams = set()
601 self._seenparams = set()
600 for pname, __ in self._mandatoryparams + self._advisoryparams:
602 for pname, __ in self._mandatoryparams + self._advisoryparams:
601 if pname in self._seenparams:
603 if pname in self._seenparams:
602 raise RuntimeError('duplicated params: %s' % pname)
604 raise RuntimeError('duplicated params: %s' % pname)
603 self._seenparams.add(pname)
605 self._seenparams.add(pname)
604 # status of the part's generation:
606 # status of the part's generation:
605 # - None: not started,
607 # - None: not started,
606 # - False: currently generated,
608 # - False: currently generated,
607 # - True: generation done.
609 # - True: generation done.
608 self._generated = None
610 self._generated = None
609
611
610 # methods used to defines the part content
612 # methods used to defines the part content
611 def __setdata(self, data):
613 def __setdata(self, data):
612 if self._generated is not None:
614 if self._generated is not None:
613 raise error.ReadOnlyPartError('part is being generated')
615 raise error.ReadOnlyPartError('part is being generated')
614 self._data = data
616 self._data = data
615 def __getdata(self):
617 def __getdata(self):
616 return self._data
618 return self._data
617 data = property(__getdata, __setdata)
619 data = property(__getdata, __setdata)
618
620
619 @property
621 @property
620 def mandatoryparams(self):
622 def mandatoryparams(self):
621 # make it an immutable tuple to force people through ``addparam``
623 # make it an immutable tuple to force people through ``addparam``
622 return tuple(self._mandatoryparams)
624 return tuple(self._mandatoryparams)
623
625
624 @property
626 @property
625 def advisoryparams(self):
627 def advisoryparams(self):
626 # make it an immutable tuple to force people through ``addparam``
628 # make it an immutable tuple to force people through ``addparam``
627 return tuple(self._advisoryparams)
629 return tuple(self._advisoryparams)
628
630
629 def addparam(self, name, value='', mandatory=True):
631 def addparam(self, name, value='', mandatory=True):
630 if self._generated is not None:
632 if self._generated is not None:
631 raise error.ReadOnlyPartError('part is being generated')
633 raise error.ReadOnlyPartError('part is being generated')
632 if name in self._seenparams:
634 if name in self._seenparams:
633 raise ValueError('duplicated params: %s' % name)
635 raise ValueError('duplicated params: %s' % name)
634 self._seenparams.add(name)
636 self._seenparams.add(name)
635 params = self._advisoryparams
637 params = self._advisoryparams
636 if mandatory:
638 if mandatory:
637 params = self._mandatoryparams
639 params = self._mandatoryparams
638 params.append((name, value))
640 params.append((name, value))
639
641
640 # methods used to generates the bundle2 stream
642 # methods used to generates the bundle2 stream
641 def getchunks(self):
643 def getchunks(self):
642 if self._generated is not None:
644 if self._generated is not None:
643 raise RuntimeError('part can only be consumed once')
645 raise RuntimeError('part can only be consumed once')
644 self._generated = False
646 self._generated = False
645 #### header
647 #### header
646 ## parttype
648 ## parttype
647 header = [_pack(_fparttypesize, len(self.type)),
649 header = [_pack(_fparttypesize, len(self.type)),
648 self.type, _pack(_fpartid, self.id),
650 self.type, _pack(_fpartid, self.id),
649 ]
651 ]
650 ## parameters
652 ## parameters
651 # count
653 # count
652 manpar = self.mandatoryparams
654 manpar = self.mandatoryparams
653 advpar = self.advisoryparams
655 advpar = self.advisoryparams
654 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
656 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
655 # size
657 # size
656 parsizes = []
658 parsizes = []
657 for key, value in manpar:
659 for key, value in manpar:
658 parsizes.append(len(key))
660 parsizes.append(len(key))
659 parsizes.append(len(value))
661 parsizes.append(len(value))
660 for key, value in advpar:
662 for key, value in advpar:
661 parsizes.append(len(key))
663 parsizes.append(len(key))
662 parsizes.append(len(value))
664 parsizes.append(len(value))
663 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
665 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
664 header.append(paramsizes)
666 header.append(paramsizes)
665 # key, value
667 # key, value
666 for key, value in manpar:
668 for key, value in manpar:
667 header.append(key)
669 header.append(key)
668 header.append(value)
670 header.append(value)
669 for key, value in advpar:
671 for key, value in advpar:
670 header.append(key)
672 header.append(key)
671 header.append(value)
673 header.append(value)
672 ## finalize header
674 ## finalize header
673 headerchunk = ''.join(header)
675 headerchunk = ''.join(header)
674 yield _pack(_fpartheadersize, len(headerchunk))
676 yield _pack(_fpartheadersize, len(headerchunk))
675 yield headerchunk
677 yield headerchunk
676 ## payload
678 ## payload
677 try:
679 try:
678 for chunk in self._payloadchunks():
680 for chunk in self._payloadchunks():
679 yield _pack(_fpayloadsize, len(chunk))
681 yield _pack(_fpayloadsize, len(chunk))
680 yield chunk
682 yield chunk
681 except Exception, exc:
683 except Exception, exc:
682 # backup exception data for later
684 # backup exception data for later
683 exc_info = sys.exc_info()
685 exc_info = sys.exc_info()
684 msg = 'unexpected error: %s' % exc
686 msg = 'unexpected error: %s' % exc
685 interpart = bundlepart('b2x:error:abort', [('message', msg)])
687 interpart = bundlepart('b2x:error:abort', [('message', msg)])
686 interpart.id = 0
688 interpart.id = 0
687 yield _pack(_fpayloadsize, -1)
689 yield _pack(_fpayloadsize, -1)
688 for chunk in interpart.getchunks():
690 for chunk in interpart.getchunks():
689 yield chunk
691 yield chunk
690 # abort current part payload
692 # abort current part payload
691 yield _pack(_fpayloadsize, 0)
693 yield _pack(_fpayloadsize, 0)
692 raise exc_info[0], exc_info[1], exc_info[2]
694 raise exc_info[0], exc_info[1], exc_info[2]
693 # end of payload
695 # end of payload
694 yield _pack(_fpayloadsize, 0)
696 yield _pack(_fpayloadsize, 0)
695 self._generated = True
697 self._generated = True
696
698
697 def _payloadchunks(self):
699 def _payloadchunks(self):
698 """yield chunks of a the part payload
700 """yield chunks of a the part payload
699
701
700 Exists to handle the different methods to provide data to a part."""
702 Exists to handle the different methods to provide data to a part."""
701 # we only support fixed size data now.
703 # we only support fixed size data now.
702 # This will be improved in the future.
704 # This will be improved in the future.
703 if util.safehasattr(self.data, 'next'):
705 if util.safehasattr(self.data, 'next'):
704 buff = util.chunkbuffer(self.data)
706 buff = util.chunkbuffer(self.data)
705 chunk = buff.read(preferedchunksize)
707 chunk = buff.read(preferedchunksize)
706 while chunk:
708 while chunk:
707 yield chunk
709 yield chunk
708 chunk = buff.read(preferedchunksize)
710 chunk = buff.read(preferedchunksize)
709 elif len(self.data):
711 elif len(self.data):
710 yield self.data
712 yield self.data
711
713
712
714
713 flaginterrupt = -1
715 flaginterrupt = -1
714
716
715 class interrupthandler(unpackermixin):
717 class interrupthandler(unpackermixin):
716 """read one part and process it with restricted capability
718 """read one part and process it with restricted capability
717
719
718 This allows to transmit exception raised on the producer size during part
720 This allows to transmit exception raised on the producer size during part
719 iteration while the consumer is reading a part.
721 iteration while the consumer is reading a part.
720
722
721 Part processed in this manner only have access to a ui object,"""
723 Part processed in this manner only have access to a ui object,"""
722
724
723 def __init__(self, ui, fp):
725 def __init__(self, ui, fp):
724 super(interrupthandler, self).__init__(fp)
726 super(interrupthandler, self).__init__(fp)
725 self.ui = ui
727 self.ui = ui
726
728
727 def _readpartheader(self):
729 def _readpartheader(self):
728 """reads a part header size and return the bytes blob
730 """reads a part header size and return the bytes blob
729
731
730 returns None if empty"""
732 returns None if empty"""
731 headersize = self._unpack(_fpartheadersize)[0]
733 headersize = self._unpack(_fpartheadersize)[0]
732 if headersize < 0:
734 if headersize < 0:
733 raise error.BundleValueError('negative part header size: %i'
735 raise error.BundleValueError('negative part header size: %i'
734 % headersize)
736 % headersize)
735 self.ui.debug('part header size: %i\n' % headersize)
737 self.ui.debug('part header size: %i\n' % headersize)
736 if headersize:
738 if headersize:
737 return self._readexact(headersize)
739 return self._readexact(headersize)
738 return None
740 return None
739
741
740 def __call__(self):
742 def __call__(self):
741 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
743 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
742 headerblock = self._readpartheader()
744 headerblock = self._readpartheader()
743 if headerblock is None:
745 if headerblock is None:
744 self.ui.debug('no part found during interruption.\n')
746 self.ui.debug('no part found during interruption.\n')
745 return
747 return
746 part = unbundlepart(self.ui, headerblock, self._fp)
748 part = unbundlepart(self.ui, headerblock, self._fp)
747 op = interruptoperation(self.ui)
749 op = interruptoperation(self.ui)
748 _processpart(op, part)
750 _processpart(op, part)
749
751
750 class interruptoperation(object):
752 class interruptoperation(object):
751 """A limited operation to be use by part handler during interruption
753 """A limited operation to be use by part handler during interruption
752
754
753 It only have access to an ui object.
755 It only have access to an ui object.
754 """
756 """
755
757
756 def __init__(self, ui):
758 def __init__(self, ui):
757 self.ui = ui
759 self.ui = ui
758 self.reply = None
760 self.reply = None
759
761
760 @property
762 @property
761 def repo(self):
763 def repo(self):
762 raise RuntimeError('no repo access from stream interruption')
764 raise RuntimeError('no repo access from stream interruption')
763
765
764 def gettransaction(self):
766 def gettransaction(self):
765 raise TransactionUnavailable('no repo access from stream interruption')
767 raise TransactionUnavailable('no repo access from stream interruption')
766
768
767 class unbundlepart(unpackermixin):
769 class unbundlepart(unpackermixin):
768 """a bundle part read from a bundle"""
770 """a bundle part read from a bundle"""
769
771
770 def __init__(self, ui, header, fp):
772 def __init__(self, ui, header, fp):
771 super(unbundlepart, self).__init__(fp)
773 super(unbundlepart, self).__init__(fp)
772 self.ui = ui
774 self.ui = ui
773 # unbundle state attr
775 # unbundle state attr
774 self._headerdata = header
776 self._headerdata = header
775 self._headeroffset = 0
777 self._headeroffset = 0
776 self._initialized = False
778 self._initialized = False
777 self.consumed = False
779 self.consumed = False
778 # part data
780 # part data
779 self.id = None
781 self.id = None
780 self.type = None
782 self.type = None
781 self.mandatoryparams = None
783 self.mandatoryparams = None
782 self.advisoryparams = None
784 self.advisoryparams = None
783 self.params = None
785 self.params = None
784 self.mandatorykeys = ()
786 self.mandatorykeys = ()
785 self._payloadstream = None
787 self._payloadstream = None
786 self._readheader()
788 self._readheader()
787
789
788 def _fromheader(self, size):
790 def _fromheader(self, size):
789 """return the next <size> byte from the header"""
791 """return the next <size> byte from the header"""
790 offset = self._headeroffset
792 offset = self._headeroffset
791 data = self._headerdata[offset:(offset + size)]
793 data = self._headerdata[offset:(offset + size)]
792 self._headeroffset = offset + size
794 self._headeroffset = offset + size
793 return data
795 return data
794
796
795 def _unpackheader(self, format):
797 def _unpackheader(self, format):
796 """read given format from header
798 """read given format from header
797
799
798 This automatically compute the size of the format to read."""
800 This automatically compute the size of the format to read."""
799 data = self._fromheader(struct.calcsize(format))
801 data = self._fromheader(struct.calcsize(format))
800 return _unpack(format, data)
802 return _unpack(format, data)
801
803
802 def _initparams(self, mandatoryparams, advisoryparams):
804 def _initparams(self, mandatoryparams, advisoryparams):
803 """internal function to setup all logic related parameters"""
805 """internal function to setup all logic related parameters"""
804 # make it read only to prevent people touching it by mistake.
806 # make it read only to prevent people touching it by mistake.
805 self.mandatoryparams = tuple(mandatoryparams)
807 self.mandatoryparams = tuple(mandatoryparams)
806 self.advisoryparams = tuple(advisoryparams)
808 self.advisoryparams = tuple(advisoryparams)
807 # user friendly UI
809 # user friendly UI
808 self.params = dict(self.mandatoryparams)
810 self.params = dict(self.mandatoryparams)
809 self.params.update(dict(self.advisoryparams))
811 self.params.update(dict(self.advisoryparams))
810 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
812 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
811
813
812 def _readheader(self):
814 def _readheader(self):
813 """read the header and setup the object"""
815 """read the header and setup the object"""
814 typesize = self._unpackheader(_fparttypesize)[0]
816 typesize = self._unpackheader(_fparttypesize)[0]
815 self.type = self._fromheader(typesize)
817 self.type = self._fromheader(typesize)
816 self.ui.debug('part type: "%s"\n' % self.type)
818 self.ui.debug('part type: "%s"\n' % self.type)
817 self.id = self._unpackheader(_fpartid)[0]
819 self.id = self._unpackheader(_fpartid)[0]
818 self.ui.debug('part id: "%s"\n' % self.id)
820 self.ui.debug('part id: "%s"\n' % self.id)
819 ## reading parameters
821 ## reading parameters
820 # param count
822 # param count
821 mancount, advcount = self._unpackheader(_fpartparamcount)
823 mancount, advcount = self._unpackheader(_fpartparamcount)
822 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
824 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
823 # param size
825 # param size
824 fparamsizes = _makefpartparamsizes(mancount + advcount)
826 fparamsizes = _makefpartparamsizes(mancount + advcount)
825 paramsizes = self._unpackheader(fparamsizes)
827 paramsizes = self._unpackheader(fparamsizes)
826 # make it a list of couple again
828 # make it a list of couple again
827 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
829 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
828 # split mandatory from advisory
830 # split mandatory from advisory
829 mansizes = paramsizes[:mancount]
831 mansizes = paramsizes[:mancount]
830 advsizes = paramsizes[mancount:]
832 advsizes = paramsizes[mancount:]
831 # retrieve param value
833 # retrieve param value
832 manparams = []
834 manparams = []
833 for key, value in mansizes:
835 for key, value in mansizes:
834 manparams.append((self._fromheader(key), self._fromheader(value)))
836 manparams.append((self._fromheader(key), self._fromheader(value)))
835 advparams = []
837 advparams = []
836 for key, value in advsizes:
838 for key, value in advsizes:
837 advparams.append((self._fromheader(key), self._fromheader(value)))
839 advparams.append((self._fromheader(key), self._fromheader(value)))
838 self._initparams(manparams, advparams)
840 self._initparams(manparams, advparams)
839 ## part payload
841 ## part payload
840 def payloadchunks():
842 def payloadchunks():
841 payloadsize = self._unpack(_fpayloadsize)[0]
843 payloadsize = self._unpack(_fpayloadsize)[0]
842 self.ui.debug('payload chunk size: %i\n' % payloadsize)
844 self.ui.debug('payload chunk size: %i\n' % payloadsize)
843 while payloadsize:
845 while payloadsize:
844 if payloadsize == flaginterrupt:
846 if payloadsize == flaginterrupt:
845 # interruption detection, the handler will now read a
847 # interruption detection, the handler will now read a
846 # single part and process it.
848 # single part and process it.
847 interrupthandler(self.ui, self._fp)()
849 interrupthandler(self.ui, self._fp)()
848 elif payloadsize < 0:
850 elif payloadsize < 0:
849 msg = 'negative payload chunk size: %i' % payloadsize
851 msg = 'negative payload chunk size: %i' % payloadsize
850 raise error.BundleValueError(msg)
852 raise error.BundleValueError(msg)
851 else:
853 else:
852 yield self._readexact(payloadsize)
854 yield self._readexact(payloadsize)
853 payloadsize = self._unpack(_fpayloadsize)[0]
855 payloadsize = self._unpack(_fpayloadsize)[0]
854 self.ui.debug('payload chunk size: %i\n' % payloadsize)
856 self.ui.debug('payload chunk size: %i\n' % payloadsize)
855 self._payloadstream = util.chunkbuffer(payloadchunks())
857 self._payloadstream = util.chunkbuffer(payloadchunks())
856 # we read the data, tell it
858 # we read the data, tell it
857 self._initialized = True
859 self._initialized = True
858
860
859 def read(self, size=None):
861 def read(self, size=None):
860 """read payload data"""
862 """read payload data"""
861 if not self._initialized:
863 if not self._initialized:
862 self._readheader()
864 self._readheader()
863 if size is None:
865 if size is None:
864 data = self._payloadstream.read()
866 data = self._payloadstream.read()
865 else:
867 else:
866 data = self._payloadstream.read(size)
868 data = self._payloadstream.read(size)
867 if size is None or len(data) < size:
869 if size is None or len(data) < size:
868 self.consumed = True
870 self.consumed = True
869 return data
871 return data
870
872
871 capabilities = {'HG2Y': (),
873 capabilities = {'HG2Y': (),
872 'b2x:listkeys': (),
874 'b2x:listkeys': (),
873 'b2x:pushkey': (),
875 'b2x:pushkey': (),
874 'digests': tuple(sorted(util.DIGESTS.keys())),
876 'digests': tuple(sorted(util.DIGESTS.keys())),
875 'b2x:remote-changegroup': ('http', 'https'),
877 'b2x:remote-changegroup': ('http', 'https'),
876 }
878 }
877
879
878 def getrepocaps(repo):
880 def getrepocaps(repo):
879 """return the bundle2 capabilities for a given repo
881 """return the bundle2 capabilities for a given repo
880
882
881 Exists to allow extensions (like evolution) to mutate the capabilities.
883 Exists to allow extensions (like evolution) to mutate the capabilities.
882 """
884 """
883 caps = capabilities.copy()
885 caps = capabilities.copy()
884 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
886 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
885 if obsolete.isenabled(repo, obsolete.exchangeopt):
887 if obsolete.isenabled(repo, obsolete.exchangeopt):
886 supportedformat = tuple('V%i' % v for v in obsolete.formats)
888 supportedformat = tuple('V%i' % v for v in obsolete.formats)
887 caps['b2x:obsmarkers'] = supportedformat
889 caps['b2x:obsmarkers'] = supportedformat
888 return caps
890 return caps
889
891
890 def bundle2caps(remote):
892 def bundle2caps(remote):
891 """return the bundle capabilities of a peer as dict"""
893 """return the bundle capabilities of a peer as dict"""
892 raw = remote.capable('bundle2-exp')
894 raw = remote.capable('bundle2-exp')
893 if not raw and raw != '':
895 if not raw and raw != '':
894 return {}
896 return {}
895 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
897 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
896 return decodecaps(capsblob)
898 return decodecaps(capsblob)
897
899
898 def obsmarkersversion(caps):
900 def obsmarkersversion(caps):
899 """extract the list of supported obsmarkers versions from a bundle2caps dict
901 """extract the list of supported obsmarkers versions from a bundle2caps dict
900 """
902 """
901 obscaps = caps.get('b2x:obsmarkers', ())
903 obscaps = caps.get('b2x:obsmarkers', ())
902 return [int(c[1:]) for c in obscaps if c.startswith('V')]
904 return [int(c[1:]) for c in obscaps if c.startswith('V')]
903
905
904 @parthandler('b2x:changegroup', ('version',))
906 @parthandler('b2x:changegroup', ('version',))
905 def handlechangegroup(op, inpart):
907 def handlechangegroup(op, inpart):
906 """apply a changegroup part on the repo
908 """apply a changegroup part on the repo
907
909
908 This is a very early implementation that will massive rework before being
910 This is a very early implementation that will massive rework before being
909 inflicted to any end-user.
911 inflicted to any end-user.
910 """
912 """
911 # Make sure we trigger a transaction creation
913 # Make sure we trigger a transaction creation
912 #
914 #
913 # The addchangegroup function will get a transaction object by itself, but
915 # The addchangegroup function will get a transaction object by itself, but
914 # we need to make sure we trigger the creation of a transaction object used
916 # we need to make sure we trigger the creation of a transaction object used
915 # for the whole processing scope.
917 # for the whole processing scope.
916 op.gettransaction()
918 op.gettransaction()
917 unpackerversion = inpart.params.get('version', '01')
919 unpackerversion = inpart.params.get('version', '01')
918 # We should raise an appropriate exception here
920 # We should raise an appropriate exception here
919 unpacker = changegroup.packermap[unpackerversion][1]
921 unpacker = changegroup.packermap[unpackerversion][1]
920 cg = unpacker(inpart, 'UN')
922 cg = unpacker(inpart, 'UN')
921 # the source and url passed here are overwritten by the one contained in
923 # the source and url passed here are overwritten by the one contained in
922 # the transaction.hookargs argument. So 'bundle2' is a placeholder
924 # the transaction.hookargs argument. So 'bundle2' is a placeholder
923 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
925 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
924 op.records.add('changegroup', {'return': ret})
926 op.records.add('changegroup', {'return': ret})
925 if op.reply is not None:
927 if op.reply is not None:
926 # This is definitely not the final form of this
928 # This is definitely not the final form of this
927 # return. But one need to start somewhere.
929 # return. But one need to start somewhere.
928 part = op.reply.newpart('b2x:reply:changegroup')
930 part = op.reply.newpart('b2x:reply:changegroup')
929 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
931 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
930 part.addparam('return', '%i' % ret, mandatory=False)
932 part.addparam('return', '%i' % ret, mandatory=False)
931 assert not inpart.read()
933 assert not inpart.read()
932
934
933 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
935 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
934 ['digest:%s' % k for k in util.DIGESTS.keys()])
936 ['digest:%s' % k for k in util.DIGESTS.keys()])
935 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
937 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
936 def handleremotechangegroup(op, inpart):
938 def handleremotechangegroup(op, inpart):
937 """apply a bundle10 on the repo, given an url and validation information
939 """apply a bundle10 on the repo, given an url and validation information
938
940
939 All the information about the remote bundle to import are given as
941 All the information about the remote bundle to import are given as
940 parameters. The parameters include:
942 parameters. The parameters include:
941 - url: the url to the bundle10.
943 - url: the url to the bundle10.
942 - size: the bundle10 file size. It is used to validate what was
944 - size: the bundle10 file size. It is used to validate what was
943 retrieved by the client matches the server knowledge about the bundle.
945 retrieved by the client matches the server knowledge about the bundle.
944 - digests: a space separated list of the digest types provided as
946 - digests: a space separated list of the digest types provided as
945 parameters.
947 parameters.
946 - digest:<digest-type>: the hexadecimal representation of the digest with
948 - digest:<digest-type>: the hexadecimal representation of the digest with
947 that name. Like the size, it is used to validate what was retrieved by
949 that name. Like the size, it is used to validate what was retrieved by
948 the client matches what the server knows about the bundle.
950 the client matches what the server knows about the bundle.
949
951
950 When multiple digest types are given, all of them are checked.
952 When multiple digest types are given, all of them are checked.
951 """
953 """
952 try:
954 try:
953 raw_url = inpart.params['url']
955 raw_url = inpart.params['url']
954 except KeyError:
956 except KeyError:
955 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
957 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
956 parsed_url = util.url(raw_url)
958 parsed_url = util.url(raw_url)
957 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
959 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
958 raise util.Abort(_('remote-changegroup does not support %s urls') %
960 raise util.Abort(_('remote-changegroup does not support %s urls') %
959 parsed_url.scheme)
961 parsed_url.scheme)
960
962
961 try:
963 try:
962 size = int(inpart.params['size'])
964 size = int(inpart.params['size'])
963 except ValueError:
965 except ValueError:
964 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
966 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
965 % 'size')
967 % 'size')
966 except KeyError:
968 except KeyError:
967 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
969 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
968
970
969 digests = {}
971 digests = {}
970 for typ in inpart.params.get('digests', '').split():
972 for typ in inpart.params.get('digests', '').split():
971 param = 'digest:%s' % typ
973 param = 'digest:%s' % typ
972 try:
974 try:
973 value = inpart.params[param]
975 value = inpart.params[param]
974 except KeyError:
976 except KeyError:
975 raise util.Abort(_('remote-changegroup: missing "%s" param') %
977 raise util.Abort(_('remote-changegroup: missing "%s" param') %
976 param)
978 param)
977 digests[typ] = value
979 digests[typ] = value
978
980
979 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
981 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
980
982
981 # Make sure we trigger a transaction creation
983 # Make sure we trigger a transaction creation
982 #
984 #
983 # The addchangegroup function will get a transaction object by itself, but
985 # The addchangegroup function will get a transaction object by itself, but
984 # we need to make sure we trigger the creation of a transaction object used
986 # we need to make sure we trigger the creation of a transaction object used
985 # for the whole processing scope.
987 # for the whole processing scope.
986 op.gettransaction()
988 op.gettransaction()
987 import exchange
989 import exchange
988 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
990 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
989 if not isinstance(cg, changegroup.cg1unpacker):
991 if not isinstance(cg, changegroup.cg1unpacker):
990 raise util.Abort(_('%s: not a bundle version 1.0') %
992 raise util.Abort(_('%s: not a bundle version 1.0') %
991 util.hidepassword(raw_url))
993 util.hidepassword(raw_url))
992 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
994 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
993 op.records.add('changegroup', {'return': ret})
995 op.records.add('changegroup', {'return': ret})
994 if op.reply is not None:
996 if op.reply is not None:
995 # This is definitely not the final form of this
997 # This is definitely not the final form of this
996 # return. But one need to start somewhere.
998 # return. But one need to start somewhere.
997 part = op.reply.newpart('b2x:reply:changegroup')
999 part = op.reply.newpart('b2x:reply:changegroup')
998 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1000 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
999 part.addparam('return', '%i' % ret, mandatory=False)
1001 part.addparam('return', '%i' % ret, mandatory=False)
1000 try:
1002 try:
1001 real_part.validate()
1003 real_part.validate()
1002 except util.Abort, e:
1004 except util.Abort, e:
1003 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1005 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1004 (util.hidepassword(raw_url), str(e)))
1006 (util.hidepassword(raw_url), str(e)))
1005 assert not inpart.read()
1007 assert not inpart.read()
1006
1008
1007 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1009 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1008 def handlereplychangegroup(op, inpart):
1010 def handlereplychangegroup(op, inpart):
1009 ret = int(inpart.params['return'])
1011 ret = int(inpart.params['return'])
1010 replyto = int(inpart.params['in-reply-to'])
1012 replyto = int(inpart.params['in-reply-to'])
1011 op.records.add('changegroup', {'return': ret}, replyto)
1013 op.records.add('changegroup', {'return': ret}, replyto)
1012
1014
1013 @parthandler('b2x:check:heads')
1015 @parthandler('b2x:check:heads')
1014 def handlecheckheads(op, inpart):
1016 def handlecheckheads(op, inpart):
1015 """check that head of the repo did not change
1017 """check that head of the repo did not change
1016
1018
1017 This is used to detect a push race when using unbundle.
1019 This is used to detect a push race when using unbundle.
1018 This replaces the "heads" argument of unbundle."""
1020 This replaces the "heads" argument of unbundle."""
1019 h = inpart.read(20)
1021 h = inpart.read(20)
1020 heads = []
1022 heads = []
1021 while len(h) == 20:
1023 while len(h) == 20:
1022 heads.append(h)
1024 heads.append(h)
1023 h = inpart.read(20)
1025 h = inpart.read(20)
1024 assert not h
1026 assert not h
1025 if heads != op.repo.heads():
1027 if heads != op.repo.heads():
1026 raise error.PushRaced('repository changed while pushing - '
1028 raise error.PushRaced('repository changed while pushing - '
1027 'please try again')
1029 'please try again')
1028
1030
1029 @parthandler('b2x:output')
1031 @parthandler('b2x:output')
1030 def handleoutput(op, inpart):
1032 def handleoutput(op, inpart):
1031 """forward output captured on the server to the client"""
1033 """forward output captured on the server to the client"""
1032 for line in inpart.read().splitlines():
1034 for line in inpart.read().splitlines():
1033 op.ui.write(('remote: %s\n' % line))
1035 op.ui.write(('remote: %s\n' % line))
1034
1036
1035 @parthandler('b2x:replycaps')
1037 @parthandler('b2x:replycaps')
1036 def handlereplycaps(op, inpart):
1038 def handlereplycaps(op, inpart):
1037 """Notify that a reply bundle should be created
1039 """Notify that a reply bundle should be created
1038
1040
1039 The payload contains the capabilities information for the reply"""
1041 The payload contains the capabilities information for the reply"""
1040 caps = decodecaps(inpart.read())
1042 caps = decodecaps(inpart.read())
1041 if op.reply is None:
1043 if op.reply is None:
1042 op.reply = bundle20(op.ui, caps)
1044 op.reply = bundle20(op.ui, caps)
1043
1045
1044 @parthandler('b2x:error:abort', ('message', 'hint'))
1046 @parthandler('b2x:error:abort', ('message', 'hint'))
1045 def handlereplycaps(op, inpart):
1047 def handlereplycaps(op, inpart):
1046 """Used to transmit abort error over the wire"""
1048 """Used to transmit abort error over the wire"""
1047 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1049 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1048
1050
1049 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1051 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1050 def handlereplycaps(op, inpart):
1052 def handlereplycaps(op, inpart):
1051 """Used to transmit unknown content error over the wire"""
1053 """Used to transmit unknown content error over the wire"""
1052 kwargs = {}
1054 kwargs = {}
1053 parttype = inpart.params.get('parttype')
1055 parttype = inpart.params.get('parttype')
1054 if parttype is not None:
1056 if parttype is not None:
1055 kwargs['parttype'] = parttype
1057 kwargs['parttype'] = parttype
1056 params = inpart.params.get('params')
1058 params = inpart.params.get('params')
1057 if params is not None:
1059 if params is not None:
1058 kwargs['params'] = params.split('\0')
1060 kwargs['params'] = params.split('\0')
1059
1061
1060 raise error.UnsupportedPartError(**kwargs)
1062 raise error.UnsupportedPartError(**kwargs)
1061
1063
1062 @parthandler('b2x:error:pushraced', ('message',))
1064 @parthandler('b2x:error:pushraced', ('message',))
1063 def handlereplycaps(op, inpart):
1065 def handlereplycaps(op, inpart):
1064 """Used to transmit push race error over the wire"""
1066 """Used to transmit push race error over the wire"""
1065 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1067 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1066
1068
1067 @parthandler('b2x:listkeys', ('namespace',))
1069 @parthandler('b2x:listkeys', ('namespace',))
1068 def handlelistkeys(op, inpart):
1070 def handlelistkeys(op, inpart):
1069 """retrieve pushkey namespace content stored in a bundle2"""
1071 """retrieve pushkey namespace content stored in a bundle2"""
1070 namespace = inpart.params['namespace']
1072 namespace = inpart.params['namespace']
1071 r = pushkey.decodekeys(inpart.read())
1073 r = pushkey.decodekeys(inpart.read())
1072 op.records.add('listkeys', (namespace, r))
1074 op.records.add('listkeys', (namespace, r))
1073
1075
1074 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1076 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1075 def handlepushkey(op, inpart):
1077 def handlepushkey(op, inpart):
1076 """process a pushkey request"""
1078 """process a pushkey request"""
1077 dec = pushkey.decode
1079 dec = pushkey.decode
1078 namespace = dec(inpart.params['namespace'])
1080 namespace = dec(inpart.params['namespace'])
1079 key = dec(inpart.params['key'])
1081 key = dec(inpart.params['key'])
1080 old = dec(inpart.params['old'])
1082 old = dec(inpart.params['old'])
1081 new = dec(inpart.params['new'])
1083 new = dec(inpart.params['new'])
1082 ret = op.repo.pushkey(namespace, key, old, new)
1084 ret = op.repo.pushkey(namespace, key, old, new)
1083 record = {'namespace': namespace,
1085 record = {'namespace': namespace,
1084 'key': key,
1086 'key': key,
1085 'old': old,
1087 'old': old,
1086 'new': new}
1088 'new': new}
1087 op.records.add('pushkey', record)
1089 op.records.add('pushkey', record)
1088 if op.reply is not None:
1090 if op.reply is not None:
1089 rpart = op.reply.newpart('b2x:reply:pushkey')
1091 rpart = op.reply.newpart('b2x:reply:pushkey')
1090 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1092 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1091 rpart.addparam('return', '%i' % ret, mandatory=False)
1093 rpart.addparam('return', '%i' % ret, mandatory=False)
1092
1094
1093 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1095 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1094 def handlepushkeyreply(op, inpart):
1096 def handlepushkeyreply(op, inpart):
1095 """retrieve the result of a pushkey request"""
1097 """retrieve the result of a pushkey request"""
1096 ret = int(inpart.params['return'])
1098 ret = int(inpart.params['return'])
1097 partid = int(inpart.params['in-reply-to'])
1099 partid = int(inpart.params['in-reply-to'])
1098 op.records.add('pushkey', {'return': ret}, partid)
1100 op.records.add('pushkey', {'return': ret}, partid)
1099
1101
1100 @parthandler('b2x:obsmarkers')
1102 @parthandler('b2x:obsmarkers')
1101 def handleobsmarker(op, inpart):
1103 def handleobsmarker(op, inpart):
1102 """add a stream of obsmarkers to the repo"""
1104 """add a stream of obsmarkers to the repo"""
1103 tr = op.gettransaction()
1105 tr = op.gettransaction()
1104 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1106 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1105 if new:
1107 if new:
1106 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1108 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1107 op.records.add('obsmarkers', {'new': new})
1109 op.records.add('obsmarkers', {'new': new})
1108 if op.reply is not None:
1110 if op.reply is not None:
1109 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1111 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1110 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1112 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1111 rpart.addparam('new', '%i' % new, mandatory=False)
1113 rpart.addparam('new', '%i' % new, mandatory=False)
1112
1114
1113
1115
1114 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1116 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1115 def handlepushkeyreply(op, inpart):
1117 def handlepushkeyreply(op, inpart):
1116 """retrieve the result of a pushkey request"""
1118 """retrieve the result of a pushkey request"""
1117 ret = int(inpart.params['new'])
1119 ret = int(inpart.params['new'])
1118 partid = int(inpart.params['in-reply-to'])
1120 partid = int(inpart.params['in-reply-to'])
1119 op.records.add('obsmarkers', {'new': ret}, partid)
1121 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now