##// END OF EJS Templates
bundle2caps: advertise the available versions for changegroup packer...
Pierre-Yves David -
r23169:e4dc2b0b default
parent child Browse files
Show More
@@ -1,1116 +1,1117
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import sys
148 import sys
149 import util
149 import util
150 import struct
150 import struct
151 import urllib
151 import urllib
152 import string
152 import string
153 import obsolete
153 import obsolete
154 import pushkey
154 import pushkey
155 import url
155 import url
156
156
157 import changegroup, error
157 import changegroup, error
158 from i18n import _
158 from i18n import _
159
159
160 _pack = struct.pack
160 _pack = struct.pack
161 _unpack = struct.unpack
161 _unpack = struct.unpack
162
162
163 _magicstring = 'HG2Y'
163 _magicstring = 'HG2Y'
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 def _makefpartparamsizes(nbparams):
174 def _makefpartparamsizes(nbparams):
175 """return a struct format to read part parameter sizes
175 """return a struct format to read part parameter sizes
176
176
177 The number parameters is variable so we need to build that format
177 The number parameters is variable so we need to build that format
178 dynamically.
178 dynamically.
179 """
179 """
180 return '>'+('BB'*nbparams)
180 return '>'+('BB'*nbparams)
181
181
182 parthandlermapping = {}
182 parthandlermapping = {}
183
183
184 def parthandler(parttype, params=()):
184 def parthandler(parttype, params=()):
185 """decorator that register a function as a bundle2 part handler
185 """decorator that register a function as a bundle2 part handler
186
186
187 eg::
187 eg::
188
188
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
190 def myparttypehandler(...):
190 def myparttypehandler(...):
191 '''process a part of type "my part".'''
191 '''process a part of type "my part".'''
192 ...
192 ...
193 """
193 """
194 def _decorator(func):
194 def _decorator(func):
195 lparttype = parttype.lower() # enforce lower case matching.
195 lparttype = parttype.lower() # enforce lower case matching.
196 assert lparttype not in parthandlermapping
196 assert lparttype not in parthandlermapping
197 parthandlermapping[lparttype] = func
197 parthandlermapping[lparttype] = func
198 func.params = frozenset(params)
198 func.params = frozenset(params)
199 return func
199 return func
200 return _decorator
200 return _decorator
201
201
202 class unbundlerecords(object):
202 class unbundlerecords(object):
203 """keep record of what happens during and unbundle
203 """keep record of what happens during and unbundle
204
204
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 category of record and obj is an arbitrary object.
206 category of record and obj is an arbitrary object.
207
207
208 `records['cat']` will return all entries of this category 'cat'.
208 `records['cat']` will return all entries of this category 'cat'.
209
209
210 Iterating on the object itself will yield `('category', obj)` tuples
210 Iterating on the object itself will yield `('category', obj)` tuples
211 for all entries.
211 for all entries.
212
212
213 All iterations happens in chronological order.
213 All iterations happens in chronological order.
214 """
214 """
215
215
216 def __init__(self):
216 def __init__(self):
217 self._categories = {}
217 self._categories = {}
218 self._sequences = []
218 self._sequences = []
219 self._replies = {}
219 self._replies = {}
220
220
221 def add(self, category, entry, inreplyto=None):
221 def add(self, category, entry, inreplyto=None):
222 """add a new record of a given category.
222 """add a new record of a given category.
223
223
224 The entry can then be retrieved in the list returned by
224 The entry can then be retrieved in the list returned by
225 self['category']."""
225 self['category']."""
226 self._categories.setdefault(category, []).append(entry)
226 self._categories.setdefault(category, []).append(entry)
227 self._sequences.append((category, entry))
227 self._sequences.append((category, entry))
228 if inreplyto is not None:
228 if inreplyto is not None:
229 self.getreplies(inreplyto).add(category, entry)
229 self.getreplies(inreplyto).add(category, entry)
230
230
231 def getreplies(self, partid):
231 def getreplies(self, partid):
232 """get the records that are replies to a specific part"""
232 """get the records that are replies to a specific part"""
233 return self._replies.setdefault(partid, unbundlerecords())
233 return self._replies.setdefault(partid, unbundlerecords())
234
234
235 def __getitem__(self, cat):
235 def __getitem__(self, cat):
236 return tuple(self._categories.get(cat, ()))
236 return tuple(self._categories.get(cat, ()))
237
237
238 def __iter__(self):
238 def __iter__(self):
239 return iter(self._sequences)
239 return iter(self._sequences)
240
240
241 def __len__(self):
241 def __len__(self):
242 return len(self._sequences)
242 return len(self._sequences)
243
243
244 def __nonzero__(self):
244 def __nonzero__(self):
245 return bool(self._sequences)
245 return bool(self._sequences)
246
246
247 class bundleoperation(object):
247 class bundleoperation(object):
248 """an object that represents a single bundling process
248 """an object that represents a single bundling process
249
249
250 Its purpose is to carry unbundle-related objects and states.
250 Its purpose is to carry unbundle-related objects and states.
251
251
252 A new object should be created at the beginning of each bundle processing.
252 A new object should be created at the beginning of each bundle processing.
253 The object is to be returned by the processing function.
253 The object is to be returned by the processing function.
254
254
255 The object has very little content now it will ultimately contain:
255 The object has very little content now it will ultimately contain:
256 * an access to the repo the bundle is applied to,
256 * an access to the repo the bundle is applied to,
257 * a ui object,
257 * a ui object,
258 * a way to retrieve a transaction to add changes to the repo,
258 * a way to retrieve a transaction to add changes to the repo,
259 * a way to record the result of processing each part,
259 * a way to record the result of processing each part,
260 * a way to construct a bundle response when applicable.
260 * a way to construct a bundle response when applicable.
261 """
261 """
262
262
263 def __init__(self, repo, transactiongetter):
263 def __init__(self, repo, transactiongetter):
264 self.repo = repo
264 self.repo = repo
265 self.ui = repo.ui
265 self.ui = repo.ui
266 self.records = unbundlerecords()
266 self.records = unbundlerecords()
267 self.gettransaction = transactiongetter
267 self.gettransaction = transactiongetter
268 self.reply = None
268 self.reply = None
269
269
270 class TransactionUnavailable(RuntimeError):
270 class TransactionUnavailable(RuntimeError):
271 pass
271 pass
272
272
273 def _notransaction():
273 def _notransaction():
274 """default method to get a transaction while processing a bundle
274 """default method to get a transaction while processing a bundle
275
275
276 Raise an exception to highlight the fact that no transaction was expected
276 Raise an exception to highlight the fact that no transaction was expected
277 to be created"""
277 to be created"""
278 raise TransactionUnavailable()
278 raise TransactionUnavailable()
279
279
280 def processbundle(repo, unbundler, transactiongetter=_notransaction):
280 def processbundle(repo, unbundler, transactiongetter=_notransaction):
281 """This function process a bundle, apply effect to/from a repo
281 """This function process a bundle, apply effect to/from a repo
282
282
283 It iterates over each part then searches for and uses the proper handling
283 It iterates over each part then searches for and uses the proper handling
284 code to process the part. Parts are processed in order.
284 code to process the part. Parts are processed in order.
285
285
286 This is very early version of this function that will be strongly reworked
286 This is very early version of this function that will be strongly reworked
287 before final usage.
287 before final usage.
288
288
289 Unknown Mandatory part will abort the process.
289 Unknown Mandatory part will abort the process.
290 """
290 """
291 op = bundleoperation(repo, transactiongetter)
291 op = bundleoperation(repo, transactiongetter)
292 # todo:
292 # todo:
293 # - replace this is a init function soon.
293 # - replace this is a init function soon.
294 # - exception catching
294 # - exception catching
295 unbundler.params
295 unbundler.params
296 iterparts = unbundler.iterparts()
296 iterparts = unbundler.iterparts()
297 part = None
297 part = None
298 try:
298 try:
299 for part in iterparts:
299 for part in iterparts:
300 _processpart(op, part)
300 _processpart(op, part)
301 except Exception, exc:
301 except Exception, exc:
302 for part in iterparts:
302 for part in iterparts:
303 # consume the bundle content
303 # consume the bundle content
304 part.read()
304 part.read()
305 # Small hack to let caller code distinguish exceptions from bundle2
305 # Small hack to let caller code distinguish exceptions from bundle2
306 # processing from processing the old format. This is mostly
306 # processing from processing the old format. This is mostly
307 # needed to handle different return codes to unbundle according to the
307 # needed to handle different return codes to unbundle according to the
308 # type of bundle. We should probably clean up or drop this return code
308 # type of bundle. We should probably clean up or drop this return code
309 # craziness in a future version.
309 # craziness in a future version.
310 exc.duringunbundle2 = True
310 exc.duringunbundle2 = True
311 raise
311 raise
312 return op
312 return op
313
313
314 def _processpart(op, part):
314 def _processpart(op, part):
315 """process a single part from a bundle
315 """process a single part from a bundle
316
316
317 The part is guaranteed to have been fully consumed when the function exits
317 The part is guaranteed to have been fully consumed when the function exits
318 (even if an exception is raised)."""
318 (even if an exception is raised)."""
319 try:
319 try:
320 parttype = part.type
320 parttype = part.type
321 # part key are matched lower case
321 # part key are matched lower case
322 key = parttype.lower()
322 key = parttype.lower()
323 try:
323 try:
324 handler = parthandlermapping.get(key)
324 handler = parthandlermapping.get(key)
325 if handler is None:
325 if handler is None:
326 raise error.UnsupportedPartError(parttype=key)
326 raise error.UnsupportedPartError(parttype=key)
327 op.ui.debug('found a handler for part %r\n' % parttype)
327 op.ui.debug('found a handler for part %r\n' % parttype)
328 unknownparams = part.mandatorykeys - handler.params
328 unknownparams = part.mandatorykeys - handler.params
329 if unknownparams:
329 if unknownparams:
330 unknownparams = list(unknownparams)
330 unknownparams = list(unknownparams)
331 unknownparams.sort()
331 unknownparams.sort()
332 raise error.UnsupportedPartError(parttype=key,
332 raise error.UnsupportedPartError(parttype=key,
333 params=unknownparams)
333 params=unknownparams)
334 except error.UnsupportedPartError, exc:
334 except error.UnsupportedPartError, exc:
335 if key != parttype: # mandatory parts
335 if key != parttype: # mandatory parts
336 raise
336 raise
337 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
337 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
338 return # skip to part processing
338 return # skip to part processing
339
339
340 # handler is called outside the above try block so that we don't
340 # handler is called outside the above try block so that we don't
341 # risk catching KeyErrors from anything other than the
341 # risk catching KeyErrors from anything other than the
342 # parthandlermapping lookup (any KeyError raised by handler()
342 # parthandlermapping lookup (any KeyError raised by handler()
343 # itself represents a defect of a different variety).
343 # itself represents a defect of a different variety).
344 output = None
344 output = None
345 if op.reply is not None:
345 if op.reply is not None:
346 op.ui.pushbuffer(error=True)
346 op.ui.pushbuffer(error=True)
347 output = ''
347 output = ''
348 try:
348 try:
349 handler(op, part)
349 handler(op, part)
350 finally:
350 finally:
351 if output is not None:
351 if output is not None:
352 output = op.ui.popbuffer()
352 output = op.ui.popbuffer()
353 if output:
353 if output:
354 outpart = op.reply.newpart('b2x:output', data=output)
354 outpart = op.reply.newpart('b2x:output', data=output)
355 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
355 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
356 finally:
356 finally:
357 # consume the part content to not corrupt the stream.
357 # consume the part content to not corrupt the stream.
358 part.read()
358 part.read()
359
359
360
360
361 def decodecaps(blob):
361 def decodecaps(blob):
362 """decode a bundle2 caps bytes blob into a dictionary
362 """decode a bundle2 caps bytes blob into a dictionary
363
363
364 The blob is a list of capabilities (one per line)
364 The blob is a list of capabilities (one per line)
365 Capabilities may have values using a line of the form::
365 Capabilities may have values using a line of the form::
366
366
367 capability=value1,value2,value3
367 capability=value1,value2,value3
368
368
369 The values are always a list."""
369 The values are always a list."""
370 caps = {}
370 caps = {}
371 for line in blob.splitlines():
371 for line in blob.splitlines():
372 if not line:
372 if not line:
373 continue
373 continue
374 if '=' not in line:
374 if '=' not in line:
375 key, vals = line, ()
375 key, vals = line, ()
376 else:
376 else:
377 key, vals = line.split('=', 1)
377 key, vals = line.split('=', 1)
378 vals = vals.split(',')
378 vals = vals.split(',')
379 key = urllib.unquote(key)
379 key = urllib.unquote(key)
380 vals = [urllib.unquote(v) for v in vals]
380 vals = [urllib.unquote(v) for v in vals]
381 caps[key] = vals
381 caps[key] = vals
382 return caps
382 return caps
383
383
384 def encodecaps(caps):
384 def encodecaps(caps):
385 """encode a bundle2 caps dictionary into a bytes blob"""
385 """encode a bundle2 caps dictionary into a bytes blob"""
386 chunks = []
386 chunks = []
387 for ca in sorted(caps):
387 for ca in sorted(caps):
388 vals = caps[ca]
388 vals = caps[ca]
389 ca = urllib.quote(ca)
389 ca = urllib.quote(ca)
390 vals = [urllib.quote(v) for v in vals]
390 vals = [urllib.quote(v) for v in vals]
391 if vals:
391 if vals:
392 ca = "%s=%s" % (ca, ','.join(vals))
392 ca = "%s=%s" % (ca, ','.join(vals))
393 chunks.append(ca)
393 chunks.append(ca)
394 return '\n'.join(chunks)
394 return '\n'.join(chunks)
395
395
396 class bundle20(object):
396 class bundle20(object):
397 """represent an outgoing bundle2 container
397 """represent an outgoing bundle2 container
398
398
399 Use the `addparam` method to add stream level parameter. and `newpart` to
399 Use the `addparam` method to add stream level parameter. and `newpart` to
400 populate it. Then call `getchunks` to retrieve all the binary chunks of
400 populate it. Then call `getchunks` to retrieve all the binary chunks of
401 data that compose the bundle2 container."""
401 data that compose the bundle2 container."""
402
402
403 def __init__(self, ui, capabilities=()):
403 def __init__(self, ui, capabilities=()):
404 self.ui = ui
404 self.ui = ui
405 self._params = []
405 self._params = []
406 self._parts = []
406 self._parts = []
407 self.capabilities = dict(capabilities)
407 self.capabilities = dict(capabilities)
408
408
409 @property
409 @property
410 def nbparts(self):
410 def nbparts(self):
411 """total number of parts added to the bundler"""
411 """total number of parts added to the bundler"""
412 return len(self._parts)
412 return len(self._parts)
413
413
414 # methods used to defines the bundle2 content
414 # methods used to defines the bundle2 content
415 def addparam(self, name, value=None):
415 def addparam(self, name, value=None):
416 """add a stream level parameter"""
416 """add a stream level parameter"""
417 if not name:
417 if not name:
418 raise ValueError('empty parameter name')
418 raise ValueError('empty parameter name')
419 if name[0] not in string.letters:
419 if name[0] not in string.letters:
420 raise ValueError('non letter first character: %r' % name)
420 raise ValueError('non letter first character: %r' % name)
421 self._params.append((name, value))
421 self._params.append((name, value))
422
422
423 def addpart(self, part):
423 def addpart(self, part):
424 """add a new part to the bundle2 container
424 """add a new part to the bundle2 container
425
425
426 Parts contains the actual applicative payload."""
426 Parts contains the actual applicative payload."""
427 assert part.id is None
427 assert part.id is None
428 part.id = len(self._parts) # very cheap counter
428 part.id = len(self._parts) # very cheap counter
429 self._parts.append(part)
429 self._parts.append(part)
430
430
431 def newpart(self, typeid, *args, **kwargs):
431 def newpart(self, typeid, *args, **kwargs):
432 """create a new part and add it to the containers
432 """create a new part and add it to the containers
433
433
434 As the part is directly added to the containers. For now, this means
434 As the part is directly added to the containers. For now, this means
435 that any failure to properly initialize the part after calling
435 that any failure to properly initialize the part after calling
436 ``newpart`` should result in a failure of the whole bundling process.
436 ``newpart`` should result in a failure of the whole bundling process.
437
437
438 You can still fall back to manually create and add if you need better
438 You can still fall back to manually create and add if you need better
439 control."""
439 control."""
440 part = bundlepart(typeid, *args, **kwargs)
440 part = bundlepart(typeid, *args, **kwargs)
441 self.addpart(part)
441 self.addpart(part)
442 return part
442 return part
443
443
444 # methods used to generate the bundle2 stream
444 # methods used to generate the bundle2 stream
445 def getchunks(self):
445 def getchunks(self):
446 self.ui.debug('start emission of %s stream\n' % _magicstring)
446 self.ui.debug('start emission of %s stream\n' % _magicstring)
447 yield _magicstring
447 yield _magicstring
448 param = self._paramchunk()
448 param = self._paramchunk()
449 self.ui.debug('bundle parameter: %s\n' % param)
449 self.ui.debug('bundle parameter: %s\n' % param)
450 yield _pack(_fstreamparamsize, len(param))
450 yield _pack(_fstreamparamsize, len(param))
451 if param:
451 if param:
452 yield param
452 yield param
453
453
454 self.ui.debug('start of parts\n')
454 self.ui.debug('start of parts\n')
455 for part in self._parts:
455 for part in self._parts:
456 self.ui.debug('bundle part: "%s"\n' % part.type)
456 self.ui.debug('bundle part: "%s"\n' % part.type)
457 for chunk in part.getchunks():
457 for chunk in part.getchunks():
458 yield chunk
458 yield chunk
459 self.ui.debug('end of bundle\n')
459 self.ui.debug('end of bundle\n')
460 yield _pack(_fpartheadersize, 0)
460 yield _pack(_fpartheadersize, 0)
461
461
462 def _paramchunk(self):
462 def _paramchunk(self):
463 """return a encoded version of all stream parameters"""
463 """return a encoded version of all stream parameters"""
464 blocks = []
464 blocks = []
465 for par, value in self._params:
465 for par, value in self._params:
466 par = urllib.quote(par)
466 par = urllib.quote(par)
467 if value is not None:
467 if value is not None:
468 value = urllib.quote(value)
468 value = urllib.quote(value)
469 par = '%s=%s' % (par, value)
469 par = '%s=%s' % (par, value)
470 blocks.append(par)
470 blocks.append(par)
471 return ' '.join(blocks)
471 return ' '.join(blocks)
472
472
473 class unpackermixin(object):
473 class unpackermixin(object):
474 """A mixin to extract bytes and struct data from a stream"""
474 """A mixin to extract bytes and struct data from a stream"""
475
475
476 def __init__(self, fp):
476 def __init__(self, fp):
477 self._fp = fp
477 self._fp = fp
478
478
479 def _unpack(self, format):
479 def _unpack(self, format):
480 """unpack this struct format from the stream"""
480 """unpack this struct format from the stream"""
481 data = self._readexact(struct.calcsize(format))
481 data = self._readexact(struct.calcsize(format))
482 return _unpack(format, data)
482 return _unpack(format, data)
483
483
484 def _readexact(self, size):
484 def _readexact(self, size):
485 """read exactly <size> bytes from the stream"""
485 """read exactly <size> bytes from the stream"""
486 return changegroup.readexactly(self._fp, size)
486 return changegroup.readexactly(self._fp, size)
487
487
488
488
489 class unbundle20(unpackermixin):
489 class unbundle20(unpackermixin):
490 """interpret a bundle2 stream
490 """interpret a bundle2 stream
491
491
492 This class is fed with a binary stream and yields parts through its
492 This class is fed with a binary stream and yields parts through its
493 `iterparts` methods."""
493 `iterparts` methods."""
494
494
495 def __init__(self, ui, fp, header=None):
495 def __init__(self, ui, fp, header=None):
496 """If header is specified, we do not read it out of the stream."""
496 """If header is specified, we do not read it out of the stream."""
497 self.ui = ui
497 self.ui = ui
498 super(unbundle20, self).__init__(fp)
498 super(unbundle20, self).__init__(fp)
499 if header is None:
499 if header is None:
500 header = self._readexact(4)
500 header = self._readexact(4)
501 magic, version = header[0:2], header[2:4]
501 magic, version = header[0:2], header[2:4]
502 if magic != 'HG':
502 if magic != 'HG':
503 raise util.Abort(_('not a Mercurial bundle'))
503 raise util.Abort(_('not a Mercurial bundle'))
504 if version != '2Y':
504 if version != '2Y':
505 raise util.Abort(_('unknown bundle version %s') % version)
505 raise util.Abort(_('unknown bundle version %s') % version)
506 self.ui.debug('start processing of %s stream\n' % header)
506 self.ui.debug('start processing of %s stream\n' % header)
507
507
508 @util.propertycache
508 @util.propertycache
509 def params(self):
509 def params(self):
510 """dictionary of stream level parameters"""
510 """dictionary of stream level parameters"""
511 self.ui.debug('reading bundle2 stream parameters\n')
511 self.ui.debug('reading bundle2 stream parameters\n')
512 params = {}
512 params = {}
513 paramssize = self._unpack(_fstreamparamsize)[0]
513 paramssize = self._unpack(_fstreamparamsize)[0]
514 if paramssize < 0:
514 if paramssize < 0:
515 raise error.BundleValueError('negative bundle param size: %i'
515 raise error.BundleValueError('negative bundle param size: %i'
516 % paramssize)
516 % paramssize)
517 if paramssize:
517 if paramssize:
518 for p in self._readexact(paramssize).split(' '):
518 for p in self._readexact(paramssize).split(' '):
519 p = p.split('=', 1)
519 p = p.split('=', 1)
520 p = [urllib.unquote(i) for i in p]
520 p = [urllib.unquote(i) for i in p]
521 if len(p) < 2:
521 if len(p) < 2:
522 p.append(None)
522 p.append(None)
523 self._processparam(*p)
523 self._processparam(*p)
524 params[p[0]] = p[1]
524 params[p[0]] = p[1]
525 return params
525 return params
526
526
527 def _processparam(self, name, value):
527 def _processparam(self, name, value):
528 """process a parameter, applying its effect if needed
528 """process a parameter, applying its effect if needed
529
529
530 Parameter starting with a lower case letter are advisory and will be
530 Parameter starting with a lower case letter are advisory and will be
531 ignored when unknown. Those starting with an upper case letter are
531 ignored when unknown. Those starting with an upper case letter are
532 mandatory and will this function will raise a KeyError when unknown.
532 mandatory and will this function will raise a KeyError when unknown.
533
533
534 Note: no option are currently supported. Any input will be either
534 Note: no option are currently supported. Any input will be either
535 ignored or failing.
535 ignored or failing.
536 """
536 """
537 if not name:
537 if not name:
538 raise ValueError('empty parameter name')
538 raise ValueError('empty parameter name')
539 if name[0] not in string.letters:
539 if name[0] not in string.letters:
540 raise ValueError('non letter first character: %r' % name)
540 raise ValueError('non letter first character: %r' % name)
541 # Some logic will be later added here to try to process the option for
541 # Some logic will be later added here to try to process the option for
542 # a dict of known parameter.
542 # a dict of known parameter.
543 if name[0].islower():
543 if name[0].islower():
544 self.ui.debug("ignoring unknown parameter %r\n" % name)
544 self.ui.debug("ignoring unknown parameter %r\n" % name)
545 else:
545 else:
546 raise error.UnsupportedPartError(params=(name,))
546 raise error.UnsupportedPartError(params=(name,))
547
547
548
548
549 def iterparts(self):
549 def iterparts(self):
550 """yield all parts contained in the stream"""
550 """yield all parts contained in the stream"""
551 # make sure param have been loaded
551 # make sure param have been loaded
552 self.params
552 self.params
553 self.ui.debug('start extraction of bundle2 parts\n')
553 self.ui.debug('start extraction of bundle2 parts\n')
554 headerblock = self._readpartheader()
554 headerblock = self._readpartheader()
555 while headerblock is not None:
555 while headerblock is not None:
556 part = unbundlepart(self.ui, headerblock, self._fp)
556 part = unbundlepart(self.ui, headerblock, self._fp)
557 yield part
557 yield part
558 headerblock = self._readpartheader()
558 headerblock = self._readpartheader()
559 self.ui.debug('end of bundle2 stream\n')
559 self.ui.debug('end of bundle2 stream\n')
560
560
561 def _readpartheader(self):
561 def _readpartheader(self):
562 """reads a part header size and return the bytes blob
562 """reads a part header size and return the bytes blob
563
563
564 returns None if empty"""
564 returns None if empty"""
565 headersize = self._unpack(_fpartheadersize)[0]
565 headersize = self._unpack(_fpartheadersize)[0]
566 if headersize < 0:
566 if headersize < 0:
567 raise error.BundleValueError('negative part header size: %i'
567 raise error.BundleValueError('negative part header size: %i'
568 % headersize)
568 % headersize)
569 self.ui.debug('part header size: %i\n' % headersize)
569 self.ui.debug('part header size: %i\n' % headersize)
570 if headersize:
570 if headersize:
571 return self._readexact(headersize)
571 return self._readexact(headersize)
572 return None
572 return None
573
573
574
574
575 class bundlepart(object):
575 class bundlepart(object):
576 """A bundle2 part contains application level payload
576 """A bundle2 part contains application level payload
577
577
578 The part `type` is used to route the part to the application level
578 The part `type` is used to route the part to the application level
579 handler.
579 handler.
580
580
581 The part payload is contained in ``part.data``. It could be raw bytes or a
581 The part payload is contained in ``part.data``. It could be raw bytes or a
582 generator of byte chunks.
582 generator of byte chunks.
583
583
584 You can add parameters to the part using the ``addparam`` method.
584 You can add parameters to the part using the ``addparam`` method.
585 Parameters can be either mandatory (default) or advisory. Remote side
585 Parameters can be either mandatory (default) or advisory. Remote side
586 should be able to safely ignore the advisory ones.
586 should be able to safely ignore the advisory ones.
587
587
588 Both data and parameters cannot be modified after the generation has begun.
588 Both data and parameters cannot be modified after the generation has begun.
589 """
589 """
590
590
591 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
591 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
592 data=''):
592 data=''):
593 self.id = None
593 self.id = None
594 self.type = parttype
594 self.type = parttype
595 self._data = data
595 self._data = data
596 self._mandatoryparams = list(mandatoryparams)
596 self._mandatoryparams = list(mandatoryparams)
597 self._advisoryparams = list(advisoryparams)
597 self._advisoryparams = list(advisoryparams)
598 # checking for duplicated entries
598 # checking for duplicated entries
599 self._seenparams = set()
599 self._seenparams = set()
600 for pname, __ in self._mandatoryparams + self._advisoryparams:
600 for pname, __ in self._mandatoryparams + self._advisoryparams:
601 if pname in self._seenparams:
601 if pname in self._seenparams:
602 raise RuntimeError('duplicated params: %s' % pname)
602 raise RuntimeError('duplicated params: %s' % pname)
603 self._seenparams.add(pname)
603 self._seenparams.add(pname)
604 # status of the part's generation:
604 # status of the part's generation:
605 # - None: not started,
605 # - None: not started,
606 # - False: currently generated,
606 # - False: currently generated,
607 # - True: generation done.
607 # - True: generation done.
608 self._generated = None
608 self._generated = None
609
609
610 # methods used to defines the part content
610 # methods used to defines the part content
611 def __setdata(self, data):
611 def __setdata(self, data):
612 if self._generated is not None:
612 if self._generated is not None:
613 raise error.ReadOnlyPartError('part is being generated')
613 raise error.ReadOnlyPartError('part is being generated')
614 self._data = data
614 self._data = data
615 def __getdata(self):
615 def __getdata(self):
616 return self._data
616 return self._data
617 data = property(__getdata, __setdata)
617 data = property(__getdata, __setdata)
618
618
619 @property
619 @property
620 def mandatoryparams(self):
620 def mandatoryparams(self):
621 # make it an immutable tuple to force people through ``addparam``
621 # make it an immutable tuple to force people through ``addparam``
622 return tuple(self._mandatoryparams)
622 return tuple(self._mandatoryparams)
623
623
624 @property
624 @property
625 def advisoryparams(self):
625 def advisoryparams(self):
626 # make it an immutable tuple to force people through ``addparam``
626 # make it an immutable tuple to force people through ``addparam``
627 return tuple(self._advisoryparams)
627 return tuple(self._advisoryparams)
628
628
629 def addparam(self, name, value='', mandatory=True):
629 def addparam(self, name, value='', mandatory=True):
630 if self._generated is not None:
630 if self._generated is not None:
631 raise error.ReadOnlyPartError('part is being generated')
631 raise error.ReadOnlyPartError('part is being generated')
632 if name in self._seenparams:
632 if name in self._seenparams:
633 raise ValueError('duplicated params: %s' % name)
633 raise ValueError('duplicated params: %s' % name)
634 self._seenparams.add(name)
634 self._seenparams.add(name)
635 params = self._advisoryparams
635 params = self._advisoryparams
636 if mandatory:
636 if mandatory:
637 params = self._mandatoryparams
637 params = self._mandatoryparams
638 params.append((name, value))
638 params.append((name, value))
639
639
640 # methods used to generates the bundle2 stream
640 # methods used to generates the bundle2 stream
641 def getchunks(self):
641 def getchunks(self):
642 if self._generated is not None:
642 if self._generated is not None:
643 raise RuntimeError('part can only be consumed once')
643 raise RuntimeError('part can only be consumed once')
644 self._generated = False
644 self._generated = False
645 #### header
645 #### header
646 ## parttype
646 ## parttype
647 header = [_pack(_fparttypesize, len(self.type)),
647 header = [_pack(_fparttypesize, len(self.type)),
648 self.type, _pack(_fpartid, self.id),
648 self.type, _pack(_fpartid, self.id),
649 ]
649 ]
650 ## parameters
650 ## parameters
651 # count
651 # count
652 manpar = self.mandatoryparams
652 manpar = self.mandatoryparams
653 advpar = self.advisoryparams
653 advpar = self.advisoryparams
654 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
654 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
655 # size
655 # size
656 parsizes = []
656 parsizes = []
657 for key, value in manpar:
657 for key, value in manpar:
658 parsizes.append(len(key))
658 parsizes.append(len(key))
659 parsizes.append(len(value))
659 parsizes.append(len(value))
660 for key, value in advpar:
660 for key, value in advpar:
661 parsizes.append(len(key))
661 parsizes.append(len(key))
662 parsizes.append(len(value))
662 parsizes.append(len(value))
663 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
663 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
664 header.append(paramsizes)
664 header.append(paramsizes)
665 # key, value
665 # key, value
666 for key, value in manpar:
666 for key, value in manpar:
667 header.append(key)
667 header.append(key)
668 header.append(value)
668 header.append(value)
669 for key, value in advpar:
669 for key, value in advpar:
670 header.append(key)
670 header.append(key)
671 header.append(value)
671 header.append(value)
672 ## finalize header
672 ## finalize header
673 headerchunk = ''.join(header)
673 headerchunk = ''.join(header)
674 yield _pack(_fpartheadersize, len(headerchunk))
674 yield _pack(_fpartheadersize, len(headerchunk))
675 yield headerchunk
675 yield headerchunk
676 ## payload
676 ## payload
677 try:
677 try:
678 for chunk in self._payloadchunks():
678 for chunk in self._payloadchunks():
679 yield _pack(_fpayloadsize, len(chunk))
679 yield _pack(_fpayloadsize, len(chunk))
680 yield chunk
680 yield chunk
681 except Exception, exc:
681 except Exception, exc:
682 # backup exception data for later
682 # backup exception data for later
683 exc_info = sys.exc_info()
683 exc_info = sys.exc_info()
684 msg = 'unexpected error: %s' % exc
684 msg = 'unexpected error: %s' % exc
685 interpart = bundlepart('b2x:error:abort', [('message', msg)])
685 interpart = bundlepart('b2x:error:abort', [('message', msg)])
686 interpart.id = 0
686 interpart.id = 0
687 yield _pack(_fpayloadsize, -1)
687 yield _pack(_fpayloadsize, -1)
688 for chunk in interpart.getchunks():
688 for chunk in interpart.getchunks():
689 yield chunk
689 yield chunk
690 # abort current part payload
690 # abort current part payload
691 yield _pack(_fpayloadsize, 0)
691 yield _pack(_fpayloadsize, 0)
692 raise exc_info[0], exc_info[1], exc_info[2]
692 raise exc_info[0], exc_info[1], exc_info[2]
693 # end of payload
693 # end of payload
694 yield _pack(_fpayloadsize, 0)
694 yield _pack(_fpayloadsize, 0)
695 self._generated = True
695 self._generated = True
696
696
697 def _payloadchunks(self):
697 def _payloadchunks(self):
698 """yield chunks of a the part payload
698 """yield chunks of a the part payload
699
699
700 Exists to handle the different methods to provide data to a part."""
700 Exists to handle the different methods to provide data to a part."""
701 # we only support fixed size data now.
701 # we only support fixed size data now.
702 # This will be improved in the future.
702 # This will be improved in the future.
703 if util.safehasattr(self.data, 'next'):
703 if util.safehasattr(self.data, 'next'):
704 buff = util.chunkbuffer(self.data)
704 buff = util.chunkbuffer(self.data)
705 chunk = buff.read(preferedchunksize)
705 chunk = buff.read(preferedchunksize)
706 while chunk:
706 while chunk:
707 yield chunk
707 yield chunk
708 chunk = buff.read(preferedchunksize)
708 chunk = buff.read(preferedchunksize)
709 elif len(self.data):
709 elif len(self.data):
710 yield self.data
710 yield self.data
711
711
712
712
713 flaginterrupt = -1
713 flaginterrupt = -1
714
714
715 class interrupthandler(unpackermixin):
715 class interrupthandler(unpackermixin):
716 """read one part and process it with restricted capability
716 """read one part and process it with restricted capability
717
717
718 This allows to transmit exception raised on the producer size during part
718 This allows to transmit exception raised on the producer size during part
719 iteration while the consumer is reading a part.
719 iteration while the consumer is reading a part.
720
720
721 Part processed in this manner only have access to a ui object,"""
721 Part processed in this manner only have access to a ui object,"""
722
722
723 def __init__(self, ui, fp):
723 def __init__(self, ui, fp):
724 super(interrupthandler, self).__init__(fp)
724 super(interrupthandler, self).__init__(fp)
725 self.ui = ui
725 self.ui = ui
726
726
727 def _readpartheader(self):
727 def _readpartheader(self):
728 """reads a part header size and return the bytes blob
728 """reads a part header size and return the bytes blob
729
729
730 returns None if empty"""
730 returns None if empty"""
731 headersize = self._unpack(_fpartheadersize)[0]
731 headersize = self._unpack(_fpartheadersize)[0]
732 if headersize < 0:
732 if headersize < 0:
733 raise error.BundleValueError('negative part header size: %i'
733 raise error.BundleValueError('negative part header size: %i'
734 % headersize)
734 % headersize)
735 self.ui.debug('part header size: %i\n' % headersize)
735 self.ui.debug('part header size: %i\n' % headersize)
736 if headersize:
736 if headersize:
737 return self._readexact(headersize)
737 return self._readexact(headersize)
738 return None
738 return None
739
739
740 def __call__(self):
740 def __call__(self):
741 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
741 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
742 headerblock = self._readpartheader()
742 headerblock = self._readpartheader()
743 if headerblock is None:
743 if headerblock is None:
744 self.ui.debug('no part found during interruption.\n')
744 self.ui.debug('no part found during interruption.\n')
745 return
745 return
746 part = unbundlepart(self.ui, headerblock, self._fp)
746 part = unbundlepart(self.ui, headerblock, self._fp)
747 op = interruptoperation(self.ui)
747 op = interruptoperation(self.ui)
748 _processpart(op, part)
748 _processpart(op, part)
749
749
750 class interruptoperation(object):
750 class interruptoperation(object):
751 """A limited operation to be use by part handler during interruption
751 """A limited operation to be use by part handler during interruption
752
752
753 It only have access to an ui object.
753 It only have access to an ui object.
754 """
754 """
755
755
756 def __init__(self, ui):
756 def __init__(self, ui):
757 self.ui = ui
757 self.ui = ui
758 self.reply = None
758 self.reply = None
759
759
760 @property
760 @property
761 def repo(self):
761 def repo(self):
762 raise RuntimeError('no repo access from stream interruption')
762 raise RuntimeError('no repo access from stream interruption')
763
763
764 def gettransaction(self):
764 def gettransaction(self):
765 raise TransactionUnavailable('no repo access from stream interruption')
765 raise TransactionUnavailable('no repo access from stream interruption')
766
766
767 class unbundlepart(unpackermixin):
767 class unbundlepart(unpackermixin):
768 """a bundle part read from a bundle"""
768 """a bundle part read from a bundle"""
769
769
770 def __init__(self, ui, header, fp):
770 def __init__(self, ui, header, fp):
771 super(unbundlepart, self).__init__(fp)
771 super(unbundlepart, self).__init__(fp)
772 self.ui = ui
772 self.ui = ui
773 # unbundle state attr
773 # unbundle state attr
774 self._headerdata = header
774 self._headerdata = header
775 self._headeroffset = 0
775 self._headeroffset = 0
776 self._initialized = False
776 self._initialized = False
777 self.consumed = False
777 self.consumed = False
778 # part data
778 # part data
779 self.id = None
779 self.id = None
780 self.type = None
780 self.type = None
781 self.mandatoryparams = None
781 self.mandatoryparams = None
782 self.advisoryparams = None
782 self.advisoryparams = None
783 self.params = None
783 self.params = None
784 self.mandatorykeys = ()
784 self.mandatorykeys = ()
785 self._payloadstream = None
785 self._payloadstream = None
786 self._readheader()
786 self._readheader()
787
787
788 def _fromheader(self, size):
788 def _fromheader(self, size):
789 """return the next <size> byte from the header"""
789 """return the next <size> byte from the header"""
790 offset = self._headeroffset
790 offset = self._headeroffset
791 data = self._headerdata[offset:(offset + size)]
791 data = self._headerdata[offset:(offset + size)]
792 self._headeroffset = offset + size
792 self._headeroffset = offset + size
793 return data
793 return data
794
794
795 def _unpackheader(self, format):
795 def _unpackheader(self, format):
796 """read given format from header
796 """read given format from header
797
797
798 This automatically compute the size of the format to read."""
798 This automatically compute the size of the format to read."""
799 data = self._fromheader(struct.calcsize(format))
799 data = self._fromheader(struct.calcsize(format))
800 return _unpack(format, data)
800 return _unpack(format, data)
801
801
802 def _initparams(self, mandatoryparams, advisoryparams):
802 def _initparams(self, mandatoryparams, advisoryparams):
803 """internal function to setup all logic related parameters"""
803 """internal function to setup all logic related parameters"""
804 # make it read only to prevent people touching it by mistake.
804 # make it read only to prevent people touching it by mistake.
805 self.mandatoryparams = tuple(mandatoryparams)
805 self.mandatoryparams = tuple(mandatoryparams)
806 self.advisoryparams = tuple(advisoryparams)
806 self.advisoryparams = tuple(advisoryparams)
807 # user friendly UI
807 # user friendly UI
808 self.params = dict(self.mandatoryparams)
808 self.params = dict(self.mandatoryparams)
809 self.params.update(dict(self.advisoryparams))
809 self.params.update(dict(self.advisoryparams))
810 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
810 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
811
811
812 def _readheader(self):
812 def _readheader(self):
813 """read the header and setup the object"""
813 """read the header and setup the object"""
814 typesize = self._unpackheader(_fparttypesize)[0]
814 typesize = self._unpackheader(_fparttypesize)[0]
815 self.type = self._fromheader(typesize)
815 self.type = self._fromheader(typesize)
816 self.ui.debug('part type: "%s"\n' % self.type)
816 self.ui.debug('part type: "%s"\n' % self.type)
817 self.id = self._unpackheader(_fpartid)[0]
817 self.id = self._unpackheader(_fpartid)[0]
818 self.ui.debug('part id: "%s"\n' % self.id)
818 self.ui.debug('part id: "%s"\n' % self.id)
819 ## reading parameters
819 ## reading parameters
820 # param count
820 # param count
821 mancount, advcount = self._unpackheader(_fpartparamcount)
821 mancount, advcount = self._unpackheader(_fpartparamcount)
822 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
822 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
823 # param size
823 # param size
824 fparamsizes = _makefpartparamsizes(mancount + advcount)
824 fparamsizes = _makefpartparamsizes(mancount + advcount)
825 paramsizes = self._unpackheader(fparamsizes)
825 paramsizes = self._unpackheader(fparamsizes)
826 # make it a list of couple again
826 # make it a list of couple again
827 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
827 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
828 # split mandatory from advisory
828 # split mandatory from advisory
829 mansizes = paramsizes[:mancount]
829 mansizes = paramsizes[:mancount]
830 advsizes = paramsizes[mancount:]
830 advsizes = paramsizes[mancount:]
831 # retrieve param value
831 # retrieve param value
832 manparams = []
832 manparams = []
833 for key, value in mansizes:
833 for key, value in mansizes:
834 manparams.append((self._fromheader(key), self._fromheader(value)))
834 manparams.append((self._fromheader(key), self._fromheader(value)))
835 advparams = []
835 advparams = []
836 for key, value in advsizes:
836 for key, value in advsizes:
837 advparams.append((self._fromheader(key), self._fromheader(value)))
837 advparams.append((self._fromheader(key), self._fromheader(value)))
838 self._initparams(manparams, advparams)
838 self._initparams(manparams, advparams)
839 ## part payload
839 ## part payload
840 def payloadchunks():
840 def payloadchunks():
841 payloadsize = self._unpack(_fpayloadsize)[0]
841 payloadsize = self._unpack(_fpayloadsize)[0]
842 self.ui.debug('payload chunk size: %i\n' % payloadsize)
842 self.ui.debug('payload chunk size: %i\n' % payloadsize)
843 while payloadsize:
843 while payloadsize:
844 if payloadsize == flaginterrupt:
844 if payloadsize == flaginterrupt:
845 # interruption detection, the handler will now read a
845 # interruption detection, the handler will now read a
846 # single part and process it.
846 # single part and process it.
847 interrupthandler(self.ui, self._fp)()
847 interrupthandler(self.ui, self._fp)()
848 elif payloadsize < 0:
848 elif payloadsize < 0:
849 msg = 'negative payload chunk size: %i' % payloadsize
849 msg = 'negative payload chunk size: %i' % payloadsize
850 raise error.BundleValueError(msg)
850 raise error.BundleValueError(msg)
851 else:
851 else:
852 yield self._readexact(payloadsize)
852 yield self._readexact(payloadsize)
853 payloadsize = self._unpack(_fpayloadsize)[0]
853 payloadsize = self._unpack(_fpayloadsize)[0]
854 self.ui.debug('payload chunk size: %i\n' % payloadsize)
854 self.ui.debug('payload chunk size: %i\n' % payloadsize)
855 self._payloadstream = util.chunkbuffer(payloadchunks())
855 self._payloadstream = util.chunkbuffer(payloadchunks())
856 # we read the data, tell it
856 # we read the data, tell it
857 self._initialized = True
857 self._initialized = True
858
858
859 def read(self, size=None):
859 def read(self, size=None):
860 """read payload data"""
860 """read payload data"""
861 if not self._initialized:
861 if not self._initialized:
862 self._readheader()
862 self._readheader()
863 if size is None:
863 if size is None:
864 data = self._payloadstream.read()
864 data = self._payloadstream.read()
865 else:
865 else:
866 data = self._payloadstream.read(size)
866 data = self._payloadstream.read(size)
867 if size is None or len(data) < size:
867 if size is None or len(data) < size:
868 self.consumed = True
868 self.consumed = True
869 return data
869 return data
870
870
871 capabilities = {'HG2Y': (),
871 capabilities = {'HG2Y': (),
872 'b2x:listkeys': (),
872 'b2x:listkeys': (),
873 'b2x:pushkey': (),
873 'b2x:pushkey': (),
874 'b2x:changegroup': (),
874 'b2x:changegroup': (),
875 'digests': tuple(sorted(util.DIGESTS.keys())),
875 'digests': tuple(sorted(util.DIGESTS.keys())),
876 'b2x:remote-changegroup': ('http', 'https'),
876 'b2x:remote-changegroup': ('http', 'https'),
877 }
877 }
878
878
879 def getrepocaps(repo):
879 def getrepocaps(repo):
880 """return the bundle2 capabilities for a given repo
880 """return the bundle2 capabilities for a given repo
881
881
882 Exists to allow extensions (like evolution) to mutate the capabilities.
882 Exists to allow extensions (like evolution) to mutate the capabilities.
883 """
883 """
884 caps = capabilities.copy()
884 caps = capabilities.copy()
885 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
885 if obsolete.isenabled(repo, obsolete.exchangeopt):
886 if obsolete.isenabled(repo, obsolete.exchangeopt):
886 supportedformat = tuple('V%i' % v for v in obsolete.formats)
887 supportedformat = tuple('V%i' % v for v in obsolete.formats)
887 caps['b2x:obsmarkers'] = supportedformat
888 caps['b2x:obsmarkers'] = supportedformat
888 return caps
889 return caps
889
890
890 def bundle2caps(remote):
891 def bundle2caps(remote):
891 """return the bundle capabilities of a peer as dict"""
892 """return the bundle capabilities of a peer as dict"""
892 raw = remote.capable('bundle2-exp')
893 raw = remote.capable('bundle2-exp')
893 if not raw and raw != '':
894 if not raw and raw != '':
894 return {}
895 return {}
895 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
896 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
896 return decodecaps(capsblob)
897 return decodecaps(capsblob)
897
898
898 def obsmarkersversion(caps):
899 def obsmarkersversion(caps):
899 """extract the list of supported obsmarkers versions from a bundle2caps dict
900 """extract the list of supported obsmarkers versions from a bundle2caps dict
900 """
901 """
901 obscaps = caps.get('b2x:obsmarkers', ())
902 obscaps = caps.get('b2x:obsmarkers', ())
902 return [int(c[1:]) for c in obscaps if c.startswith('V')]
903 return [int(c[1:]) for c in obscaps if c.startswith('V')]
903
904
904 @parthandler('b2x:changegroup')
905 @parthandler('b2x:changegroup')
905 def handlechangegroup(op, inpart):
906 def handlechangegroup(op, inpart):
906 """apply a changegroup part on the repo
907 """apply a changegroup part on the repo
907
908
908 This is a very early implementation that will massive rework before being
909 This is a very early implementation that will massive rework before being
909 inflicted to any end-user.
910 inflicted to any end-user.
910 """
911 """
911 # Make sure we trigger a transaction creation
912 # Make sure we trigger a transaction creation
912 #
913 #
913 # The addchangegroup function will get a transaction object by itself, but
914 # The addchangegroup function will get a transaction object by itself, but
914 # we need to make sure we trigger the creation of a transaction object used
915 # we need to make sure we trigger the creation of a transaction object used
915 # for the whole processing scope.
916 # for the whole processing scope.
916 op.gettransaction()
917 op.gettransaction()
917 cg = changegroup.cg1unpacker(inpart, 'UN')
918 cg = changegroup.cg1unpacker(inpart, 'UN')
918 # the source and url passed here are overwritten by the one contained in
919 # the source and url passed here are overwritten by the one contained in
919 # the transaction.hookargs argument. So 'bundle2' is a placeholder
920 # the transaction.hookargs argument. So 'bundle2' is a placeholder
920 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
921 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
921 op.records.add('changegroup', {'return': ret})
922 op.records.add('changegroup', {'return': ret})
922 if op.reply is not None:
923 if op.reply is not None:
923 # This is definitely not the final form of this
924 # This is definitely not the final form of this
924 # return. But one need to start somewhere.
925 # return. But one need to start somewhere.
925 part = op.reply.newpart('b2x:reply:changegroup')
926 part = op.reply.newpart('b2x:reply:changegroup')
926 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
927 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
927 part.addparam('return', '%i' % ret, mandatory=False)
928 part.addparam('return', '%i' % ret, mandatory=False)
928 assert not inpart.read()
929 assert not inpart.read()
929
930
930 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
931 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
931 ['digest:%s' % k for k in util.DIGESTS.keys()])
932 ['digest:%s' % k for k in util.DIGESTS.keys()])
932 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
933 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
933 def handleremotechangegroup(op, inpart):
934 def handleremotechangegroup(op, inpart):
934 """apply a bundle10 on the repo, given an url and validation information
935 """apply a bundle10 on the repo, given an url and validation information
935
936
936 All the information about the remote bundle to import are given as
937 All the information about the remote bundle to import are given as
937 parameters. The parameters include:
938 parameters. The parameters include:
938 - url: the url to the bundle10.
939 - url: the url to the bundle10.
939 - size: the bundle10 file size. It is used to validate what was
940 - size: the bundle10 file size. It is used to validate what was
940 retrieved by the client matches the server knowledge about the bundle.
941 retrieved by the client matches the server knowledge about the bundle.
941 - digests: a space separated list of the digest types provided as
942 - digests: a space separated list of the digest types provided as
942 parameters.
943 parameters.
943 - digest:<digest-type>: the hexadecimal representation of the digest with
944 - digest:<digest-type>: the hexadecimal representation of the digest with
944 that name. Like the size, it is used to validate what was retrieved by
945 that name. Like the size, it is used to validate what was retrieved by
945 the client matches what the server knows about the bundle.
946 the client matches what the server knows about the bundle.
946
947
947 When multiple digest types are given, all of them are checked.
948 When multiple digest types are given, all of them are checked.
948 """
949 """
949 try:
950 try:
950 raw_url = inpart.params['url']
951 raw_url = inpart.params['url']
951 except KeyError:
952 except KeyError:
952 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
953 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
953 parsed_url = util.url(raw_url)
954 parsed_url = util.url(raw_url)
954 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
955 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
955 raise util.Abort(_('remote-changegroup does not support %s urls') %
956 raise util.Abort(_('remote-changegroup does not support %s urls') %
956 parsed_url.scheme)
957 parsed_url.scheme)
957
958
958 try:
959 try:
959 size = int(inpart.params['size'])
960 size = int(inpart.params['size'])
960 except ValueError:
961 except ValueError:
961 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
962 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
962 % 'size')
963 % 'size')
963 except KeyError:
964 except KeyError:
964 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
965 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
965
966
966 digests = {}
967 digests = {}
967 for typ in inpart.params.get('digests', '').split():
968 for typ in inpart.params.get('digests', '').split():
968 param = 'digest:%s' % typ
969 param = 'digest:%s' % typ
969 try:
970 try:
970 value = inpart.params[param]
971 value = inpart.params[param]
971 except KeyError:
972 except KeyError:
972 raise util.Abort(_('remote-changegroup: missing "%s" param') %
973 raise util.Abort(_('remote-changegroup: missing "%s" param') %
973 param)
974 param)
974 digests[typ] = value
975 digests[typ] = value
975
976
976 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
977 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
977
978
978 # Make sure we trigger a transaction creation
979 # Make sure we trigger a transaction creation
979 #
980 #
980 # The addchangegroup function will get a transaction object by itself, but
981 # The addchangegroup function will get a transaction object by itself, but
981 # we need to make sure we trigger the creation of a transaction object used
982 # we need to make sure we trigger the creation of a transaction object used
982 # for the whole processing scope.
983 # for the whole processing scope.
983 op.gettransaction()
984 op.gettransaction()
984 import exchange
985 import exchange
985 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
986 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
986 if not isinstance(cg, changegroup.cg1unpacker):
987 if not isinstance(cg, changegroup.cg1unpacker):
987 raise util.Abort(_('%s: not a bundle version 1.0') %
988 raise util.Abort(_('%s: not a bundle version 1.0') %
988 util.hidepassword(raw_url))
989 util.hidepassword(raw_url))
989 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
990 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
990 op.records.add('changegroup', {'return': ret})
991 op.records.add('changegroup', {'return': ret})
991 if op.reply is not None:
992 if op.reply is not None:
992 # This is definitely not the final form of this
993 # This is definitely not the final form of this
993 # return. But one need to start somewhere.
994 # return. But one need to start somewhere.
994 part = op.reply.newpart('b2x:reply:changegroup')
995 part = op.reply.newpart('b2x:reply:changegroup')
995 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
996 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
996 part.addparam('return', '%i' % ret, mandatory=False)
997 part.addparam('return', '%i' % ret, mandatory=False)
997 try:
998 try:
998 real_part.validate()
999 real_part.validate()
999 except util.Abort, e:
1000 except util.Abort, e:
1000 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1001 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1001 (util.hidepassword(raw_url), str(e)))
1002 (util.hidepassword(raw_url), str(e)))
1002 assert not inpart.read()
1003 assert not inpart.read()
1003
1004
1004 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1005 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1005 def handlereplychangegroup(op, inpart):
1006 def handlereplychangegroup(op, inpart):
1006 ret = int(inpart.params['return'])
1007 ret = int(inpart.params['return'])
1007 replyto = int(inpart.params['in-reply-to'])
1008 replyto = int(inpart.params['in-reply-to'])
1008 op.records.add('changegroup', {'return': ret}, replyto)
1009 op.records.add('changegroup', {'return': ret}, replyto)
1009
1010
1010 @parthandler('b2x:check:heads')
1011 @parthandler('b2x:check:heads')
1011 def handlecheckheads(op, inpart):
1012 def handlecheckheads(op, inpart):
1012 """check that head of the repo did not change
1013 """check that head of the repo did not change
1013
1014
1014 This is used to detect a push race when using unbundle.
1015 This is used to detect a push race when using unbundle.
1015 This replaces the "heads" argument of unbundle."""
1016 This replaces the "heads" argument of unbundle."""
1016 h = inpart.read(20)
1017 h = inpart.read(20)
1017 heads = []
1018 heads = []
1018 while len(h) == 20:
1019 while len(h) == 20:
1019 heads.append(h)
1020 heads.append(h)
1020 h = inpart.read(20)
1021 h = inpart.read(20)
1021 assert not h
1022 assert not h
1022 if heads != op.repo.heads():
1023 if heads != op.repo.heads():
1023 raise error.PushRaced('repository changed while pushing - '
1024 raise error.PushRaced('repository changed while pushing - '
1024 'please try again')
1025 'please try again')
1025
1026
1026 @parthandler('b2x:output')
1027 @parthandler('b2x:output')
1027 def handleoutput(op, inpart):
1028 def handleoutput(op, inpart):
1028 """forward output captured on the server to the client"""
1029 """forward output captured on the server to the client"""
1029 for line in inpart.read().splitlines():
1030 for line in inpart.read().splitlines():
1030 op.ui.write(('remote: %s\n' % line))
1031 op.ui.write(('remote: %s\n' % line))
1031
1032
1032 @parthandler('b2x:replycaps')
1033 @parthandler('b2x:replycaps')
1033 def handlereplycaps(op, inpart):
1034 def handlereplycaps(op, inpart):
1034 """Notify that a reply bundle should be created
1035 """Notify that a reply bundle should be created
1035
1036
1036 The payload contains the capabilities information for the reply"""
1037 The payload contains the capabilities information for the reply"""
1037 caps = decodecaps(inpart.read())
1038 caps = decodecaps(inpart.read())
1038 if op.reply is None:
1039 if op.reply is None:
1039 op.reply = bundle20(op.ui, caps)
1040 op.reply = bundle20(op.ui, caps)
1040
1041
1041 @parthandler('b2x:error:abort', ('message', 'hint'))
1042 @parthandler('b2x:error:abort', ('message', 'hint'))
1042 def handlereplycaps(op, inpart):
1043 def handlereplycaps(op, inpart):
1043 """Used to transmit abort error over the wire"""
1044 """Used to transmit abort error over the wire"""
1044 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1045 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1045
1046
1046 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1047 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1047 def handlereplycaps(op, inpart):
1048 def handlereplycaps(op, inpart):
1048 """Used to transmit unknown content error over the wire"""
1049 """Used to transmit unknown content error over the wire"""
1049 kwargs = {}
1050 kwargs = {}
1050 parttype = inpart.params.get('parttype')
1051 parttype = inpart.params.get('parttype')
1051 if parttype is not None:
1052 if parttype is not None:
1052 kwargs['parttype'] = parttype
1053 kwargs['parttype'] = parttype
1053 params = inpart.params.get('params')
1054 params = inpart.params.get('params')
1054 if params is not None:
1055 if params is not None:
1055 kwargs['params'] = params.split('\0')
1056 kwargs['params'] = params.split('\0')
1056
1057
1057 raise error.UnsupportedPartError(**kwargs)
1058 raise error.UnsupportedPartError(**kwargs)
1058
1059
1059 @parthandler('b2x:error:pushraced', ('message',))
1060 @parthandler('b2x:error:pushraced', ('message',))
1060 def handlereplycaps(op, inpart):
1061 def handlereplycaps(op, inpart):
1061 """Used to transmit push race error over the wire"""
1062 """Used to transmit push race error over the wire"""
1062 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1063 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1063
1064
1064 @parthandler('b2x:listkeys', ('namespace',))
1065 @parthandler('b2x:listkeys', ('namespace',))
1065 def handlelistkeys(op, inpart):
1066 def handlelistkeys(op, inpart):
1066 """retrieve pushkey namespace content stored in a bundle2"""
1067 """retrieve pushkey namespace content stored in a bundle2"""
1067 namespace = inpart.params['namespace']
1068 namespace = inpart.params['namespace']
1068 r = pushkey.decodekeys(inpart.read())
1069 r = pushkey.decodekeys(inpart.read())
1069 op.records.add('listkeys', (namespace, r))
1070 op.records.add('listkeys', (namespace, r))
1070
1071
1071 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1072 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1072 def handlepushkey(op, inpart):
1073 def handlepushkey(op, inpart):
1073 """process a pushkey request"""
1074 """process a pushkey request"""
1074 dec = pushkey.decode
1075 dec = pushkey.decode
1075 namespace = dec(inpart.params['namespace'])
1076 namespace = dec(inpart.params['namespace'])
1076 key = dec(inpart.params['key'])
1077 key = dec(inpart.params['key'])
1077 old = dec(inpart.params['old'])
1078 old = dec(inpart.params['old'])
1078 new = dec(inpart.params['new'])
1079 new = dec(inpart.params['new'])
1079 ret = op.repo.pushkey(namespace, key, old, new)
1080 ret = op.repo.pushkey(namespace, key, old, new)
1080 record = {'namespace': namespace,
1081 record = {'namespace': namespace,
1081 'key': key,
1082 'key': key,
1082 'old': old,
1083 'old': old,
1083 'new': new}
1084 'new': new}
1084 op.records.add('pushkey', record)
1085 op.records.add('pushkey', record)
1085 if op.reply is not None:
1086 if op.reply is not None:
1086 rpart = op.reply.newpart('b2x:reply:pushkey')
1087 rpart = op.reply.newpart('b2x:reply:pushkey')
1087 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1088 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1088 rpart.addparam('return', '%i' % ret, mandatory=False)
1089 rpart.addparam('return', '%i' % ret, mandatory=False)
1089
1090
1090 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1091 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1091 def handlepushkeyreply(op, inpart):
1092 def handlepushkeyreply(op, inpart):
1092 """retrieve the result of a pushkey request"""
1093 """retrieve the result of a pushkey request"""
1093 ret = int(inpart.params['return'])
1094 ret = int(inpart.params['return'])
1094 partid = int(inpart.params['in-reply-to'])
1095 partid = int(inpart.params['in-reply-to'])
1095 op.records.add('pushkey', {'return': ret}, partid)
1096 op.records.add('pushkey', {'return': ret}, partid)
1096
1097
1097 @parthandler('b2x:obsmarkers')
1098 @parthandler('b2x:obsmarkers')
1098 def handleobsmarker(op, inpart):
1099 def handleobsmarker(op, inpart):
1099 """add a stream of obsmarkers to the repo"""
1100 """add a stream of obsmarkers to the repo"""
1100 tr = op.gettransaction()
1101 tr = op.gettransaction()
1101 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1102 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1102 if new:
1103 if new:
1103 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1104 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1104 op.records.add('obsmarkers', {'new': new})
1105 op.records.add('obsmarkers', {'new': new})
1105 if op.reply is not None:
1106 if op.reply is not None:
1106 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1107 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1107 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1108 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1108 rpart.addparam('new', '%i' % new, mandatory=False)
1109 rpart.addparam('new', '%i' % new, mandatory=False)
1109
1110
1110
1111
1111 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1112 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1112 def handlepushkeyreply(op, inpart):
1113 def handlepushkeyreply(op, inpart):
1113 """retrieve the result of a pushkey request"""
1114 """retrieve the result of a pushkey request"""
1114 ret = int(inpart.params['new'])
1115 ret = int(inpart.params['new'])
1115 partid = int(inpart.params['in-reply-to'])
1116 partid = int(inpart.params['in-reply-to'])
1116 op.records.add('obsmarkers', {'new': ret}, partid)
1117 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now