##// END OF EJS Templates
bundle2._processpart: forcing lower-case compare is no longer necessary...
Eric Sumner -
r23586:112f9c73 default
parent child Browse files
Show More
@@ -1,1127 +1,1124 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import sys
148 import sys
149 import util
149 import util
150 import struct
150 import struct
151 import urllib
151 import urllib
152 import string
152 import string
153 import obsolete
153 import obsolete
154 import pushkey
154 import pushkey
155 import url
155 import url
156
156
157 import changegroup, error
157 import changegroup, error
158 from i18n import _
158 from i18n import _
159
159
160 _pack = struct.pack
160 _pack = struct.pack
161 _unpack = struct.unpack
161 _unpack = struct.unpack
162
162
163 _magicstring = 'HG2Y'
163 _magicstring = 'HG2Y'
164
164
165 _fstreamparamsize = '>i'
165 _fstreamparamsize = '>i'
166 _fpartheadersize = '>i'
166 _fpartheadersize = '>i'
167 _fparttypesize = '>B'
167 _fparttypesize = '>B'
168 _fpartid = '>I'
168 _fpartid = '>I'
169 _fpayloadsize = '>i'
169 _fpayloadsize = '>i'
170 _fpartparamcount = '>BB'
170 _fpartparamcount = '>BB'
171
171
172 preferedchunksize = 4096
172 preferedchunksize = 4096
173
173
174 def _makefpartparamsizes(nbparams):
174 def _makefpartparamsizes(nbparams):
175 """return a struct format to read part parameter sizes
175 """return a struct format to read part parameter sizes
176
176
177 The number parameters is variable so we need to build that format
177 The number parameters is variable so we need to build that format
178 dynamically.
178 dynamically.
179 """
179 """
180 return '>'+('BB'*nbparams)
180 return '>'+('BB'*nbparams)
181
181
182 parthandlermapping = {}
182 parthandlermapping = {}
183
183
184 def parthandler(parttype, params=()):
184 def parthandler(parttype, params=()):
185 """decorator that register a function as a bundle2 part handler
185 """decorator that register a function as a bundle2 part handler
186
186
187 eg::
187 eg::
188
188
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
189 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
190 def myparttypehandler(...):
190 def myparttypehandler(...):
191 '''process a part of type "my part".'''
191 '''process a part of type "my part".'''
192 ...
192 ...
193 """
193 """
194 def _decorator(func):
194 def _decorator(func):
195 lparttype = parttype.lower() # enforce lower case matching.
195 lparttype = parttype.lower() # enforce lower case matching.
196 assert lparttype not in parthandlermapping
196 assert lparttype not in parthandlermapping
197 parthandlermapping[lparttype] = func
197 parthandlermapping[lparttype] = func
198 func.params = frozenset(params)
198 func.params = frozenset(params)
199 return func
199 return func
200 return _decorator
200 return _decorator
201
201
202 class unbundlerecords(object):
202 class unbundlerecords(object):
203 """keep record of what happens during and unbundle
203 """keep record of what happens during and unbundle
204
204
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
205 New records are added using `records.add('cat', obj)`. Where 'cat' is a
206 category of record and obj is an arbitrary object.
206 category of record and obj is an arbitrary object.
207
207
208 `records['cat']` will return all entries of this category 'cat'.
208 `records['cat']` will return all entries of this category 'cat'.
209
209
210 Iterating on the object itself will yield `('category', obj)` tuples
210 Iterating on the object itself will yield `('category', obj)` tuples
211 for all entries.
211 for all entries.
212
212
213 All iterations happens in chronological order.
213 All iterations happens in chronological order.
214 """
214 """
215
215
216 def __init__(self):
216 def __init__(self):
217 self._categories = {}
217 self._categories = {}
218 self._sequences = []
218 self._sequences = []
219 self._replies = {}
219 self._replies = {}
220
220
221 def add(self, category, entry, inreplyto=None):
221 def add(self, category, entry, inreplyto=None):
222 """add a new record of a given category.
222 """add a new record of a given category.
223
223
224 The entry can then be retrieved in the list returned by
224 The entry can then be retrieved in the list returned by
225 self['category']."""
225 self['category']."""
226 self._categories.setdefault(category, []).append(entry)
226 self._categories.setdefault(category, []).append(entry)
227 self._sequences.append((category, entry))
227 self._sequences.append((category, entry))
228 if inreplyto is not None:
228 if inreplyto is not None:
229 self.getreplies(inreplyto).add(category, entry)
229 self.getreplies(inreplyto).add(category, entry)
230
230
231 def getreplies(self, partid):
231 def getreplies(self, partid):
232 """get the records that are replies to a specific part"""
232 """get the records that are replies to a specific part"""
233 return self._replies.setdefault(partid, unbundlerecords())
233 return self._replies.setdefault(partid, unbundlerecords())
234
234
235 def __getitem__(self, cat):
235 def __getitem__(self, cat):
236 return tuple(self._categories.get(cat, ()))
236 return tuple(self._categories.get(cat, ()))
237
237
238 def __iter__(self):
238 def __iter__(self):
239 return iter(self._sequences)
239 return iter(self._sequences)
240
240
241 def __len__(self):
241 def __len__(self):
242 return len(self._sequences)
242 return len(self._sequences)
243
243
244 def __nonzero__(self):
244 def __nonzero__(self):
245 return bool(self._sequences)
245 return bool(self._sequences)
246
246
247 class bundleoperation(object):
247 class bundleoperation(object):
248 """an object that represents a single bundling process
248 """an object that represents a single bundling process
249
249
250 Its purpose is to carry unbundle-related objects and states.
250 Its purpose is to carry unbundle-related objects and states.
251
251
252 A new object should be created at the beginning of each bundle processing.
252 A new object should be created at the beginning of each bundle processing.
253 The object is to be returned by the processing function.
253 The object is to be returned by the processing function.
254
254
255 The object has very little content now it will ultimately contain:
255 The object has very little content now it will ultimately contain:
256 * an access to the repo the bundle is applied to,
256 * an access to the repo the bundle is applied to,
257 * a ui object,
257 * a ui object,
258 * a way to retrieve a transaction to add changes to the repo,
258 * a way to retrieve a transaction to add changes to the repo,
259 * a way to record the result of processing each part,
259 * a way to record the result of processing each part,
260 * a way to construct a bundle response when applicable.
260 * a way to construct a bundle response when applicable.
261 """
261 """
262
262
263 def __init__(self, repo, transactiongetter):
263 def __init__(self, repo, transactiongetter):
264 self.repo = repo
264 self.repo = repo
265 self.ui = repo.ui
265 self.ui = repo.ui
266 self.records = unbundlerecords()
266 self.records = unbundlerecords()
267 self.gettransaction = transactiongetter
267 self.gettransaction = transactiongetter
268 self.reply = None
268 self.reply = None
269
269
270 class TransactionUnavailable(RuntimeError):
270 class TransactionUnavailable(RuntimeError):
271 pass
271 pass
272
272
273 def _notransaction():
273 def _notransaction():
274 """default method to get a transaction while processing a bundle
274 """default method to get a transaction while processing a bundle
275
275
276 Raise an exception to highlight the fact that no transaction was expected
276 Raise an exception to highlight the fact that no transaction was expected
277 to be created"""
277 to be created"""
278 raise TransactionUnavailable()
278 raise TransactionUnavailable()
279
279
280 def processbundle(repo, unbundler, transactiongetter=None):
280 def processbundle(repo, unbundler, transactiongetter=None):
281 """This function process a bundle, apply effect to/from a repo
281 """This function process a bundle, apply effect to/from a repo
282
282
283 It iterates over each part then searches for and uses the proper handling
283 It iterates over each part then searches for and uses the proper handling
284 code to process the part. Parts are processed in order.
284 code to process the part. Parts are processed in order.
285
285
286 This is very early version of this function that will be strongly reworked
286 This is very early version of this function that will be strongly reworked
287 before final usage.
287 before final usage.
288
288
289 Unknown Mandatory part will abort the process.
289 Unknown Mandatory part will abort the process.
290 """
290 """
291 if transactiongetter is None:
291 if transactiongetter is None:
292 transactiongetter = _notransaction
292 transactiongetter = _notransaction
293 op = bundleoperation(repo, transactiongetter)
293 op = bundleoperation(repo, transactiongetter)
294 # todo:
294 # todo:
295 # - replace this is a init function soon.
295 # - replace this is a init function soon.
296 # - exception catching
296 # - exception catching
297 unbundler.params
297 unbundler.params
298 iterparts = unbundler.iterparts()
298 iterparts = unbundler.iterparts()
299 part = None
299 part = None
300 try:
300 try:
301 for part in iterparts:
301 for part in iterparts:
302 _processpart(op, part)
302 _processpart(op, part)
303 except Exception, exc:
303 except Exception, exc:
304 for part in iterparts:
304 for part in iterparts:
305 # consume the bundle content
305 # consume the bundle content
306 part.read()
306 part.read()
307 # Small hack to let caller code distinguish exceptions from bundle2
307 # Small hack to let caller code distinguish exceptions from bundle2
308 # processing from processing the old format. This is mostly
308 # processing from processing the old format. This is mostly
309 # needed to handle different return codes to unbundle according to the
309 # needed to handle different return codes to unbundle according to the
310 # type of bundle. We should probably clean up or drop this return code
310 # type of bundle. We should probably clean up or drop this return code
311 # craziness in a future version.
311 # craziness in a future version.
312 exc.duringunbundle2 = True
312 exc.duringunbundle2 = True
313 raise
313 raise
314 return op
314 return op
315
315
316 def _processpart(op, part):
316 def _processpart(op, part):
317 """process a single part from a bundle
317 """process a single part from a bundle
318
318
319 The part is guaranteed to have been fully consumed when the function exits
319 The part is guaranteed to have been fully consumed when the function exits
320 (even if an exception is raised)."""
320 (even if an exception is raised)."""
321 try:
321 try:
322 parttype = part.type
323 # part key are matched lower case
324 key = parttype.lower()
325 try:
322 try:
326 handler = parthandlermapping.get(key)
323 handler = parthandlermapping.get(part.type)
327 if handler is None:
324 if handler is None:
328 raise error.UnsupportedPartError(parttype=key)
325 raise error.UnsupportedPartError(parttype=part.type)
329 op.ui.debug('found a handler for part %r\n' % parttype)
326 op.ui.debug('found a handler for part %r\n' % part.type)
330 unknownparams = part.mandatorykeys - handler.params
327 unknownparams = part.mandatorykeys - handler.params
331 if unknownparams:
328 if unknownparams:
332 unknownparams = list(unknownparams)
329 unknownparams = list(unknownparams)
333 unknownparams.sort()
330 unknownparams.sort()
334 raise error.UnsupportedPartError(parttype=key,
331 raise error.UnsupportedPartError(parttype=part.type,
335 params=unknownparams)
332 params=unknownparams)
336 except error.UnsupportedPartError, exc:
333 except error.UnsupportedPartError, exc:
337 if part.mandatory: # mandatory parts
334 if part.mandatory: # mandatory parts
338 raise
335 raise
339 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
336 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
340 return # skip to part processing
337 return # skip to part processing
341
338
342 # handler is called outside the above try block so that we don't
339 # handler is called outside the above try block so that we don't
343 # risk catching KeyErrors from anything other than the
340 # risk catching KeyErrors from anything other than the
344 # parthandlermapping lookup (any KeyError raised by handler()
341 # parthandlermapping lookup (any KeyError raised by handler()
345 # itself represents a defect of a different variety).
342 # itself represents a defect of a different variety).
346 output = None
343 output = None
347 if op.reply is not None:
344 if op.reply is not None:
348 op.ui.pushbuffer(error=True)
345 op.ui.pushbuffer(error=True)
349 output = ''
346 output = ''
350 try:
347 try:
351 handler(op, part)
348 handler(op, part)
352 finally:
349 finally:
353 if output is not None:
350 if output is not None:
354 output = op.ui.popbuffer()
351 output = op.ui.popbuffer()
355 if output:
352 if output:
356 outpart = op.reply.newpart('b2x:output', data=output)
353 outpart = op.reply.newpart('b2x:output', data=output)
357 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
354 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
358 finally:
355 finally:
359 # consume the part content to not corrupt the stream.
356 # consume the part content to not corrupt the stream.
360 part.read()
357 part.read()
361
358
362
359
363 def decodecaps(blob):
360 def decodecaps(blob):
364 """decode a bundle2 caps bytes blob into a dictionary
361 """decode a bundle2 caps bytes blob into a dictionary
365
362
366 The blob is a list of capabilities (one per line)
363 The blob is a list of capabilities (one per line)
367 Capabilities may have values using a line of the form::
364 Capabilities may have values using a line of the form::
368
365
369 capability=value1,value2,value3
366 capability=value1,value2,value3
370
367
371 The values are always a list."""
368 The values are always a list."""
372 caps = {}
369 caps = {}
373 for line in blob.splitlines():
370 for line in blob.splitlines():
374 if not line:
371 if not line:
375 continue
372 continue
376 if '=' not in line:
373 if '=' not in line:
377 key, vals = line, ()
374 key, vals = line, ()
378 else:
375 else:
379 key, vals = line.split('=', 1)
376 key, vals = line.split('=', 1)
380 vals = vals.split(',')
377 vals = vals.split(',')
381 key = urllib.unquote(key)
378 key = urllib.unquote(key)
382 vals = [urllib.unquote(v) for v in vals]
379 vals = [urllib.unquote(v) for v in vals]
383 caps[key] = vals
380 caps[key] = vals
384 return caps
381 return caps
385
382
386 def encodecaps(caps):
383 def encodecaps(caps):
387 """encode a bundle2 caps dictionary into a bytes blob"""
384 """encode a bundle2 caps dictionary into a bytes blob"""
388 chunks = []
385 chunks = []
389 for ca in sorted(caps):
386 for ca in sorted(caps):
390 vals = caps[ca]
387 vals = caps[ca]
391 ca = urllib.quote(ca)
388 ca = urllib.quote(ca)
392 vals = [urllib.quote(v) for v in vals]
389 vals = [urllib.quote(v) for v in vals]
393 if vals:
390 if vals:
394 ca = "%s=%s" % (ca, ','.join(vals))
391 ca = "%s=%s" % (ca, ','.join(vals))
395 chunks.append(ca)
392 chunks.append(ca)
396 return '\n'.join(chunks)
393 return '\n'.join(chunks)
397
394
398 class bundle20(object):
395 class bundle20(object):
399 """represent an outgoing bundle2 container
396 """represent an outgoing bundle2 container
400
397
401 Use the `addparam` method to add stream level parameter. and `newpart` to
398 Use the `addparam` method to add stream level parameter. and `newpart` to
402 populate it. Then call `getchunks` to retrieve all the binary chunks of
399 populate it. Then call `getchunks` to retrieve all the binary chunks of
403 data that compose the bundle2 container."""
400 data that compose the bundle2 container."""
404
401
405 def __init__(self, ui, capabilities=()):
402 def __init__(self, ui, capabilities=()):
406 self.ui = ui
403 self.ui = ui
407 self._params = []
404 self._params = []
408 self._parts = []
405 self._parts = []
409 self.capabilities = dict(capabilities)
406 self.capabilities = dict(capabilities)
410
407
411 @property
408 @property
412 def nbparts(self):
409 def nbparts(self):
413 """total number of parts added to the bundler"""
410 """total number of parts added to the bundler"""
414 return len(self._parts)
411 return len(self._parts)
415
412
416 # methods used to defines the bundle2 content
413 # methods used to defines the bundle2 content
417 def addparam(self, name, value=None):
414 def addparam(self, name, value=None):
418 """add a stream level parameter"""
415 """add a stream level parameter"""
419 if not name:
416 if not name:
420 raise ValueError('empty parameter name')
417 raise ValueError('empty parameter name')
421 if name[0] not in string.letters:
418 if name[0] not in string.letters:
422 raise ValueError('non letter first character: %r' % name)
419 raise ValueError('non letter first character: %r' % name)
423 self._params.append((name, value))
420 self._params.append((name, value))
424
421
425 def addpart(self, part):
422 def addpart(self, part):
426 """add a new part to the bundle2 container
423 """add a new part to the bundle2 container
427
424
428 Parts contains the actual applicative payload."""
425 Parts contains the actual applicative payload."""
429 assert part.id is None
426 assert part.id is None
430 part.id = len(self._parts) # very cheap counter
427 part.id = len(self._parts) # very cheap counter
431 self._parts.append(part)
428 self._parts.append(part)
432
429
433 def newpart(self, typeid, *args, **kwargs):
430 def newpart(self, typeid, *args, **kwargs):
434 """create a new part and add it to the containers
431 """create a new part and add it to the containers
435
432
436 As the part is directly added to the containers. For now, this means
433 As the part is directly added to the containers. For now, this means
437 that any failure to properly initialize the part after calling
434 that any failure to properly initialize the part after calling
438 ``newpart`` should result in a failure of the whole bundling process.
435 ``newpart`` should result in a failure of the whole bundling process.
439
436
440 You can still fall back to manually create and add if you need better
437 You can still fall back to manually create and add if you need better
441 control."""
438 control."""
442 part = bundlepart(typeid, *args, **kwargs)
439 part = bundlepart(typeid, *args, **kwargs)
443 self.addpart(part)
440 self.addpart(part)
444 return part
441 return part
445
442
446 # methods used to generate the bundle2 stream
443 # methods used to generate the bundle2 stream
447 def getchunks(self):
444 def getchunks(self):
448 self.ui.debug('start emission of %s stream\n' % _magicstring)
445 self.ui.debug('start emission of %s stream\n' % _magicstring)
449 yield _magicstring
446 yield _magicstring
450 param = self._paramchunk()
447 param = self._paramchunk()
451 self.ui.debug('bundle parameter: %s\n' % param)
448 self.ui.debug('bundle parameter: %s\n' % param)
452 yield _pack(_fstreamparamsize, len(param))
449 yield _pack(_fstreamparamsize, len(param))
453 if param:
450 if param:
454 yield param
451 yield param
455
452
456 self.ui.debug('start of parts\n')
453 self.ui.debug('start of parts\n')
457 for part in self._parts:
454 for part in self._parts:
458 self.ui.debug('bundle part: "%s"\n' % part.type)
455 self.ui.debug('bundle part: "%s"\n' % part.type)
459 for chunk in part.getchunks():
456 for chunk in part.getchunks():
460 yield chunk
457 yield chunk
461 self.ui.debug('end of bundle\n')
458 self.ui.debug('end of bundle\n')
462 yield _pack(_fpartheadersize, 0)
459 yield _pack(_fpartheadersize, 0)
463
460
464 def _paramchunk(self):
461 def _paramchunk(self):
465 """return a encoded version of all stream parameters"""
462 """return a encoded version of all stream parameters"""
466 blocks = []
463 blocks = []
467 for par, value in self._params:
464 for par, value in self._params:
468 par = urllib.quote(par)
465 par = urllib.quote(par)
469 if value is not None:
466 if value is not None:
470 value = urllib.quote(value)
467 value = urllib.quote(value)
471 par = '%s=%s' % (par, value)
468 par = '%s=%s' % (par, value)
472 blocks.append(par)
469 blocks.append(par)
473 return ' '.join(blocks)
470 return ' '.join(blocks)
474
471
475 class unpackermixin(object):
472 class unpackermixin(object):
476 """A mixin to extract bytes and struct data from a stream"""
473 """A mixin to extract bytes and struct data from a stream"""
477
474
478 def __init__(self, fp):
475 def __init__(self, fp):
479 self._fp = fp
476 self._fp = fp
480
477
481 def _unpack(self, format):
478 def _unpack(self, format):
482 """unpack this struct format from the stream"""
479 """unpack this struct format from the stream"""
483 data = self._readexact(struct.calcsize(format))
480 data = self._readexact(struct.calcsize(format))
484 return _unpack(format, data)
481 return _unpack(format, data)
485
482
486 def _readexact(self, size):
483 def _readexact(self, size):
487 """read exactly <size> bytes from the stream"""
484 """read exactly <size> bytes from the stream"""
488 return changegroup.readexactly(self._fp, size)
485 return changegroup.readexactly(self._fp, size)
489
486
490
487
491 class unbundle20(unpackermixin):
488 class unbundle20(unpackermixin):
492 """interpret a bundle2 stream
489 """interpret a bundle2 stream
493
490
494 This class is fed with a binary stream and yields parts through its
491 This class is fed with a binary stream and yields parts through its
495 `iterparts` methods."""
492 `iterparts` methods."""
496
493
497 def __init__(self, ui, fp, header=None):
494 def __init__(self, ui, fp, header=None):
498 """If header is specified, we do not read it out of the stream."""
495 """If header is specified, we do not read it out of the stream."""
499 self.ui = ui
496 self.ui = ui
500 super(unbundle20, self).__init__(fp)
497 super(unbundle20, self).__init__(fp)
501 if header is None:
498 if header is None:
502 header = self._readexact(4)
499 header = self._readexact(4)
503 magic, version = header[0:2], header[2:4]
500 magic, version = header[0:2], header[2:4]
504 if magic != 'HG':
501 if magic != 'HG':
505 raise util.Abort(_('not a Mercurial bundle'))
502 raise util.Abort(_('not a Mercurial bundle'))
506 if version != '2Y':
503 if version != '2Y':
507 raise util.Abort(_('unknown bundle version %s') % version)
504 raise util.Abort(_('unknown bundle version %s') % version)
508 self.ui.debug('start processing of %s stream\n' % header)
505 self.ui.debug('start processing of %s stream\n' % header)
509
506
510 @util.propertycache
507 @util.propertycache
511 def params(self):
508 def params(self):
512 """dictionary of stream level parameters"""
509 """dictionary of stream level parameters"""
513 self.ui.debug('reading bundle2 stream parameters\n')
510 self.ui.debug('reading bundle2 stream parameters\n')
514 params = {}
511 params = {}
515 paramssize = self._unpack(_fstreamparamsize)[0]
512 paramssize = self._unpack(_fstreamparamsize)[0]
516 if paramssize < 0:
513 if paramssize < 0:
517 raise error.BundleValueError('negative bundle param size: %i'
514 raise error.BundleValueError('negative bundle param size: %i'
518 % paramssize)
515 % paramssize)
519 if paramssize:
516 if paramssize:
520 for p in self._readexact(paramssize).split(' '):
517 for p in self._readexact(paramssize).split(' '):
521 p = p.split('=', 1)
518 p = p.split('=', 1)
522 p = [urllib.unquote(i) for i in p]
519 p = [urllib.unquote(i) for i in p]
523 if len(p) < 2:
520 if len(p) < 2:
524 p.append(None)
521 p.append(None)
525 self._processparam(*p)
522 self._processparam(*p)
526 params[p[0]] = p[1]
523 params[p[0]] = p[1]
527 return params
524 return params
528
525
529 def _processparam(self, name, value):
526 def _processparam(self, name, value):
530 """process a parameter, applying its effect if needed
527 """process a parameter, applying its effect if needed
531
528
532 Parameter starting with a lower case letter are advisory and will be
529 Parameter starting with a lower case letter are advisory and will be
533 ignored when unknown. Those starting with an upper case letter are
530 ignored when unknown. Those starting with an upper case letter are
534 mandatory and will this function will raise a KeyError when unknown.
531 mandatory and will this function will raise a KeyError when unknown.
535
532
536 Note: no option are currently supported. Any input will be either
533 Note: no option are currently supported. Any input will be either
537 ignored or failing.
534 ignored or failing.
538 """
535 """
539 if not name:
536 if not name:
540 raise ValueError('empty parameter name')
537 raise ValueError('empty parameter name')
541 if name[0] not in string.letters:
538 if name[0] not in string.letters:
542 raise ValueError('non letter first character: %r' % name)
539 raise ValueError('non letter first character: %r' % name)
543 # Some logic will be later added here to try to process the option for
540 # Some logic will be later added here to try to process the option for
544 # a dict of known parameter.
541 # a dict of known parameter.
545 if name[0].islower():
542 if name[0].islower():
546 self.ui.debug("ignoring unknown parameter %r\n" % name)
543 self.ui.debug("ignoring unknown parameter %r\n" % name)
547 else:
544 else:
548 raise error.UnsupportedPartError(params=(name,))
545 raise error.UnsupportedPartError(params=(name,))
549
546
550
547
551 def iterparts(self):
548 def iterparts(self):
552 """yield all parts contained in the stream"""
549 """yield all parts contained in the stream"""
553 # make sure param have been loaded
550 # make sure param have been loaded
554 self.params
551 self.params
555 self.ui.debug('start extraction of bundle2 parts\n')
552 self.ui.debug('start extraction of bundle2 parts\n')
556 headerblock = self._readpartheader()
553 headerblock = self._readpartheader()
557 while headerblock is not None:
554 while headerblock is not None:
558 part = unbundlepart(self.ui, headerblock, self._fp)
555 part = unbundlepart(self.ui, headerblock, self._fp)
559 yield part
556 yield part
560 headerblock = self._readpartheader()
557 headerblock = self._readpartheader()
561 self.ui.debug('end of bundle2 stream\n')
558 self.ui.debug('end of bundle2 stream\n')
562
559
563 def _readpartheader(self):
560 def _readpartheader(self):
564 """reads a part header size and return the bytes blob
561 """reads a part header size and return the bytes blob
565
562
566 returns None if empty"""
563 returns None if empty"""
567 headersize = self._unpack(_fpartheadersize)[0]
564 headersize = self._unpack(_fpartheadersize)[0]
568 if headersize < 0:
565 if headersize < 0:
569 raise error.BundleValueError('negative part header size: %i'
566 raise error.BundleValueError('negative part header size: %i'
570 % headersize)
567 % headersize)
571 self.ui.debug('part header size: %i\n' % headersize)
568 self.ui.debug('part header size: %i\n' % headersize)
572 if headersize:
569 if headersize:
573 return self._readexact(headersize)
570 return self._readexact(headersize)
574 return None
571 return None
575
572
576
573
577 class bundlepart(object):
574 class bundlepart(object):
578 """A bundle2 part contains application level payload
575 """A bundle2 part contains application level payload
579
576
580 The part `type` is used to route the part to the application level
577 The part `type` is used to route the part to the application level
581 handler.
578 handler.
582
579
583 The part payload is contained in ``part.data``. It could be raw bytes or a
580 The part payload is contained in ``part.data``. It could be raw bytes or a
584 generator of byte chunks.
581 generator of byte chunks.
585
582
586 You can add parameters to the part using the ``addparam`` method.
583 You can add parameters to the part using the ``addparam`` method.
587 Parameters can be either mandatory (default) or advisory. Remote side
584 Parameters can be either mandatory (default) or advisory. Remote side
588 should be able to safely ignore the advisory ones.
585 should be able to safely ignore the advisory ones.
589
586
590 Both data and parameters cannot be modified after the generation has begun.
587 Both data and parameters cannot be modified after the generation has begun.
591 """
588 """
592
589
593 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
590 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
594 data=''):
591 data=''):
595 self.id = None
592 self.id = None
596 self.type = parttype
593 self.type = parttype
597 self._data = data
594 self._data = data
598 self._mandatoryparams = list(mandatoryparams)
595 self._mandatoryparams = list(mandatoryparams)
599 self._advisoryparams = list(advisoryparams)
596 self._advisoryparams = list(advisoryparams)
600 # checking for duplicated entries
597 # checking for duplicated entries
601 self._seenparams = set()
598 self._seenparams = set()
602 for pname, __ in self._mandatoryparams + self._advisoryparams:
599 for pname, __ in self._mandatoryparams + self._advisoryparams:
603 if pname in self._seenparams:
600 if pname in self._seenparams:
604 raise RuntimeError('duplicated params: %s' % pname)
601 raise RuntimeError('duplicated params: %s' % pname)
605 self._seenparams.add(pname)
602 self._seenparams.add(pname)
606 # status of the part's generation:
603 # status of the part's generation:
607 # - None: not started,
604 # - None: not started,
608 # - False: currently generated,
605 # - False: currently generated,
609 # - True: generation done.
606 # - True: generation done.
610 self._generated = None
607 self._generated = None
611
608
612 # methods used to defines the part content
609 # methods used to defines the part content
613 def __setdata(self, data):
610 def __setdata(self, data):
614 if self._generated is not None:
611 if self._generated is not None:
615 raise error.ReadOnlyPartError('part is being generated')
612 raise error.ReadOnlyPartError('part is being generated')
616 self._data = data
613 self._data = data
617 def __getdata(self):
614 def __getdata(self):
618 return self._data
615 return self._data
619 data = property(__getdata, __setdata)
616 data = property(__getdata, __setdata)
620
617
621 @property
618 @property
622 def mandatoryparams(self):
619 def mandatoryparams(self):
623 # make it an immutable tuple to force people through ``addparam``
620 # make it an immutable tuple to force people through ``addparam``
624 return tuple(self._mandatoryparams)
621 return tuple(self._mandatoryparams)
625
622
626 @property
623 @property
627 def advisoryparams(self):
624 def advisoryparams(self):
628 # make it an immutable tuple to force people through ``addparam``
625 # make it an immutable tuple to force people through ``addparam``
629 return tuple(self._advisoryparams)
626 return tuple(self._advisoryparams)
630
627
631 def addparam(self, name, value='', mandatory=True):
628 def addparam(self, name, value='', mandatory=True):
632 if self._generated is not None:
629 if self._generated is not None:
633 raise error.ReadOnlyPartError('part is being generated')
630 raise error.ReadOnlyPartError('part is being generated')
634 if name in self._seenparams:
631 if name in self._seenparams:
635 raise ValueError('duplicated params: %s' % name)
632 raise ValueError('duplicated params: %s' % name)
636 self._seenparams.add(name)
633 self._seenparams.add(name)
637 params = self._advisoryparams
634 params = self._advisoryparams
638 if mandatory:
635 if mandatory:
639 params = self._mandatoryparams
636 params = self._mandatoryparams
640 params.append((name, value))
637 params.append((name, value))
641
638
642 # methods used to generates the bundle2 stream
639 # methods used to generates the bundle2 stream
643 def getchunks(self):
640 def getchunks(self):
644 if self._generated is not None:
641 if self._generated is not None:
645 raise RuntimeError('part can only be consumed once')
642 raise RuntimeError('part can only be consumed once')
646 self._generated = False
643 self._generated = False
647 #### header
644 #### header
648 ## parttype
645 ## parttype
649 header = [_pack(_fparttypesize, len(self.type)),
646 header = [_pack(_fparttypesize, len(self.type)),
650 self.type, _pack(_fpartid, self.id),
647 self.type, _pack(_fpartid, self.id),
651 ]
648 ]
652 ## parameters
649 ## parameters
653 # count
650 # count
654 manpar = self.mandatoryparams
651 manpar = self.mandatoryparams
655 advpar = self.advisoryparams
652 advpar = self.advisoryparams
656 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
653 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
657 # size
654 # size
658 parsizes = []
655 parsizes = []
659 for key, value in manpar:
656 for key, value in manpar:
660 parsizes.append(len(key))
657 parsizes.append(len(key))
661 parsizes.append(len(value))
658 parsizes.append(len(value))
662 for key, value in advpar:
659 for key, value in advpar:
663 parsizes.append(len(key))
660 parsizes.append(len(key))
664 parsizes.append(len(value))
661 parsizes.append(len(value))
665 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
662 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
666 header.append(paramsizes)
663 header.append(paramsizes)
667 # key, value
664 # key, value
668 for key, value in manpar:
665 for key, value in manpar:
669 header.append(key)
666 header.append(key)
670 header.append(value)
667 header.append(value)
671 for key, value in advpar:
668 for key, value in advpar:
672 header.append(key)
669 header.append(key)
673 header.append(value)
670 header.append(value)
674 ## finalize header
671 ## finalize header
675 headerchunk = ''.join(header)
672 headerchunk = ''.join(header)
676 yield _pack(_fpartheadersize, len(headerchunk))
673 yield _pack(_fpartheadersize, len(headerchunk))
677 yield headerchunk
674 yield headerchunk
678 ## payload
675 ## payload
679 try:
676 try:
680 for chunk in self._payloadchunks():
677 for chunk in self._payloadchunks():
681 yield _pack(_fpayloadsize, len(chunk))
678 yield _pack(_fpayloadsize, len(chunk))
682 yield chunk
679 yield chunk
683 except Exception, exc:
680 except Exception, exc:
684 # backup exception data for later
681 # backup exception data for later
685 exc_info = sys.exc_info()
682 exc_info = sys.exc_info()
686 msg = 'unexpected error: %s' % exc
683 msg = 'unexpected error: %s' % exc
687 interpart = bundlepart('b2x:error:abort', [('message', msg)])
684 interpart = bundlepart('b2x:error:abort', [('message', msg)])
688 interpart.id = 0
685 interpart.id = 0
689 yield _pack(_fpayloadsize, -1)
686 yield _pack(_fpayloadsize, -1)
690 for chunk in interpart.getchunks():
687 for chunk in interpart.getchunks():
691 yield chunk
688 yield chunk
692 # abort current part payload
689 # abort current part payload
693 yield _pack(_fpayloadsize, 0)
690 yield _pack(_fpayloadsize, 0)
694 raise exc_info[0], exc_info[1], exc_info[2]
691 raise exc_info[0], exc_info[1], exc_info[2]
695 # end of payload
692 # end of payload
696 yield _pack(_fpayloadsize, 0)
693 yield _pack(_fpayloadsize, 0)
697 self._generated = True
694 self._generated = True
698
695
699 def _payloadchunks(self):
696 def _payloadchunks(self):
700 """yield chunks of a the part payload
697 """yield chunks of a the part payload
701
698
702 Exists to handle the different methods to provide data to a part."""
699 Exists to handle the different methods to provide data to a part."""
703 # we only support fixed size data now.
700 # we only support fixed size data now.
704 # This will be improved in the future.
701 # This will be improved in the future.
705 if util.safehasattr(self.data, 'next'):
702 if util.safehasattr(self.data, 'next'):
706 buff = util.chunkbuffer(self.data)
703 buff = util.chunkbuffer(self.data)
707 chunk = buff.read(preferedchunksize)
704 chunk = buff.read(preferedchunksize)
708 while chunk:
705 while chunk:
709 yield chunk
706 yield chunk
710 chunk = buff.read(preferedchunksize)
707 chunk = buff.read(preferedchunksize)
711 elif len(self.data):
708 elif len(self.data):
712 yield self.data
709 yield self.data
713
710
714
711
715 flaginterrupt = -1
712 flaginterrupt = -1
716
713
717 class interrupthandler(unpackermixin):
714 class interrupthandler(unpackermixin):
718 """read one part and process it with restricted capability
715 """read one part and process it with restricted capability
719
716
720 This allows to transmit exception raised on the producer size during part
717 This allows to transmit exception raised on the producer size during part
721 iteration while the consumer is reading a part.
718 iteration while the consumer is reading a part.
722
719
723 Part processed in this manner only have access to a ui object,"""
720 Part processed in this manner only have access to a ui object,"""
724
721
725 def __init__(self, ui, fp):
722 def __init__(self, ui, fp):
726 super(interrupthandler, self).__init__(fp)
723 super(interrupthandler, self).__init__(fp)
727 self.ui = ui
724 self.ui = ui
728
725
729 def _readpartheader(self):
726 def _readpartheader(self):
730 """reads a part header size and return the bytes blob
727 """reads a part header size and return the bytes blob
731
728
732 returns None if empty"""
729 returns None if empty"""
733 headersize = self._unpack(_fpartheadersize)[0]
730 headersize = self._unpack(_fpartheadersize)[0]
734 if headersize < 0:
731 if headersize < 0:
735 raise error.BundleValueError('negative part header size: %i'
732 raise error.BundleValueError('negative part header size: %i'
736 % headersize)
733 % headersize)
737 self.ui.debug('part header size: %i\n' % headersize)
734 self.ui.debug('part header size: %i\n' % headersize)
738 if headersize:
735 if headersize:
739 return self._readexact(headersize)
736 return self._readexact(headersize)
740 return None
737 return None
741
738
742 def __call__(self):
739 def __call__(self):
743 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
740 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
744 headerblock = self._readpartheader()
741 headerblock = self._readpartheader()
745 if headerblock is None:
742 if headerblock is None:
746 self.ui.debug('no part found during interruption.\n')
743 self.ui.debug('no part found during interruption.\n')
747 return
744 return
748 part = unbundlepart(self.ui, headerblock, self._fp)
745 part = unbundlepart(self.ui, headerblock, self._fp)
749 op = interruptoperation(self.ui)
746 op = interruptoperation(self.ui)
750 _processpart(op, part)
747 _processpart(op, part)
751
748
752 class interruptoperation(object):
749 class interruptoperation(object):
753 """A limited operation to be use by part handler during interruption
750 """A limited operation to be use by part handler during interruption
754
751
755 It only have access to an ui object.
752 It only have access to an ui object.
756 """
753 """
757
754
758 def __init__(self, ui):
755 def __init__(self, ui):
759 self.ui = ui
756 self.ui = ui
760 self.reply = None
757 self.reply = None
761
758
762 @property
759 @property
763 def repo(self):
760 def repo(self):
764 raise RuntimeError('no repo access from stream interruption')
761 raise RuntimeError('no repo access from stream interruption')
765
762
766 def gettransaction(self):
763 def gettransaction(self):
767 raise TransactionUnavailable('no repo access from stream interruption')
764 raise TransactionUnavailable('no repo access from stream interruption')
768
765
769 class unbundlepart(unpackermixin):
766 class unbundlepart(unpackermixin):
770 """a bundle part read from a bundle"""
767 """a bundle part read from a bundle"""
771
768
772 def __init__(self, ui, header, fp):
769 def __init__(self, ui, header, fp):
773 super(unbundlepart, self).__init__(fp)
770 super(unbundlepart, self).__init__(fp)
774 self.ui = ui
771 self.ui = ui
775 # unbundle state attr
772 # unbundle state attr
776 self._headerdata = header
773 self._headerdata = header
777 self._headeroffset = 0
774 self._headeroffset = 0
778 self._initialized = False
775 self._initialized = False
779 self.consumed = False
776 self.consumed = False
780 # part data
777 # part data
781 self.id = None
778 self.id = None
782 self.type = None
779 self.type = None
783 self.mandatoryparams = None
780 self.mandatoryparams = None
784 self.advisoryparams = None
781 self.advisoryparams = None
785 self.params = None
782 self.params = None
786 self.mandatorykeys = ()
783 self.mandatorykeys = ()
787 self._payloadstream = None
784 self._payloadstream = None
788 self._readheader()
785 self._readheader()
789 self._mandatory = None
786 self._mandatory = None
790
787
791 def _fromheader(self, size):
788 def _fromheader(self, size):
792 """return the next <size> byte from the header"""
789 """return the next <size> byte from the header"""
793 offset = self._headeroffset
790 offset = self._headeroffset
794 data = self._headerdata[offset:(offset + size)]
791 data = self._headerdata[offset:(offset + size)]
795 self._headeroffset = offset + size
792 self._headeroffset = offset + size
796 return data
793 return data
797
794
798 def _unpackheader(self, format):
795 def _unpackheader(self, format):
799 """read given format from header
796 """read given format from header
800
797
801 This automatically compute the size of the format to read."""
798 This automatically compute the size of the format to read."""
802 data = self._fromheader(struct.calcsize(format))
799 data = self._fromheader(struct.calcsize(format))
803 return _unpack(format, data)
800 return _unpack(format, data)
804
801
805 def _initparams(self, mandatoryparams, advisoryparams):
802 def _initparams(self, mandatoryparams, advisoryparams):
806 """internal function to setup all logic related parameters"""
803 """internal function to setup all logic related parameters"""
807 # make it read only to prevent people touching it by mistake.
804 # make it read only to prevent people touching it by mistake.
808 self.mandatoryparams = tuple(mandatoryparams)
805 self.mandatoryparams = tuple(mandatoryparams)
809 self.advisoryparams = tuple(advisoryparams)
806 self.advisoryparams = tuple(advisoryparams)
810 # user friendly UI
807 # user friendly UI
811 self.params = dict(self.mandatoryparams)
808 self.params = dict(self.mandatoryparams)
812 self.params.update(dict(self.advisoryparams))
809 self.params.update(dict(self.advisoryparams))
813 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
810 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
814
811
815 def _readheader(self):
812 def _readheader(self):
816 """read the header and setup the object"""
813 """read the header and setup the object"""
817 typesize = self._unpackheader(_fparttypesize)[0]
814 typesize = self._unpackheader(_fparttypesize)[0]
818 self.type = self._fromheader(typesize)
815 self.type = self._fromheader(typesize)
819 self.ui.debug('part type: "%s"\n' % self.type)
816 self.ui.debug('part type: "%s"\n' % self.type)
820 self.id = self._unpackheader(_fpartid)[0]
817 self.id = self._unpackheader(_fpartid)[0]
821 self.ui.debug('part id: "%s"\n' % self.id)
818 self.ui.debug('part id: "%s"\n' % self.id)
822 # extract mandatory bit from type
819 # extract mandatory bit from type
823 self.mandatory = (self.type != self.type.lower())
820 self.mandatory = (self.type != self.type.lower())
824 self.type = self.type.lower()
821 self.type = self.type.lower()
825 ## reading parameters
822 ## reading parameters
826 # param count
823 # param count
827 mancount, advcount = self._unpackheader(_fpartparamcount)
824 mancount, advcount = self._unpackheader(_fpartparamcount)
828 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
825 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
829 # param size
826 # param size
830 fparamsizes = _makefpartparamsizes(mancount + advcount)
827 fparamsizes = _makefpartparamsizes(mancount + advcount)
831 paramsizes = self._unpackheader(fparamsizes)
828 paramsizes = self._unpackheader(fparamsizes)
832 # make it a list of couple again
829 # make it a list of couple again
833 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
830 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
834 # split mandatory from advisory
831 # split mandatory from advisory
835 mansizes = paramsizes[:mancount]
832 mansizes = paramsizes[:mancount]
836 advsizes = paramsizes[mancount:]
833 advsizes = paramsizes[mancount:]
837 # retrieve param value
834 # retrieve param value
838 manparams = []
835 manparams = []
839 for key, value in mansizes:
836 for key, value in mansizes:
840 manparams.append((self._fromheader(key), self._fromheader(value)))
837 manparams.append((self._fromheader(key), self._fromheader(value)))
841 advparams = []
838 advparams = []
842 for key, value in advsizes:
839 for key, value in advsizes:
843 advparams.append((self._fromheader(key), self._fromheader(value)))
840 advparams.append((self._fromheader(key), self._fromheader(value)))
844 self._initparams(manparams, advparams)
841 self._initparams(manparams, advparams)
845 ## part payload
842 ## part payload
846 def payloadchunks():
843 def payloadchunks():
847 payloadsize = self._unpack(_fpayloadsize)[0]
844 payloadsize = self._unpack(_fpayloadsize)[0]
848 self.ui.debug('payload chunk size: %i\n' % payloadsize)
845 self.ui.debug('payload chunk size: %i\n' % payloadsize)
849 while payloadsize:
846 while payloadsize:
850 if payloadsize == flaginterrupt:
847 if payloadsize == flaginterrupt:
851 # interruption detection, the handler will now read a
848 # interruption detection, the handler will now read a
852 # single part and process it.
849 # single part and process it.
853 interrupthandler(self.ui, self._fp)()
850 interrupthandler(self.ui, self._fp)()
854 elif payloadsize < 0:
851 elif payloadsize < 0:
855 msg = 'negative payload chunk size: %i' % payloadsize
852 msg = 'negative payload chunk size: %i' % payloadsize
856 raise error.BundleValueError(msg)
853 raise error.BundleValueError(msg)
857 else:
854 else:
858 yield self._readexact(payloadsize)
855 yield self._readexact(payloadsize)
859 payloadsize = self._unpack(_fpayloadsize)[0]
856 payloadsize = self._unpack(_fpayloadsize)[0]
860 self.ui.debug('payload chunk size: %i\n' % payloadsize)
857 self.ui.debug('payload chunk size: %i\n' % payloadsize)
861 self._payloadstream = util.chunkbuffer(payloadchunks())
858 self._payloadstream = util.chunkbuffer(payloadchunks())
862 # we read the data, tell it
859 # we read the data, tell it
863 self._initialized = True
860 self._initialized = True
864
861
865 def read(self, size=None):
862 def read(self, size=None):
866 """read payload data"""
863 """read payload data"""
867 if not self._initialized:
864 if not self._initialized:
868 self._readheader()
865 self._readheader()
869 if size is None:
866 if size is None:
870 data = self._payloadstream.read()
867 data = self._payloadstream.read()
871 else:
868 else:
872 data = self._payloadstream.read(size)
869 data = self._payloadstream.read(size)
873 if size is None or len(data) < size:
870 if size is None or len(data) < size:
874 self.consumed = True
871 self.consumed = True
875 return data
872 return data
876
873
877 capabilities = {'HG2Y': (),
874 capabilities = {'HG2Y': (),
878 'b2x:listkeys': (),
875 'b2x:listkeys': (),
879 'b2x:pushkey': (),
876 'b2x:pushkey': (),
880 'digests': tuple(sorted(util.DIGESTS.keys())),
877 'digests': tuple(sorted(util.DIGESTS.keys())),
881 'b2x:remote-changegroup': ('http', 'https'),
878 'b2x:remote-changegroup': ('http', 'https'),
882 }
879 }
883
880
884 def getrepocaps(repo, allowpushback=False):
881 def getrepocaps(repo, allowpushback=False):
885 """return the bundle2 capabilities for a given repo
882 """return the bundle2 capabilities for a given repo
886
883
887 Exists to allow extensions (like evolution) to mutate the capabilities.
884 Exists to allow extensions (like evolution) to mutate the capabilities.
888 """
885 """
889 caps = capabilities.copy()
886 caps = capabilities.copy()
890 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
887 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
891 if obsolete.isenabled(repo, obsolete.exchangeopt):
888 if obsolete.isenabled(repo, obsolete.exchangeopt):
892 supportedformat = tuple('V%i' % v for v in obsolete.formats)
889 supportedformat = tuple('V%i' % v for v in obsolete.formats)
893 caps['b2x:obsmarkers'] = supportedformat
890 caps['b2x:obsmarkers'] = supportedformat
894 if allowpushback:
891 if allowpushback:
895 caps['b2x:pushback'] = ()
892 caps['b2x:pushback'] = ()
896 return caps
893 return caps
897
894
898 def bundle2caps(remote):
895 def bundle2caps(remote):
899 """return the bundle capabilities of a peer as dict"""
896 """return the bundle capabilities of a peer as dict"""
900 raw = remote.capable('bundle2-exp')
897 raw = remote.capable('bundle2-exp')
901 if not raw and raw != '':
898 if not raw and raw != '':
902 return {}
899 return {}
903 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
900 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
904 return decodecaps(capsblob)
901 return decodecaps(capsblob)
905
902
906 def obsmarkersversion(caps):
903 def obsmarkersversion(caps):
907 """extract the list of supported obsmarkers versions from a bundle2caps dict
904 """extract the list of supported obsmarkers versions from a bundle2caps dict
908 """
905 """
909 obscaps = caps.get('b2x:obsmarkers', ())
906 obscaps = caps.get('b2x:obsmarkers', ())
910 return [int(c[1:]) for c in obscaps if c.startswith('V')]
907 return [int(c[1:]) for c in obscaps if c.startswith('V')]
911
908
912 @parthandler('b2x:changegroup', ('version',))
909 @parthandler('b2x:changegroup', ('version',))
913 def handlechangegroup(op, inpart):
910 def handlechangegroup(op, inpart):
914 """apply a changegroup part on the repo
911 """apply a changegroup part on the repo
915
912
916 This is a very early implementation that will massive rework before being
913 This is a very early implementation that will massive rework before being
917 inflicted to any end-user.
914 inflicted to any end-user.
918 """
915 """
919 # Make sure we trigger a transaction creation
916 # Make sure we trigger a transaction creation
920 #
917 #
921 # The addchangegroup function will get a transaction object by itself, but
918 # The addchangegroup function will get a transaction object by itself, but
922 # we need to make sure we trigger the creation of a transaction object used
919 # we need to make sure we trigger the creation of a transaction object used
923 # for the whole processing scope.
920 # for the whole processing scope.
924 op.gettransaction()
921 op.gettransaction()
925 unpackerversion = inpart.params.get('version', '01')
922 unpackerversion = inpart.params.get('version', '01')
926 # We should raise an appropriate exception here
923 # We should raise an appropriate exception here
927 unpacker = changegroup.packermap[unpackerversion][1]
924 unpacker = changegroup.packermap[unpackerversion][1]
928 cg = unpacker(inpart, 'UN')
925 cg = unpacker(inpart, 'UN')
929 # the source and url passed here are overwritten by the one contained in
926 # the source and url passed here are overwritten by the one contained in
930 # the transaction.hookargs argument. So 'bundle2' is a placeholder
927 # the transaction.hookargs argument. So 'bundle2' is a placeholder
931 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
928 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
932 op.records.add('changegroup', {'return': ret})
929 op.records.add('changegroup', {'return': ret})
933 if op.reply is not None:
930 if op.reply is not None:
934 # This is definitely not the final form of this
931 # This is definitely not the final form of this
935 # return. But one need to start somewhere.
932 # return. But one need to start somewhere.
936 part = op.reply.newpart('b2x:reply:changegroup')
933 part = op.reply.newpart('b2x:reply:changegroup')
937 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
934 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
938 part.addparam('return', '%i' % ret, mandatory=False)
935 part.addparam('return', '%i' % ret, mandatory=False)
939 assert not inpart.read()
936 assert not inpart.read()
940
937
941 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
938 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
942 ['digest:%s' % k for k in util.DIGESTS.keys()])
939 ['digest:%s' % k for k in util.DIGESTS.keys()])
943 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
940 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
944 def handleremotechangegroup(op, inpart):
941 def handleremotechangegroup(op, inpart):
945 """apply a bundle10 on the repo, given an url and validation information
942 """apply a bundle10 on the repo, given an url and validation information
946
943
947 All the information about the remote bundle to import are given as
944 All the information about the remote bundle to import are given as
948 parameters. The parameters include:
945 parameters. The parameters include:
949 - url: the url to the bundle10.
946 - url: the url to the bundle10.
950 - size: the bundle10 file size. It is used to validate what was
947 - size: the bundle10 file size. It is used to validate what was
951 retrieved by the client matches the server knowledge about the bundle.
948 retrieved by the client matches the server knowledge about the bundle.
952 - digests: a space separated list of the digest types provided as
949 - digests: a space separated list of the digest types provided as
953 parameters.
950 parameters.
954 - digest:<digest-type>: the hexadecimal representation of the digest with
951 - digest:<digest-type>: the hexadecimal representation of the digest with
955 that name. Like the size, it is used to validate what was retrieved by
952 that name. Like the size, it is used to validate what was retrieved by
956 the client matches what the server knows about the bundle.
953 the client matches what the server knows about the bundle.
957
954
958 When multiple digest types are given, all of them are checked.
955 When multiple digest types are given, all of them are checked.
959 """
956 """
960 try:
957 try:
961 raw_url = inpart.params['url']
958 raw_url = inpart.params['url']
962 except KeyError:
959 except KeyError:
963 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
960 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
964 parsed_url = util.url(raw_url)
961 parsed_url = util.url(raw_url)
965 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
962 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
966 raise util.Abort(_('remote-changegroup does not support %s urls') %
963 raise util.Abort(_('remote-changegroup does not support %s urls') %
967 parsed_url.scheme)
964 parsed_url.scheme)
968
965
969 try:
966 try:
970 size = int(inpart.params['size'])
967 size = int(inpart.params['size'])
971 except ValueError:
968 except ValueError:
972 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
969 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
973 % 'size')
970 % 'size')
974 except KeyError:
971 except KeyError:
975 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
972 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
976
973
977 digests = {}
974 digests = {}
978 for typ in inpart.params.get('digests', '').split():
975 for typ in inpart.params.get('digests', '').split():
979 param = 'digest:%s' % typ
976 param = 'digest:%s' % typ
980 try:
977 try:
981 value = inpart.params[param]
978 value = inpart.params[param]
982 except KeyError:
979 except KeyError:
983 raise util.Abort(_('remote-changegroup: missing "%s" param') %
980 raise util.Abort(_('remote-changegroup: missing "%s" param') %
984 param)
981 param)
985 digests[typ] = value
982 digests[typ] = value
986
983
987 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
984 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
988
985
989 # Make sure we trigger a transaction creation
986 # Make sure we trigger a transaction creation
990 #
987 #
991 # The addchangegroup function will get a transaction object by itself, but
988 # The addchangegroup function will get a transaction object by itself, but
992 # we need to make sure we trigger the creation of a transaction object used
989 # we need to make sure we trigger the creation of a transaction object used
993 # for the whole processing scope.
990 # for the whole processing scope.
994 op.gettransaction()
991 op.gettransaction()
995 import exchange
992 import exchange
996 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
993 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
997 if not isinstance(cg, changegroup.cg1unpacker):
994 if not isinstance(cg, changegroup.cg1unpacker):
998 raise util.Abort(_('%s: not a bundle version 1.0') %
995 raise util.Abort(_('%s: not a bundle version 1.0') %
999 util.hidepassword(raw_url))
996 util.hidepassword(raw_url))
1000 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
997 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1001 op.records.add('changegroup', {'return': ret})
998 op.records.add('changegroup', {'return': ret})
1002 if op.reply is not None:
999 if op.reply is not None:
1003 # This is definitely not the final form of this
1000 # This is definitely not the final form of this
1004 # return. But one need to start somewhere.
1001 # return. But one need to start somewhere.
1005 part = op.reply.newpart('b2x:reply:changegroup')
1002 part = op.reply.newpart('b2x:reply:changegroup')
1006 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1003 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1007 part.addparam('return', '%i' % ret, mandatory=False)
1004 part.addparam('return', '%i' % ret, mandatory=False)
1008 try:
1005 try:
1009 real_part.validate()
1006 real_part.validate()
1010 except util.Abort, e:
1007 except util.Abort, e:
1011 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1008 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1012 (util.hidepassword(raw_url), str(e)))
1009 (util.hidepassword(raw_url), str(e)))
1013 assert not inpart.read()
1010 assert not inpart.read()
1014
1011
1015 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1012 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1016 def handlereplychangegroup(op, inpart):
1013 def handlereplychangegroup(op, inpart):
1017 ret = int(inpart.params['return'])
1014 ret = int(inpart.params['return'])
1018 replyto = int(inpart.params['in-reply-to'])
1015 replyto = int(inpart.params['in-reply-to'])
1019 op.records.add('changegroup', {'return': ret}, replyto)
1016 op.records.add('changegroup', {'return': ret}, replyto)
1020
1017
1021 @parthandler('b2x:check:heads')
1018 @parthandler('b2x:check:heads')
1022 def handlecheckheads(op, inpart):
1019 def handlecheckheads(op, inpart):
1023 """check that head of the repo did not change
1020 """check that head of the repo did not change
1024
1021
1025 This is used to detect a push race when using unbundle.
1022 This is used to detect a push race when using unbundle.
1026 This replaces the "heads" argument of unbundle."""
1023 This replaces the "heads" argument of unbundle."""
1027 h = inpart.read(20)
1024 h = inpart.read(20)
1028 heads = []
1025 heads = []
1029 while len(h) == 20:
1026 while len(h) == 20:
1030 heads.append(h)
1027 heads.append(h)
1031 h = inpart.read(20)
1028 h = inpart.read(20)
1032 assert not h
1029 assert not h
1033 if heads != op.repo.heads():
1030 if heads != op.repo.heads():
1034 raise error.PushRaced('repository changed while pushing - '
1031 raise error.PushRaced('repository changed while pushing - '
1035 'please try again')
1032 'please try again')
1036
1033
1037 @parthandler('b2x:output')
1034 @parthandler('b2x:output')
1038 def handleoutput(op, inpart):
1035 def handleoutput(op, inpart):
1039 """forward output captured on the server to the client"""
1036 """forward output captured on the server to the client"""
1040 for line in inpart.read().splitlines():
1037 for line in inpart.read().splitlines():
1041 op.ui.write(('remote: %s\n' % line))
1038 op.ui.write(('remote: %s\n' % line))
1042
1039
1043 @parthandler('b2x:replycaps')
1040 @parthandler('b2x:replycaps')
1044 def handlereplycaps(op, inpart):
1041 def handlereplycaps(op, inpart):
1045 """Notify that a reply bundle should be created
1042 """Notify that a reply bundle should be created
1046
1043
1047 The payload contains the capabilities information for the reply"""
1044 The payload contains the capabilities information for the reply"""
1048 caps = decodecaps(inpart.read())
1045 caps = decodecaps(inpart.read())
1049 if op.reply is None:
1046 if op.reply is None:
1050 op.reply = bundle20(op.ui, caps)
1047 op.reply = bundle20(op.ui, caps)
1051
1048
1052 @parthandler('b2x:error:abort', ('message', 'hint'))
1049 @parthandler('b2x:error:abort', ('message', 'hint'))
1053 def handlereplycaps(op, inpart):
1050 def handlereplycaps(op, inpart):
1054 """Used to transmit abort error over the wire"""
1051 """Used to transmit abort error over the wire"""
1055 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1052 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1056
1053
1057 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1054 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1058 def handlereplycaps(op, inpart):
1055 def handlereplycaps(op, inpart):
1059 """Used to transmit unknown content error over the wire"""
1056 """Used to transmit unknown content error over the wire"""
1060 kwargs = {}
1057 kwargs = {}
1061 parttype = inpart.params.get('parttype')
1058 parttype = inpart.params.get('parttype')
1062 if parttype is not None:
1059 if parttype is not None:
1063 kwargs['parttype'] = parttype
1060 kwargs['parttype'] = parttype
1064 params = inpart.params.get('params')
1061 params = inpart.params.get('params')
1065 if params is not None:
1062 if params is not None:
1066 kwargs['params'] = params.split('\0')
1063 kwargs['params'] = params.split('\0')
1067
1064
1068 raise error.UnsupportedPartError(**kwargs)
1065 raise error.UnsupportedPartError(**kwargs)
1069
1066
1070 @parthandler('b2x:error:pushraced', ('message',))
1067 @parthandler('b2x:error:pushraced', ('message',))
1071 def handlereplycaps(op, inpart):
1068 def handlereplycaps(op, inpart):
1072 """Used to transmit push race error over the wire"""
1069 """Used to transmit push race error over the wire"""
1073 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1070 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1074
1071
1075 @parthandler('b2x:listkeys', ('namespace',))
1072 @parthandler('b2x:listkeys', ('namespace',))
1076 def handlelistkeys(op, inpart):
1073 def handlelistkeys(op, inpart):
1077 """retrieve pushkey namespace content stored in a bundle2"""
1074 """retrieve pushkey namespace content stored in a bundle2"""
1078 namespace = inpart.params['namespace']
1075 namespace = inpart.params['namespace']
1079 r = pushkey.decodekeys(inpart.read())
1076 r = pushkey.decodekeys(inpart.read())
1080 op.records.add('listkeys', (namespace, r))
1077 op.records.add('listkeys', (namespace, r))
1081
1078
1082 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1079 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1083 def handlepushkey(op, inpart):
1080 def handlepushkey(op, inpart):
1084 """process a pushkey request"""
1081 """process a pushkey request"""
1085 dec = pushkey.decode
1082 dec = pushkey.decode
1086 namespace = dec(inpart.params['namespace'])
1083 namespace = dec(inpart.params['namespace'])
1087 key = dec(inpart.params['key'])
1084 key = dec(inpart.params['key'])
1088 old = dec(inpart.params['old'])
1085 old = dec(inpart.params['old'])
1089 new = dec(inpart.params['new'])
1086 new = dec(inpart.params['new'])
1090 ret = op.repo.pushkey(namespace, key, old, new)
1087 ret = op.repo.pushkey(namespace, key, old, new)
1091 record = {'namespace': namespace,
1088 record = {'namespace': namespace,
1092 'key': key,
1089 'key': key,
1093 'old': old,
1090 'old': old,
1094 'new': new}
1091 'new': new}
1095 op.records.add('pushkey', record)
1092 op.records.add('pushkey', record)
1096 if op.reply is not None:
1093 if op.reply is not None:
1097 rpart = op.reply.newpart('b2x:reply:pushkey')
1094 rpart = op.reply.newpart('b2x:reply:pushkey')
1098 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1095 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1099 rpart.addparam('return', '%i' % ret, mandatory=False)
1096 rpart.addparam('return', '%i' % ret, mandatory=False)
1100
1097
1101 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1098 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1102 def handlepushkeyreply(op, inpart):
1099 def handlepushkeyreply(op, inpart):
1103 """retrieve the result of a pushkey request"""
1100 """retrieve the result of a pushkey request"""
1104 ret = int(inpart.params['return'])
1101 ret = int(inpart.params['return'])
1105 partid = int(inpart.params['in-reply-to'])
1102 partid = int(inpart.params['in-reply-to'])
1106 op.records.add('pushkey', {'return': ret}, partid)
1103 op.records.add('pushkey', {'return': ret}, partid)
1107
1104
1108 @parthandler('b2x:obsmarkers')
1105 @parthandler('b2x:obsmarkers')
1109 def handleobsmarker(op, inpart):
1106 def handleobsmarker(op, inpart):
1110 """add a stream of obsmarkers to the repo"""
1107 """add a stream of obsmarkers to the repo"""
1111 tr = op.gettransaction()
1108 tr = op.gettransaction()
1112 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1109 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1113 if new:
1110 if new:
1114 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1111 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1115 op.records.add('obsmarkers', {'new': new})
1112 op.records.add('obsmarkers', {'new': new})
1116 if op.reply is not None:
1113 if op.reply is not None:
1117 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1114 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1118 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1115 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1119 rpart.addparam('new', '%i' % new, mandatory=False)
1116 rpart.addparam('new', '%i' % new, mandatory=False)
1120
1117
1121
1118
1122 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1119 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1123 def handlepushkeyreply(op, inpart):
1120 def handlepushkeyreply(op, inpart):
1124 """retrieve the result of a pushkey request"""
1121 """retrieve the result of a pushkey request"""
1125 ret = int(inpart.params['new'])
1122 ret = int(inpart.params['new'])
1126 partid = int(inpart.params['in-reply-to'])
1123 partid = int(inpart.params['in-reply-to'])
1127 op.records.add('obsmarkers', {'new': ret}, partid)
1124 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now