##// END OF EJS Templates
bundle2.unpackermixin: control for underlying file descriptor...
Eric Sumner -
r24026:3daef83a default
parent child Browse files
Show More
@@ -1,1141 +1,1167 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 import errno
148 import sys
149 import sys
149 import util
150 import util
150 import struct
151 import struct
151 import urllib
152 import urllib
152 import string
153 import string
153 import obsolete
154 import obsolete
154 import pushkey
155 import pushkey
155 import url
156 import url
156 import re
157 import re
157
158
158 import changegroup, error
159 import changegroup, error
159 from i18n import _
160 from i18n import _
160
161
161 _pack = struct.pack
162 _pack = struct.pack
162 _unpack = struct.unpack
163 _unpack = struct.unpack
163
164
164 _magicstring = 'HG2Y'
165 _magicstring = 'HG2Y'
165
166
166 _fstreamparamsize = '>i'
167 _fstreamparamsize = '>i'
167 _fpartheadersize = '>i'
168 _fpartheadersize = '>i'
168 _fparttypesize = '>B'
169 _fparttypesize = '>B'
169 _fpartid = '>I'
170 _fpartid = '>I'
170 _fpayloadsize = '>i'
171 _fpayloadsize = '>i'
171 _fpartparamcount = '>BB'
172 _fpartparamcount = '>BB'
172
173
173 preferedchunksize = 4096
174 preferedchunksize = 4096
174
175
175 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
176 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
176
177
177 def validateparttype(parttype):
178 def validateparttype(parttype):
178 """raise ValueError if a parttype contains invalid character"""
179 """raise ValueError if a parttype contains invalid character"""
179 if _parttypeforbidden.search(parttype):
180 if _parttypeforbidden.search(parttype):
180 raise ValueError(parttype)
181 raise ValueError(parttype)
181
182
182 def _makefpartparamsizes(nbparams):
183 def _makefpartparamsizes(nbparams):
183 """return a struct format to read part parameter sizes
184 """return a struct format to read part parameter sizes
184
185
185 The number parameters is variable so we need to build that format
186 The number parameters is variable so we need to build that format
186 dynamically.
187 dynamically.
187 """
188 """
188 return '>'+('BB'*nbparams)
189 return '>'+('BB'*nbparams)
189
190
190 parthandlermapping = {}
191 parthandlermapping = {}
191
192
192 def parthandler(parttype, params=()):
193 def parthandler(parttype, params=()):
193 """decorator that register a function as a bundle2 part handler
194 """decorator that register a function as a bundle2 part handler
194
195
195 eg::
196 eg::
196
197
197 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
198 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
198 def myparttypehandler(...):
199 def myparttypehandler(...):
199 '''process a part of type "my part".'''
200 '''process a part of type "my part".'''
200 ...
201 ...
201 """
202 """
202 validateparttype(parttype)
203 validateparttype(parttype)
203 def _decorator(func):
204 def _decorator(func):
204 lparttype = parttype.lower() # enforce lower case matching.
205 lparttype = parttype.lower() # enforce lower case matching.
205 assert lparttype not in parthandlermapping
206 assert lparttype not in parthandlermapping
206 parthandlermapping[lparttype] = func
207 parthandlermapping[lparttype] = func
207 func.params = frozenset(params)
208 func.params = frozenset(params)
208 return func
209 return func
209 return _decorator
210 return _decorator
210
211
211 class unbundlerecords(object):
212 class unbundlerecords(object):
212 """keep record of what happens during and unbundle
213 """keep record of what happens during and unbundle
213
214
214 New records are added using `records.add('cat', obj)`. Where 'cat' is a
215 New records are added using `records.add('cat', obj)`. Where 'cat' is a
215 category of record and obj is an arbitrary object.
216 category of record and obj is an arbitrary object.
216
217
217 `records['cat']` will return all entries of this category 'cat'.
218 `records['cat']` will return all entries of this category 'cat'.
218
219
219 Iterating on the object itself will yield `('category', obj)` tuples
220 Iterating on the object itself will yield `('category', obj)` tuples
220 for all entries.
221 for all entries.
221
222
222 All iterations happens in chronological order.
223 All iterations happens in chronological order.
223 """
224 """
224
225
225 def __init__(self):
226 def __init__(self):
226 self._categories = {}
227 self._categories = {}
227 self._sequences = []
228 self._sequences = []
228 self._replies = {}
229 self._replies = {}
229
230
230 def add(self, category, entry, inreplyto=None):
231 def add(self, category, entry, inreplyto=None):
231 """add a new record of a given category.
232 """add a new record of a given category.
232
233
233 The entry can then be retrieved in the list returned by
234 The entry can then be retrieved in the list returned by
234 self['category']."""
235 self['category']."""
235 self._categories.setdefault(category, []).append(entry)
236 self._categories.setdefault(category, []).append(entry)
236 self._sequences.append((category, entry))
237 self._sequences.append((category, entry))
237 if inreplyto is not None:
238 if inreplyto is not None:
238 self.getreplies(inreplyto).add(category, entry)
239 self.getreplies(inreplyto).add(category, entry)
239
240
240 def getreplies(self, partid):
241 def getreplies(self, partid):
241 """get the records that are replies to a specific part"""
242 """get the records that are replies to a specific part"""
242 return self._replies.setdefault(partid, unbundlerecords())
243 return self._replies.setdefault(partid, unbundlerecords())
243
244
244 def __getitem__(self, cat):
245 def __getitem__(self, cat):
245 return tuple(self._categories.get(cat, ()))
246 return tuple(self._categories.get(cat, ()))
246
247
247 def __iter__(self):
248 def __iter__(self):
248 return iter(self._sequences)
249 return iter(self._sequences)
249
250
250 def __len__(self):
251 def __len__(self):
251 return len(self._sequences)
252 return len(self._sequences)
252
253
253 def __nonzero__(self):
254 def __nonzero__(self):
254 return bool(self._sequences)
255 return bool(self._sequences)
255
256
256 class bundleoperation(object):
257 class bundleoperation(object):
257 """an object that represents a single bundling process
258 """an object that represents a single bundling process
258
259
259 Its purpose is to carry unbundle-related objects and states.
260 Its purpose is to carry unbundle-related objects and states.
260
261
261 A new object should be created at the beginning of each bundle processing.
262 A new object should be created at the beginning of each bundle processing.
262 The object is to be returned by the processing function.
263 The object is to be returned by the processing function.
263
264
264 The object has very little content now it will ultimately contain:
265 The object has very little content now it will ultimately contain:
265 * an access to the repo the bundle is applied to,
266 * an access to the repo the bundle is applied to,
266 * a ui object,
267 * a ui object,
267 * a way to retrieve a transaction to add changes to the repo,
268 * a way to retrieve a transaction to add changes to the repo,
268 * a way to record the result of processing each part,
269 * a way to record the result of processing each part,
269 * a way to construct a bundle response when applicable.
270 * a way to construct a bundle response when applicable.
270 """
271 """
271
272
272 def __init__(self, repo, transactiongetter):
273 def __init__(self, repo, transactiongetter):
273 self.repo = repo
274 self.repo = repo
274 self.ui = repo.ui
275 self.ui = repo.ui
275 self.records = unbundlerecords()
276 self.records = unbundlerecords()
276 self.gettransaction = transactiongetter
277 self.gettransaction = transactiongetter
277 self.reply = None
278 self.reply = None
278
279
279 class TransactionUnavailable(RuntimeError):
280 class TransactionUnavailable(RuntimeError):
280 pass
281 pass
281
282
282 def _notransaction():
283 def _notransaction():
283 """default method to get a transaction while processing a bundle
284 """default method to get a transaction while processing a bundle
284
285
285 Raise an exception to highlight the fact that no transaction was expected
286 Raise an exception to highlight the fact that no transaction was expected
286 to be created"""
287 to be created"""
287 raise TransactionUnavailable()
288 raise TransactionUnavailable()
288
289
289 def processbundle(repo, unbundler, transactiongetter=None):
290 def processbundle(repo, unbundler, transactiongetter=None):
290 """This function process a bundle, apply effect to/from a repo
291 """This function process a bundle, apply effect to/from a repo
291
292
292 It iterates over each part then searches for and uses the proper handling
293 It iterates over each part then searches for and uses the proper handling
293 code to process the part. Parts are processed in order.
294 code to process the part. Parts are processed in order.
294
295
295 This is very early version of this function that will be strongly reworked
296 This is very early version of this function that will be strongly reworked
296 before final usage.
297 before final usage.
297
298
298 Unknown Mandatory part will abort the process.
299 Unknown Mandatory part will abort the process.
299 """
300 """
300 if transactiongetter is None:
301 if transactiongetter is None:
301 transactiongetter = _notransaction
302 transactiongetter = _notransaction
302 op = bundleoperation(repo, transactiongetter)
303 op = bundleoperation(repo, transactiongetter)
303 # todo:
304 # todo:
304 # - replace this is a init function soon.
305 # - replace this is a init function soon.
305 # - exception catching
306 # - exception catching
306 unbundler.params
307 unbundler.params
307 iterparts = unbundler.iterparts()
308 iterparts = unbundler.iterparts()
308 part = None
309 part = None
309 try:
310 try:
310 for part in iterparts:
311 for part in iterparts:
311 _processpart(op, part)
312 _processpart(op, part)
312 except Exception, exc:
313 except Exception, exc:
313 for part in iterparts:
314 for part in iterparts:
314 # consume the bundle content
315 # consume the bundle content
315 part.read()
316 part.read()
316 # Small hack to let caller code distinguish exceptions from bundle2
317 # Small hack to let caller code distinguish exceptions from bundle2
317 # processing from processing the old format. This is mostly
318 # processing from processing the old format. This is mostly
318 # needed to handle different return codes to unbundle according to the
319 # needed to handle different return codes to unbundle according to the
319 # type of bundle. We should probably clean up or drop this return code
320 # type of bundle. We should probably clean up or drop this return code
320 # craziness in a future version.
321 # craziness in a future version.
321 exc.duringunbundle2 = True
322 exc.duringunbundle2 = True
322 raise
323 raise
323 return op
324 return op
324
325
325 def _processpart(op, part):
326 def _processpart(op, part):
326 """process a single part from a bundle
327 """process a single part from a bundle
327
328
328 The part is guaranteed to have been fully consumed when the function exits
329 The part is guaranteed to have been fully consumed when the function exits
329 (even if an exception is raised)."""
330 (even if an exception is raised)."""
330 try:
331 try:
331 try:
332 try:
332 handler = parthandlermapping.get(part.type)
333 handler = parthandlermapping.get(part.type)
333 if handler is None:
334 if handler is None:
334 raise error.UnsupportedPartError(parttype=part.type)
335 raise error.UnsupportedPartError(parttype=part.type)
335 op.ui.debug('found a handler for part %r\n' % part.type)
336 op.ui.debug('found a handler for part %r\n' % part.type)
336 unknownparams = part.mandatorykeys - handler.params
337 unknownparams = part.mandatorykeys - handler.params
337 if unknownparams:
338 if unknownparams:
338 unknownparams = list(unknownparams)
339 unknownparams = list(unknownparams)
339 unknownparams.sort()
340 unknownparams.sort()
340 raise error.UnsupportedPartError(parttype=part.type,
341 raise error.UnsupportedPartError(parttype=part.type,
341 params=unknownparams)
342 params=unknownparams)
342 except error.UnsupportedPartError, exc:
343 except error.UnsupportedPartError, exc:
343 if part.mandatory: # mandatory parts
344 if part.mandatory: # mandatory parts
344 raise
345 raise
345 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
346 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
346 return # skip to part processing
347 return # skip to part processing
347
348
348 # handler is called outside the above try block so that we don't
349 # handler is called outside the above try block so that we don't
349 # risk catching KeyErrors from anything other than the
350 # risk catching KeyErrors from anything other than the
350 # parthandlermapping lookup (any KeyError raised by handler()
351 # parthandlermapping lookup (any KeyError raised by handler()
351 # itself represents a defect of a different variety).
352 # itself represents a defect of a different variety).
352 output = None
353 output = None
353 if op.reply is not None:
354 if op.reply is not None:
354 op.ui.pushbuffer(error=True)
355 op.ui.pushbuffer(error=True)
355 output = ''
356 output = ''
356 try:
357 try:
357 handler(op, part)
358 handler(op, part)
358 finally:
359 finally:
359 if output is not None:
360 if output is not None:
360 output = op.ui.popbuffer()
361 output = op.ui.popbuffer()
361 if output:
362 if output:
362 outpart = op.reply.newpart('b2x:output', data=output,
363 outpart = op.reply.newpart('b2x:output', data=output,
363 mandatory=False)
364 mandatory=False)
364 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
365 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
365 finally:
366 finally:
366 # consume the part content to not corrupt the stream.
367 # consume the part content to not corrupt the stream.
367 part.read()
368 part.read()
368
369
369
370
370 def decodecaps(blob):
371 def decodecaps(blob):
371 """decode a bundle2 caps bytes blob into a dictionary
372 """decode a bundle2 caps bytes blob into a dictionary
372
373
373 The blob is a list of capabilities (one per line)
374 The blob is a list of capabilities (one per line)
374 Capabilities may have values using a line of the form::
375 Capabilities may have values using a line of the form::
375
376
376 capability=value1,value2,value3
377 capability=value1,value2,value3
377
378
378 The values are always a list."""
379 The values are always a list."""
379 caps = {}
380 caps = {}
380 for line in blob.splitlines():
381 for line in blob.splitlines():
381 if not line:
382 if not line:
382 continue
383 continue
383 if '=' not in line:
384 if '=' not in line:
384 key, vals = line, ()
385 key, vals = line, ()
385 else:
386 else:
386 key, vals = line.split('=', 1)
387 key, vals = line.split('=', 1)
387 vals = vals.split(',')
388 vals = vals.split(',')
388 key = urllib.unquote(key)
389 key = urllib.unquote(key)
389 vals = [urllib.unquote(v) for v in vals]
390 vals = [urllib.unquote(v) for v in vals]
390 caps[key] = vals
391 caps[key] = vals
391 return caps
392 return caps
392
393
393 def encodecaps(caps):
394 def encodecaps(caps):
394 """encode a bundle2 caps dictionary into a bytes blob"""
395 """encode a bundle2 caps dictionary into a bytes blob"""
395 chunks = []
396 chunks = []
396 for ca in sorted(caps):
397 for ca in sorted(caps):
397 vals = caps[ca]
398 vals = caps[ca]
398 ca = urllib.quote(ca)
399 ca = urllib.quote(ca)
399 vals = [urllib.quote(v) for v in vals]
400 vals = [urllib.quote(v) for v in vals]
400 if vals:
401 if vals:
401 ca = "%s=%s" % (ca, ','.join(vals))
402 ca = "%s=%s" % (ca, ','.join(vals))
402 chunks.append(ca)
403 chunks.append(ca)
403 return '\n'.join(chunks)
404 return '\n'.join(chunks)
404
405
405 class bundle20(object):
406 class bundle20(object):
406 """represent an outgoing bundle2 container
407 """represent an outgoing bundle2 container
407
408
408 Use the `addparam` method to add stream level parameter. and `newpart` to
409 Use the `addparam` method to add stream level parameter. and `newpart` to
409 populate it. Then call `getchunks` to retrieve all the binary chunks of
410 populate it. Then call `getchunks` to retrieve all the binary chunks of
410 data that compose the bundle2 container."""
411 data that compose the bundle2 container."""
411
412
412 def __init__(self, ui, capabilities=()):
413 def __init__(self, ui, capabilities=()):
413 self.ui = ui
414 self.ui = ui
414 self._params = []
415 self._params = []
415 self._parts = []
416 self._parts = []
416 self.capabilities = dict(capabilities)
417 self.capabilities = dict(capabilities)
417
418
418 @property
419 @property
419 def nbparts(self):
420 def nbparts(self):
420 """total number of parts added to the bundler"""
421 """total number of parts added to the bundler"""
421 return len(self._parts)
422 return len(self._parts)
422
423
423 # methods used to defines the bundle2 content
424 # methods used to defines the bundle2 content
424 def addparam(self, name, value=None):
425 def addparam(self, name, value=None):
425 """add a stream level parameter"""
426 """add a stream level parameter"""
426 if not name:
427 if not name:
427 raise ValueError('empty parameter name')
428 raise ValueError('empty parameter name')
428 if name[0] not in string.letters:
429 if name[0] not in string.letters:
429 raise ValueError('non letter first character: %r' % name)
430 raise ValueError('non letter first character: %r' % name)
430 self._params.append((name, value))
431 self._params.append((name, value))
431
432
432 def addpart(self, part):
433 def addpart(self, part):
433 """add a new part to the bundle2 container
434 """add a new part to the bundle2 container
434
435
435 Parts contains the actual applicative payload."""
436 Parts contains the actual applicative payload."""
436 assert part.id is None
437 assert part.id is None
437 part.id = len(self._parts) # very cheap counter
438 part.id = len(self._parts) # very cheap counter
438 self._parts.append(part)
439 self._parts.append(part)
439
440
440 def newpart(self, typeid, *args, **kwargs):
441 def newpart(self, typeid, *args, **kwargs):
441 """create a new part and add it to the containers
442 """create a new part and add it to the containers
442
443
443 As the part is directly added to the containers. For now, this means
444 As the part is directly added to the containers. For now, this means
444 that any failure to properly initialize the part after calling
445 that any failure to properly initialize the part after calling
445 ``newpart`` should result in a failure of the whole bundling process.
446 ``newpart`` should result in a failure of the whole bundling process.
446
447
447 You can still fall back to manually create and add if you need better
448 You can still fall back to manually create and add if you need better
448 control."""
449 control."""
449 part = bundlepart(typeid, *args, **kwargs)
450 part = bundlepart(typeid, *args, **kwargs)
450 self.addpart(part)
451 self.addpart(part)
451 return part
452 return part
452
453
453 # methods used to generate the bundle2 stream
454 # methods used to generate the bundle2 stream
454 def getchunks(self):
455 def getchunks(self):
455 self.ui.debug('start emission of %s stream\n' % _magicstring)
456 self.ui.debug('start emission of %s stream\n' % _magicstring)
456 yield _magicstring
457 yield _magicstring
457 param = self._paramchunk()
458 param = self._paramchunk()
458 self.ui.debug('bundle parameter: %s\n' % param)
459 self.ui.debug('bundle parameter: %s\n' % param)
459 yield _pack(_fstreamparamsize, len(param))
460 yield _pack(_fstreamparamsize, len(param))
460 if param:
461 if param:
461 yield param
462 yield param
462
463
463 self.ui.debug('start of parts\n')
464 self.ui.debug('start of parts\n')
464 for part in self._parts:
465 for part in self._parts:
465 self.ui.debug('bundle part: "%s"\n' % part.type)
466 self.ui.debug('bundle part: "%s"\n' % part.type)
466 for chunk in part.getchunks():
467 for chunk in part.getchunks():
467 yield chunk
468 yield chunk
468 self.ui.debug('end of bundle\n')
469 self.ui.debug('end of bundle\n')
469 yield _pack(_fpartheadersize, 0)
470 yield _pack(_fpartheadersize, 0)
470
471
471 def _paramchunk(self):
472 def _paramchunk(self):
472 """return a encoded version of all stream parameters"""
473 """return a encoded version of all stream parameters"""
473 blocks = []
474 blocks = []
474 for par, value in self._params:
475 for par, value in self._params:
475 par = urllib.quote(par)
476 par = urllib.quote(par)
476 if value is not None:
477 if value is not None:
477 value = urllib.quote(value)
478 value = urllib.quote(value)
478 par = '%s=%s' % (par, value)
479 par = '%s=%s' % (par, value)
479 blocks.append(par)
480 blocks.append(par)
480 return ' '.join(blocks)
481 return ' '.join(blocks)
481
482
482 class unpackermixin(object):
483 class unpackermixin(object):
483 """A mixin to extract bytes and struct data from a stream"""
484 """A mixin to extract bytes and struct data from a stream"""
484
485
485 def __init__(self, fp):
486 def __init__(self, fp):
486 self._fp = fp
487 self._fp = fp
488 self._seekable = (util.safehasattr(fp, 'seek') and
489 util.safehasattr(fp, 'tell'))
487
490
488 def _unpack(self, format):
491 def _unpack(self, format):
489 """unpack this struct format from the stream"""
492 """unpack this struct format from the stream"""
490 data = self._readexact(struct.calcsize(format))
493 data = self._readexact(struct.calcsize(format))
491 return _unpack(format, data)
494 return _unpack(format, data)
492
495
493 def _readexact(self, size):
496 def _readexact(self, size):
494 """read exactly <size> bytes from the stream"""
497 """read exactly <size> bytes from the stream"""
495 return changegroup.readexactly(self._fp, size)
498 return changegroup.readexactly(self._fp, size)
496
499
500 def seek(self, offset, whence):
501 """move the underlying file pointer"""
502 if self._seekable:
503 return self._fp.seek(offset, whence)
504 else:
505 raise NotImplementedError(_('File pointer is not seekable'))
506
507 def tell(self):
508 """return the file offset, or None if file is not seekable"""
509 if self._seekable:
510 try:
511 return self._fp.tell()
512 except IOError, e:
513 if e.errno == errno.ESPIPE:
514 self._seekable = False
515 else:
516 raise
517 return None
518
519 def close(self):
520 """close underlying file"""
521 if util.safehasattr(self._fp, 'close'):
522 return self._fp.close()
497
523
498 class unbundle20(unpackermixin):
524 class unbundle20(unpackermixin):
499 """interpret a bundle2 stream
525 """interpret a bundle2 stream
500
526
501 This class is fed with a binary stream and yields parts through its
527 This class is fed with a binary stream and yields parts through its
502 `iterparts` methods."""
528 `iterparts` methods."""
503
529
504 def __init__(self, ui, fp, header=None):
530 def __init__(self, ui, fp, header=None):
505 """If header is specified, we do not read it out of the stream."""
531 """If header is specified, we do not read it out of the stream."""
506 self.ui = ui
532 self.ui = ui
507 super(unbundle20, self).__init__(fp)
533 super(unbundle20, self).__init__(fp)
508 if header is None:
534 if header is None:
509 header = self._readexact(4)
535 header = self._readexact(4)
510 magic, version = header[0:2], header[2:4]
536 magic, version = header[0:2], header[2:4]
511 if magic != 'HG':
537 if magic != 'HG':
512 raise util.Abort(_('not a Mercurial bundle'))
538 raise util.Abort(_('not a Mercurial bundle'))
513 if version != '2Y':
539 if version != '2Y':
514 raise util.Abort(_('unknown bundle version %s') % version)
540 raise util.Abort(_('unknown bundle version %s') % version)
515 self.ui.debug('start processing of %s stream\n' % header)
541 self.ui.debug('start processing of %s stream\n' % header)
516
542
517 @util.propertycache
543 @util.propertycache
518 def params(self):
544 def params(self):
519 """dictionary of stream level parameters"""
545 """dictionary of stream level parameters"""
520 self.ui.debug('reading bundle2 stream parameters\n')
546 self.ui.debug('reading bundle2 stream parameters\n')
521 params = {}
547 params = {}
522 paramssize = self._unpack(_fstreamparamsize)[0]
548 paramssize = self._unpack(_fstreamparamsize)[0]
523 if paramssize < 0:
549 if paramssize < 0:
524 raise error.BundleValueError('negative bundle param size: %i'
550 raise error.BundleValueError('negative bundle param size: %i'
525 % paramssize)
551 % paramssize)
526 if paramssize:
552 if paramssize:
527 for p in self._readexact(paramssize).split(' '):
553 for p in self._readexact(paramssize).split(' '):
528 p = p.split('=', 1)
554 p = p.split('=', 1)
529 p = [urllib.unquote(i) for i in p]
555 p = [urllib.unquote(i) for i in p]
530 if len(p) < 2:
556 if len(p) < 2:
531 p.append(None)
557 p.append(None)
532 self._processparam(*p)
558 self._processparam(*p)
533 params[p[0]] = p[1]
559 params[p[0]] = p[1]
534 return params
560 return params
535
561
536 def _processparam(self, name, value):
562 def _processparam(self, name, value):
537 """process a parameter, applying its effect if needed
563 """process a parameter, applying its effect if needed
538
564
539 Parameter starting with a lower case letter are advisory and will be
565 Parameter starting with a lower case letter are advisory and will be
540 ignored when unknown. Those starting with an upper case letter are
566 ignored when unknown. Those starting with an upper case letter are
541 mandatory and will this function will raise a KeyError when unknown.
567 mandatory and will this function will raise a KeyError when unknown.
542
568
543 Note: no option are currently supported. Any input will be either
569 Note: no option are currently supported. Any input will be either
544 ignored or failing.
570 ignored or failing.
545 """
571 """
546 if not name:
572 if not name:
547 raise ValueError('empty parameter name')
573 raise ValueError('empty parameter name')
548 if name[0] not in string.letters:
574 if name[0] not in string.letters:
549 raise ValueError('non letter first character: %r' % name)
575 raise ValueError('non letter first character: %r' % name)
550 # Some logic will be later added here to try to process the option for
576 # Some logic will be later added here to try to process the option for
551 # a dict of known parameter.
577 # a dict of known parameter.
552 if name[0].islower():
578 if name[0].islower():
553 self.ui.debug("ignoring unknown parameter %r\n" % name)
579 self.ui.debug("ignoring unknown parameter %r\n" % name)
554 else:
580 else:
555 raise error.UnsupportedPartError(params=(name,))
581 raise error.UnsupportedPartError(params=(name,))
556
582
557
583
558 def iterparts(self):
584 def iterparts(self):
559 """yield all parts contained in the stream"""
585 """yield all parts contained in the stream"""
560 # make sure param have been loaded
586 # make sure param have been loaded
561 self.params
587 self.params
562 self.ui.debug('start extraction of bundle2 parts\n')
588 self.ui.debug('start extraction of bundle2 parts\n')
563 headerblock = self._readpartheader()
589 headerblock = self._readpartheader()
564 while headerblock is not None:
590 while headerblock is not None:
565 part = unbundlepart(self.ui, headerblock, self._fp)
591 part = unbundlepart(self.ui, headerblock, self._fp)
566 yield part
592 yield part
567 headerblock = self._readpartheader()
593 headerblock = self._readpartheader()
568 self.ui.debug('end of bundle2 stream\n')
594 self.ui.debug('end of bundle2 stream\n')
569
595
570 def _readpartheader(self):
596 def _readpartheader(self):
571 """reads a part header size and return the bytes blob
597 """reads a part header size and return the bytes blob
572
598
573 returns None if empty"""
599 returns None if empty"""
574 headersize = self._unpack(_fpartheadersize)[0]
600 headersize = self._unpack(_fpartheadersize)[0]
575 if headersize < 0:
601 if headersize < 0:
576 raise error.BundleValueError('negative part header size: %i'
602 raise error.BundleValueError('negative part header size: %i'
577 % headersize)
603 % headersize)
578 self.ui.debug('part header size: %i\n' % headersize)
604 self.ui.debug('part header size: %i\n' % headersize)
579 if headersize:
605 if headersize:
580 return self._readexact(headersize)
606 return self._readexact(headersize)
581 return None
607 return None
582
608
583
609
584 class bundlepart(object):
610 class bundlepart(object):
585 """A bundle2 part contains application level payload
611 """A bundle2 part contains application level payload
586
612
587 The part `type` is used to route the part to the application level
613 The part `type` is used to route the part to the application level
588 handler.
614 handler.
589
615
590 The part payload is contained in ``part.data``. It could be raw bytes or a
616 The part payload is contained in ``part.data``. It could be raw bytes or a
591 generator of byte chunks.
617 generator of byte chunks.
592
618
593 You can add parameters to the part using the ``addparam`` method.
619 You can add parameters to the part using the ``addparam`` method.
594 Parameters can be either mandatory (default) or advisory. Remote side
620 Parameters can be either mandatory (default) or advisory. Remote side
595 should be able to safely ignore the advisory ones.
621 should be able to safely ignore the advisory ones.
596
622
597 Both data and parameters cannot be modified after the generation has begun.
623 Both data and parameters cannot be modified after the generation has begun.
598 """
624 """
599
625
600 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
626 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
601 data='', mandatory=True):
627 data='', mandatory=True):
602 validateparttype(parttype)
628 validateparttype(parttype)
603 self.id = None
629 self.id = None
604 self.type = parttype
630 self.type = parttype
605 self._data = data
631 self._data = data
606 self._mandatoryparams = list(mandatoryparams)
632 self._mandatoryparams = list(mandatoryparams)
607 self._advisoryparams = list(advisoryparams)
633 self._advisoryparams = list(advisoryparams)
608 # checking for duplicated entries
634 # checking for duplicated entries
609 self._seenparams = set()
635 self._seenparams = set()
610 for pname, __ in self._mandatoryparams + self._advisoryparams:
636 for pname, __ in self._mandatoryparams + self._advisoryparams:
611 if pname in self._seenparams:
637 if pname in self._seenparams:
612 raise RuntimeError('duplicated params: %s' % pname)
638 raise RuntimeError('duplicated params: %s' % pname)
613 self._seenparams.add(pname)
639 self._seenparams.add(pname)
614 # status of the part's generation:
640 # status of the part's generation:
615 # - None: not started,
641 # - None: not started,
616 # - False: currently generated,
642 # - False: currently generated,
617 # - True: generation done.
643 # - True: generation done.
618 self._generated = None
644 self._generated = None
619 self.mandatory = mandatory
645 self.mandatory = mandatory
620
646
621 # methods used to defines the part content
647 # methods used to defines the part content
622 def __setdata(self, data):
648 def __setdata(self, data):
623 if self._generated is not None:
649 if self._generated is not None:
624 raise error.ReadOnlyPartError('part is being generated')
650 raise error.ReadOnlyPartError('part is being generated')
625 self._data = data
651 self._data = data
626 def __getdata(self):
652 def __getdata(self):
627 return self._data
653 return self._data
628 data = property(__getdata, __setdata)
654 data = property(__getdata, __setdata)
629
655
630 @property
656 @property
631 def mandatoryparams(self):
657 def mandatoryparams(self):
632 # make it an immutable tuple to force people through ``addparam``
658 # make it an immutable tuple to force people through ``addparam``
633 return tuple(self._mandatoryparams)
659 return tuple(self._mandatoryparams)
634
660
635 @property
661 @property
636 def advisoryparams(self):
662 def advisoryparams(self):
637 # make it an immutable tuple to force people through ``addparam``
663 # make it an immutable tuple to force people through ``addparam``
638 return tuple(self._advisoryparams)
664 return tuple(self._advisoryparams)
639
665
640 def addparam(self, name, value='', mandatory=True):
666 def addparam(self, name, value='', mandatory=True):
641 if self._generated is not None:
667 if self._generated is not None:
642 raise error.ReadOnlyPartError('part is being generated')
668 raise error.ReadOnlyPartError('part is being generated')
643 if name in self._seenparams:
669 if name in self._seenparams:
644 raise ValueError('duplicated params: %s' % name)
670 raise ValueError('duplicated params: %s' % name)
645 self._seenparams.add(name)
671 self._seenparams.add(name)
646 params = self._advisoryparams
672 params = self._advisoryparams
647 if mandatory:
673 if mandatory:
648 params = self._mandatoryparams
674 params = self._mandatoryparams
649 params.append((name, value))
675 params.append((name, value))
650
676
651 # methods used to generates the bundle2 stream
677 # methods used to generates the bundle2 stream
652 def getchunks(self):
678 def getchunks(self):
653 if self._generated is not None:
679 if self._generated is not None:
654 raise RuntimeError('part can only be consumed once')
680 raise RuntimeError('part can only be consumed once')
655 self._generated = False
681 self._generated = False
656 #### header
682 #### header
657 if self.mandatory:
683 if self.mandatory:
658 parttype = self.type.upper()
684 parttype = self.type.upper()
659 else:
685 else:
660 parttype = self.type.lower()
686 parttype = self.type.lower()
661 ## parttype
687 ## parttype
662 header = [_pack(_fparttypesize, len(parttype)),
688 header = [_pack(_fparttypesize, len(parttype)),
663 parttype, _pack(_fpartid, self.id),
689 parttype, _pack(_fpartid, self.id),
664 ]
690 ]
665 ## parameters
691 ## parameters
666 # count
692 # count
667 manpar = self.mandatoryparams
693 manpar = self.mandatoryparams
668 advpar = self.advisoryparams
694 advpar = self.advisoryparams
669 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
695 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
670 # size
696 # size
671 parsizes = []
697 parsizes = []
672 for key, value in manpar:
698 for key, value in manpar:
673 parsizes.append(len(key))
699 parsizes.append(len(key))
674 parsizes.append(len(value))
700 parsizes.append(len(value))
675 for key, value in advpar:
701 for key, value in advpar:
676 parsizes.append(len(key))
702 parsizes.append(len(key))
677 parsizes.append(len(value))
703 parsizes.append(len(value))
678 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
704 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
679 header.append(paramsizes)
705 header.append(paramsizes)
680 # key, value
706 # key, value
681 for key, value in manpar:
707 for key, value in manpar:
682 header.append(key)
708 header.append(key)
683 header.append(value)
709 header.append(value)
684 for key, value in advpar:
710 for key, value in advpar:
685 header.append(key)
711 header.append(key)
686 header.append(value)
712 header.append(value)
687 ## finalize header
713 ## finalize header
688 headerchunk = ''.join(header)
714 headerchunk = ''.join(header)
689 yield _pack(_fpartheadersize, len(headerchunk))
715 yield _pack(_fpartheadersize, len(headerchunk))
690 yield headerchunk
716 yield headerchunk
691 ## payload
717 ## payload
692 try:
718 try:
693 for chunk in self._payloadchunks():
719 for chunk in self._payloadchunks():
694 yield _pack(_fpayloadsize, len(chunk))
720 yield _pack(_fpayloadsize, len(chunk))
695 yield chunk
721 yield chunk
696 except Exception, exc:
722 except Exception, exc:
697 # backup exception data for later
723 # backup exception data for later
698 exc_info = sys.exc_info()
724 exc_info = sys.exc_info()
699 msg = 'unexpected error: %s' % exc
725 msg = 'unexpected error: %s' % exc
700 interpart = bundlepart('b2x:error:abort', [('message', msg)],
726 interpart = bundlepart('b2x:error:abort', [('message', msg)],
701 mandatory=False)
727 mandatory=False)
702 interpart.id = 0
728 interpart.id = 0
703 yield _pack(_fpayloadsize, -1)
729 yield _pack(_fpayloadsize, -1)
704 for chunk in interpart.getchunks():
730 for chunk in interpart.getchunks():
705 yield chunk
731 yield chunk
706 # abort current part payload
732 # abort current part payload
707 yield _pack(_fpayloadsize, 0)
733 yield _pack(_fpayloadsize, 0)
708 raise exc_info[0], exc_info[1], exc_info[2]
734 raise exc_info[0], exc_info[1], exc_info[2]
709 # end of payload
735 # end of payload
710 yield _pack(_fpayloadsize, 0)
736 yield _pack(_fpayloadsize, 0)
711 self._generated = True
737 self._generated = True
712
738
713 def _payloadchunks(self):
739 def _payloadchunks(self):
714 """yield chunks of a the part payload
740 """yield chunks of a the part payload
715
741
716 Exists to handle the different methods to provide data to a part."""
742 Exists to handle the different methods to provide data to a part."""
717 # we only support fixed size data now.
743 # we only support fixed size data now.
718 # This will be improved in the future.
744 # This will be improved in the future.
719 if util.safehasattr(self.data, 'next'):
745 if util.safehasattr(self.data, 'next'):
720 buff = util.chunkbuffer(self.data)
746 buff = util.chunkbuffer(self.data)
721 chunk = buff.read(preferedchunksize)
747 chunk = buff.read(preferedchunksize)
722 while chunk:
748 while chunk:
723 yield chunk
749 yield chunk
724 chunk = buff.read(preferedchunksize)
750 chunk = buff.read(preferedchunksize)
725 elif len(self.data):
751 elif len(self.data):
726 yield self.data
752 yield self.data
727
753
728
754
729 flaginterrupt = -1
755 flaginterrupt = -1
730
756
731 class interrupthandler(unpackermixin):
757 class interrupthandler(unpackermixin):
732 """read one part and process it with restricted capability
758 """read one part and process it with restricted capability
733
759
734 This allows to transmit exception raised on the producer size during part
760 This allows to transmit exception raised on the producer size during part
735 iteration while the consumer is reading a part.
761 iteration while the consumer is reading a part.
736
762
737 Part processed in this manner only have access to a ui object,"""
763 Part processed in this manner only have access to a ui object,"""
738
764
739 def __init__(self, ui, fp):
765 def __init__(self, ui, fp):
740 super(interrupthandler, self).__init__(fp)
766 super(interrupthandler, self).__init__(fp)
741 self.ui = ui
767 self.ui = ui
742
768
743 def _readpartheader(self):
769 def _readpartheader(self):
744 """reads a part header size and return the bytes blob
770 """reads a part header size and return the bytes blob
745
771
746 returns None if empty"""
772 returns None if empty"""
747 headersize = self._unpack(_fpartheadersize)[0]
773 headersize = self._unpack(_fpartheadersize)[0]
748 if headersize < 0:
774 if headersize < 0:
749 raise error.BundleValueError('negative part header size: %i'
775 raise error.BundleValueError('negative part header size: %i'
750 % headersize)
776 % headersize)
751 self.ui.debug('part header size: %i\n' % headersize)
777 self.ui.debug('part header size: %i\n' % headersize)
752 if headersize:
778 if headersize:
753 return self._readexact(headersize)
779 return self._readexact(headersize)
754 return None
780 return None
755
781
756 def __call__(self):
782 def __call__(self):
757 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
783 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
758 headerblock = self._readpartheader()
784 headerblock = self._readpartheader()
759 if headerblock is None:
785 if headerblock is None:
760 self.ui.debug('no part found during interruption.\n')
786 self.ui.debug('no part found during interruption.\n')
761 return
787 return
762 part = unbundlepart(self.ui, headerblock, self._fp)
788 part = unbundlepart(self.ui, headerblock, self._fp)
763 op = interruptoperation(self.ui)
789 op = interruptoperation(self.ui)
764 _processpart(op, part)
790 _processpart(op, part)
765
791
766 class interruptoperation(object):
792 class interruptoperation(object):
767 """A limited operation to be use by part handler during interruption
793 """A limited operation to be use by part handler during interruption
768
794
769 It only have access to an ui object.
795 It only have access to an ui object.
770 """
796 """
771
797
772 def __init__(self, ui):
798 def __init__(self, ui):
773 self.ui = ui
799 self.ui = ui
774 self.reply = None
800 self.reply = None
775
801
776 @property
802 @property
777 def repo(self):
803 def repo(self):
778 raise RuntimeError('no repo access from stream interruption')
804 raise RuntimeError('no repo access from stream interruption')
779
805
780 def gettransaction(self):
806 def gettransaction(self):
781 raise TransactionUnavailable('no repo access from stream interruption')
807 raise TransactionUnavailable('no repo access from stream interruption')
782
808
783 class unbundlepart(unpackermixin):
809 class unbundlepart(unpackermixin):
784 """a bundle part read from a bundle"""
810 """a bundle part read from a bundle"""
785
811
786 def __init__(self, ui, header, fp):
812 def __init__(self, ui, header, fp):
787 super(unbundlepart, self).__init__(fp)
813 super(unbundlepart, self).__init__(fp)
788 self.ui = ui
814 self.ui = ui
789 # unbundle state attr
815 # unbundle state attr
790 self._headerdata = header
816 self._headerdata = header
791 self._headeroffset = 0
817 self._headeroffset = 0
792 self._initialized = False
818 self._initialized = False
793 self.consumed = False
819 self.consumed = False
794 # part data
820 # part data
795 self.id = None
821 self.id = None
796 self.type = None
822 self.type = None
797 self.mandatoryparams = None
823 self.mandatoryparams = None
798 self.advisoryparams = None
824 self.advisoryparams = None
799 self.params = None
825 self.params = None
800 self.mandatorykeys = ()
826 self.mandatorykeys = ()
801 self._payloadstream = None
827 self._payloadstream = None
802 self._readheader()
828 self._readheader()
803 self._mandatory = None
829 self._mandatory = None
804
830
805 def _fromheader(self, size):
831 def _fromheader(self, size):
806 """return the next <size> byte from the header"""
832 """return the next <size> byte from the header"""
807 offset = self._headeroffset
833 offset = self._headeroffset
808 data = self._headerdata[offset:(offset + size)]
834 data = self._headerdata[offset:(offset + size)]
809 self._headeroffset = offset + size
835 self._headeroffset = offset + size
810 return data
836 return data
811
837
812 def _unpackheader(self, format):
838 def _unpackheader(self, format):
813 """read given format from header
839 """read given format from header
814
840
815 This automatically compute the size of the format to read."""
841 This automatically compute the size of the format to read."""
816 data = self._fromheader(struct.calcsize(format))
842 data = self._fromheader(struct.calcsize(format))
817 return _unpack(format, data)
843 return _unpack(format, data)
818
844
819 def _initparams(self, mandatoryparams, advisoryparams):
845 def _initparams(self, mandatoryparams, advisoryparams):
820 """internal function to setup all logic related parameters"""
846 """internal function to setup all logic related parameters"""
821 # make it read only to prevent people touching it by mistake.
847 # make it read only to prevent people touching it by mistake.
822 self.mandatoryparams = tuple(mandatoryparams)
848 self.mandatoryparams = tuple(mandatoryparams)
823 self.advisoryparams = tuple(advisoryparams)
849 self.advisoryparams = tuple(advisoryparams)
824 # user friendly UI
850 # user friendly UI
825 self.params = dict(self.mandatoryparams)
851 self.params = dict(self.mandatoryparams)
826 self.params.update(dict(self.advisoryparams))
852 self.params.update(dict(self.advisoryparams))
827 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
853 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
828
854
829 def _readheader(self):
855 def _readheader(self):
830 """read the header and setup the object"""
856 """read the header and setup the object"""
831 typesize = self._unpackheader(_fparttypesize)[0]
857 typesize = self._unpackheader(_fparttypesize)[0]
832 self.type = self._fromheader(typesize)
858 self.type = self._fromheader(typesize)
833 self.ui.debug('part type: "%s"\n' % self.type)
859 self.ui.debug('part type: "%s"\n' % self.type)
834 self.id = self._unpackheader(_fpartid)[0]
860 self.id = self._unpackheader(_fpartid)[0]
835 self.ui.debug('part id: "%s"\n' % self.id)
861 self.ui.debug('part id: "%s"\n' % self.id)
836 # extract mandatory bit from type
862 # extract mandatory bit from type
837 self.mandatory = (self.type != self.type.lower())
863 self.mandatory = (self.type != self.type.lower())
838 self.type = self.type.lower()
864 self.type = self.type.lower()
839 ## reading parameters
865 ## reading parameters
840 # param count
866 # param count
841 mancount, advcount = self._unpackheader(_fpartparamcount)
867 mancount, advcount = self._unpackheader(_fpartparamcount)
842 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
868 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
843 # param size
869 # param size
844 fparamsizes = _makefpartparamsizes(mancount + advcount)
870 fparamsizes = _makefpartparamsizes(mancount + advcount)
845 paramsizes = self._unpackheader(fparamsizes)
871 paramsizes = self._unpackheader(fparamsizes)
846 # make it a list of couple again
872 # make it a list of couple again
847 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
873 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
848 # split mandatory from advisory
874 # split mandatory from advisory
849 mansizes = paramsizes[:mancount]
875 mansizes = paramsizes[:mancount]
850 advsizes = paramsizes[mancount:]
876 advsizes = paramsizes[mancount:]
851 # retrieve param value
877 # retrieve param value
852 manparams = []
878 manparams = []
853 for key, value in mansizes:
879 for key, value in mansizes:
854 manparams.append((self._fromheader(key), self._fromheader(value)))
880 manparams.append((self._fromheader(key), self._fromheader(value)))
855 advparams = []
881 advparams = []
856 for key, value in advsizes:
882 for key, value in advsizes:
857 advparams.append((self._fromheader(key), self._fromheader(value)))
883 advparams.append((self._fromheader(key), self._fromheader(value)))
858 self._initparams(manparams, advparams)
884 self._initparams(manparams, advparams)
859 ## part payload
885 ## part payload
860 def payloadchunks():
886 def payloadchunks():
861 payloadsize = self._unpack(_fpayloadsize)[0]
887 payloadsize = self._unpack(_fpayloadsize)[0]
862 self.ui.debug('payload chunk size: %i\n' % payloadsize)
888 self.ui.debug('payload chunk size: %i\n' % payloadsize)
863 while payloadsize:
889 while payloadsize:
864 if payloadsize == flaginterrupt:
890 if payloadsize == flaginterrupt:
865 # interruption detection, the handler will now read a
891 # interruption detection, the handler will now read a
866 # single part and process it.
892 # single part and process it.
867 interrupthandler(self.ui, self._fp)()
893 interrupthandler(self.ui, self._fp)()
868 elif payloadsize < 0:
894 elif payloadsize < 0:
869 msg = 'negative payload chunk size: %i' % payloadsize
895 msg = 'negative payload chunk size: %i' % payloadsize
870 raise error.BundleValueError(msg)
896 raise error.BundleValueError(msg)
871 else:
897 else:
872 yield self._readexact(payloadsize)
898 yield self._readexact(payloadsize)
873 payloadsize = self._unpack(_fpayloadsize)[0]
899 payloadsize = self._unpack(_fpayloadsize)[0]
874 self.ui.debug('payload chunk size: %i\n' % payloadsize)
900 self.ui.debug('payload chunk size: %i\n' % payloadsize)
875 self._payloadstream = util.chunkbuffer(payloadchunks())
901 self._payloadstream = util.chunkbuffer(payloadchunks())
876 # we read the data, tell it
902 # we read the data, tell it
877 self._initialized = True
903 self._initialized = True
878
904
879 def read(self, size=None):
905 def read(self, size=None):
880 """read payload data"""
906 """read payload data"""
881 if not self._initialized:
907 if not self._initialized:
882 self._readheader()
908 self._readheader()
883 if size is None:
909 if size is None:
884 data = self._payloadstream.read()
910 data = self._payloadstream.read()
885 else:
911 else:
886 data = self._payloadstream.read(size)
912 data = self._payloadstream.read(size)
887 if size is None or len(data) < size:
913 if size is None or len(data) < size:
888 self.consumed = True
914 self.consumed = True
889 return data
915 return data
890
916
891 capabilities = {'HG2Y': (),
917 capabilities = {'HG2Y': (),
892 'b2x:listkeys': (),
918 'b2x:listkeys': (),
893 'b2x:pushkey': (),
919 'b2x:pushkey': (),
894 'digests': tuple(sorted(util.DIGESTS.keys())),
920 'digests': tuple(sorted(util.DIGESTS.keys())),
895 'b2x:remote-changegroup': ('http', 'https'),
921 'b2x:remote-changegroup': ('http', 'https'),
896 }
922 }
897
923
898 def getrepocaps(repo, allowpushback=False):
924 def getrepocaps(repo, allowpushback=False):
899 """return the bundle2 capabilities for a given repo
925 """return the bundle2 capabilities for a given repo
900
926
901 Exists to allow extensions (like evolution) to mutate the capabilities.
927 Exists to allow extensions (like evolution) to mutate the capabilities.
902 """
928 """
903 caps = capabilities.copy()
929 caps = capabilities.copy()
904 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
930 caps['b2x:changegroup'] = tuple(sorted(changegroup.packermap.keys()))
905 if obsolete.isenabled(repo, obsolete.exchangeopt):
931 if obsolete.isenabled(repo, obsolete.exchangeopt):
906 supportedformat = tuple('V%i' % v for v in obsolete.formats)
932 supportedformat = tuple('V%i' % v for v in obsolete.formats)
907 caps['b2x:obsmarkers'] = supportedformat
933 caps['b2x:obsmarkers'] = supportedformat
908 if allowpushback:
934 if allowpushback:
909 caps['b2x:pushback'] = ()
935 caps['b2x:pushback'] = ()
910 return caps
936 return caps
911
937
912 def bundle2caps(remote):
938 def bundle2caps(remote):
913 """return the bundle capabilities of a peer as dict"""
939 """return the bundle capabilities of a peer as dict"""
914 raw = remote.capable('bundle2-exp')
940 raw = remote.capable('bundle2-exp')
915 if not raw and raw != '':
941 if not raw and raw != '':
916 return {}
942 return {}
917 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
943 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
918 return decodecaps(capsblob)
944 return decodecaps(capsblob)
919
945
920 def obsmarkersversion(caps):
946 def obsmarkersversion(caps):
921 """extract the list of supported obsmarkers versions from a bundle2caps dict
947 """extract the list of supported obsmarkers versions from a bundle2caps dict
922 """
948 """
923 obscaps = caps.get('b2x:obsmarkers', ())
949 obscaps = caps.get('b2x:obsmarkers', ())
924 return [int(c[1:]) for c in obscaps if c.startswith('V')]
950 return [int(c[1:]) for c in obscaps if c.startswith('V')]
925
951
926 @parthandler('b2x:changegroup', ('version',))
952 @parthandler('b2x:changegroup', ('version',))
927 def handlechangegroup(op, inpart):
953 def handlechangegroup(op, inpart):
928 """apply a changegroup part on the repo
954 """apply a changegroup part on the repo
929
955
930 This is a very early implementation that will massive rework before being
956 This is a very early implementation that will massive rework before being
931 inflicted to any end-user.
957 inflicted to any end-user.
932 """
958 """
933 # Make sure we trigger a transaction creation
959 # Make sure we trigger a transaction creation
934 #
960 #
935 # The addchangegroup function will get a transaction object by itself, but
961 # The addchangegroup function will get a transaction object by itself, but
936 # we need to make sure we trigger the creation of a transaction object used
962 # we need to make sure we trigger the creation of a transaction object used
937 # for the whole processing scope.
963 # for the whole processing scope.
938 op.gettransaction()
964 op.gettransaction()
939 unpackerversion = inpart.params.get('version', '01')
965 unpackerversion = inpart.params.get('version', '01')
940 # We should raise an appropriate exception here
966 # We should raise an appropriate exception here
941 unpacker = changegroup.packermap[unpackerversion][1]
967 unpacker = changegroup.packermap[unpackerversion][1]
942 cg = unpacker(inpart, 'UN')
968 cg = unpacker(inpart, 'UN')
943 # the source and url passed here are overwritten by the one contained in
969 # the source and url passed here are overwritten by the one contained in
944 # the transaction.hookargs argument. So 'bundle2' is a placeholder
970 # the transaction.hookargs argument. So 'bundle2' is a placeholder
945 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
971 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
946 op.records.add('changegroup', {'return': ret})
972 op.records.add('changegroup', {'return': ret})
947 if op.reply is not None:
973 if op.reply is not None:
948 # This is definitely not the final form of this
974 # This is definitely not the final form of this
949 # return. But one need to start somewhere.
975 # return. But one need to start somewhere.
950 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
976 part = op.reply.newpart('b2x:reply:changegroup', mandatory=False)
951 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
977 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
952 part.addparam('return', '%i' % ret, mandatory=False)
978 part.addparam('return', '%i' % ret, mandatory=False)
953 assert not inpart.read()
979 assert not inpart.read()
954
980
955 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
981 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
956 ['digest:%s' % k for k in util.DIGESTS.keys()])
982 ['digest:%s' % k for k in util.DIGESTS.keys()])
957 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
983 @parthandler('b2x:remote-changegroup', _remotechangegroupparams)
958 def handleremotechangegroup(op, inpart):
984 def handleremotechangegroup(op, inpart):
959 """apply a bundle10 on the repo, given an url and validation information
985 """apply a bundle10 on the repo, given an url and validation information
960
986
961 All the information about the remote bundle to import are given as
987 All the information about the remote bundle to import are given as
962 parameters. The parameters include:
988 parameters. The parameters include:
963 - url: the url to the bundle10.
989 - url: the url to the bundle10.
964 - size: the bundle10 file size. It is used to validate what was
990 - size: the bundle10 file size. It is used to validate what was
965 retrieved by the client matches the server knowledge about the bundle.
991 retrieved by the client matches the server knowledge about the bundle.
966 - digests: a space separated list of the digest types provided as
992 - digests: a space separated list of the digest types provided as
967 parameters.
993 parameters.
968 - digest:<digest-type>: the hexadecimal representation of the digest with
994 - digest:<digest-type>: the hexadecimal representation of the digest with
969 that name. Like the size, it is used to validate what was retrieved by
995 that name. Like the size, it is used to validate what was retrieved by
970 the client matches what the server knows about the bundle.
996 the client matches what the server knows about the bundle.
971
997
972 When multiple digest types are given, all of them are checked.
998 When multiple digest types are given, all of them are checked.
973 """
999 """
974 try:
1000 try:
975 raw_url = inpart.params['url']
1001 raw_url = inpart.params['url']
976 except KeyError:
1002 except KeyError:
977 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1003 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
978 parsed_url = util.url(raw_url)
1004 parsed_url = util.url(raw_url)
979 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
1005 if parsed_url.scheme not in capabilities['b2x:remote-changegroup']:
980 raise util.Abort(_('remote-changegroup does not support %s urls') %
1006 raise util.Abort(_('remote-changegroup does not support %s urls') %
981 parsed_url.scheme)
1007 parsed_url.scheme)
982
1008
983 try:
1009 try:
984 size = int(inpart.params['size'])
1010 size = int(inpart.params['size'])
985 except ValueError:
1011 except ValueError:
986 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1012 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
987 % 'size')
1013 % 'size')
988 except KeyError:
1014 except KeyError:
989 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1015 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
990
1016
991 digests = {}
1017 digests = {}
992 for typ in inpart.params.get('digests', '').split():
1018 for typ in inpart.params.get('digests', '').split():
993 param = 'digest:%s' % typ
1019 param = 'digest:%s' % typ
994 try:
1020 try:
995 value = inpart.params[param]
1021 value = inpart.params[param]
996 except KeyError:
1022 except KeyError:
997 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1023 raise util.Abort(_('remote-changegroup: missing "%s" param') %
998 param)
1024 param)
999 digests[typ] = value
1025 digests[typ] = value
1000
1026
1001 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1027 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1002
1028
1003 # Make sure we trigger a transaction creation
1029 # Make sure we trigger a transaction creation
1004 #
1030 #
1005 # The addchangegroup function will get a transaction object by itself, but
1031 # The addchangegroup function will get a transaction object by itself, but
1006 # we need to make sure we trigger the creation of a transaction object used
1032 # we need to make sure we trigger the creation of a transaction object used
1007 # for the whole processing scope.
1033 # for the whole processing scope.
1008 op.gettransaction()
1034 op.gettransaction()
1009 import exchange
1035 import exchange
1010 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1036 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1011 if not isinstance(cg, changegroup.cg1unpacker):
1037 if not isinstance(cg, changegroup.cg1unpacker):
1012 raise util.Abort(_('%s: not a bundle version 1.0') %
1038 raise util.Abort(_('%s: not a bundle version 1.0') %
1013 util.hidepassword(raw_url))
1039 util.hidepassword(raw_url))
1014 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1040 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1015 op.records.add('changegroup', {'return': ret})
1041 op.records.add('changegroup', {'return': ret})
1016 if op.reply is not None:
1042 if op.reply is not None:
1017 # This is definitely not the final form of this
1043 # This is definitely not the final form of this
1018 # return. But one need to start somewhere.
1044 # return. But one need to start somewhere.
1019 part = op.reply.newpart('b2x:reply:changegroup')
1045 part = op.reply.newpart('b2x:reply:changegroup')
1020 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1046 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1021 part.addparam('return', '%i' % ret, mandatory=False)
1047 part.addparam('return', '%i' % ret, mandatory=False)
1022 try:
1048 try:
1023 real_part.validate()
1049 real_part.validate()
1024 except util.Abort, e:
1050 except util.Abort, e:
1025 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1051 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1026 (util.hidepassword(raw_url), str(e)))
1052 (util.hidepassword(raw_url), str(e)))
1027 assert not inpart.read()
1053 assert not inpart.read()
1028
1054
1029 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1055 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
1030 def handlereplychangegroup(op, inpart):
1056 def handlereplychangegroup(op, inpart):
1031 ret = int(inpart.params['return'])
1057 ret = int(inpart.params['return'])
1032 replyto = int(inpart.params['in-reply-to'])
1058 replyto = int(inpart.params['in-reply-to'])
1033 op.records.add('changegroup', {'return': ret}, replyto)
1059 op.records.add('changegroup', {'return': ret}, replyto)
1034
1060
1035 @parthandler('b2x:check:heads')
1061 @parthandler('b2x:check:heads')
1036 def handlecheckheads(op, inpart):
1062 def handlecheckheads(op, inpart):
1037 """check that head of the repo did not change
1063 """check that head of the repo did not change
1038
1064
1039 This is used to detect a push race when using unbundle.
1065 This is used to detect a push race when using unbundle.
1040 This replaces the "heads" argument of unbundle."""
1066 This replaces the "heads" argument of unbundle."""
1041 h = inpart.read(20)
1067 h = inpart.read(20)
1042 heads = []
1068 heads = []
1043 while len(h) == 20:
1069 while len(h) == 20:
1044 heads.append(h)
1070 heads.append(h)
1045 h = inpart.read(20)
1071 h = inpart.read(20)
1046 assert not h
1072 assert not h
1047 if heads != op.repo.heads():
1073 if heads != op.repo.heads():
1048 raise error.PushRaced('repository changed while pushing - '
1074 raise error.PushRaced('repository changed while pushing - '
1049 'please try again')
1075 'please try again')
1050
1076
1051 @parthandler('b2x:output')
1077 @parthandler('b2x:output')
1052 def handleoutput(op, inpart):
1078 def handleoutput(op, inpart):
1053 """forward output captured on the server to the client"""
1079 """forward output captured on the server to the client"""
1054 for line in inpart.read().splitlines():
1080 for line in inpart.read().splitlines():
1055 op.ui.write(('remote: %s\n' % line))
1081 op.ui.write(('remote: %s\n' % line))
1056
1082
1057 @parthandler('b2x:replycaps')
1083 @parthandler('b2x:replycaps')
1058 def handlereplycaps(op, inpart):
1084 def handlereplycaps(op, inpart):
1059 """Notify that a reply bundle should be created
1085 """Notify that a reply bundle should be created
1060
1086
1061 The payload contains the capabilities information for the reply"""
1087 The payload contains the capabilities information for the reply"""
1062 caps = decodecaps(inpart.read())
1088 caps = decodecaps(inpart.read())
1063 if op.reply is None:
1089 if op.reply is None:
1064 op.reply = bundle20(op.ui, caps)
1090 op.reply = bundle20(op.ui, caps)
1065
1091
1066 @parthandler('b2x:error:abort', ('message', 'hint'))
1092 @parthandler('b2x:error:abort', ('message', 'hint'))
1067 def handlereplycaps(op, inpart):
1093 def handlereplycaps(op, inpart):
1068 """Used to transmit abort error over the wire"""
1094 """Used to transmit abort error over the wire"""
1069 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1095 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1070
1096
1071 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1097 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
1072 def handlereplycaps(op, inpart):
1098 def handlereplycaps(op, inpart):
1073 """Used to transmit unknown content error over the wire"""
1099 """Used to transmit unknown content error over the wire"""
1074 kwargs = {}
1100 kwargs = {}
1075 parttype = inpart.params.get('parttype')
1101 parttype = inpart.params.get('parttype')
1076 if parttype is not None:
1102 if parttype is not None:
1077 kwargs['parttype'] = parttype
1103 kwargs['parttype'] = parttype
1078 params = inpart.params.get('params')
1104 params = inpart.params.get('params')
1079 if params is not None:
1105 if params is not None:
1080 kwargs['params'] = params.split('\0')
1106 kwargs['params'] = params.split('\0')
1081
1107
1082 raise error.UnsupportedPartError(**kwargs)
1108 raise error.UnsupportedPartError(**kwargs)
1083
1109
1084 @parthandler('b2x:error:pushraced', ('message',))
1110 @parthandler('b2x:error:pushraced', ('message',))
1085 def handlereplycaps(op, inpart):
1111 def handlereplycaps(op, inpart):
1086 """Used to transmit push race error over the wire"""
1112 """Used to transmit push race error over the wire"""
1087 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1113 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1088
1114
1089 @parthandler('b2x:listkeys', ('namespace',))
1115 @parthandler('b2x:listkeys', ('namespace',))
1090 def handlelistkeys(op, inpart):
1116 def handlelistkeys(op, inpart):
1091 """retrieve pushkey namespace content stored in a bundle2"""
1117 """retrieve pushkey namespace content stored in a bundle2"""
1092 namespace = inpart.params['namespace']
1118 namespace = inpart.params['namespace']
1093 r = pushkey.decodekeys(inpart.read())
1119 r = pushkey.decodekeys(inpart.read())
1094 op.records.add('listkeys', (namespace, r))
1120 op.records.add('listkeys', (namespace, r))
1095
1121
1096 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1122 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
1097 def handlepushkey(op, inpart):
1123 def handlepushkey(op, inpart):
1098 """process a pushkey request"""
1124 """process a pushkey request"""
1099 dec = pushkey.decode
1125 dec = pushkey.decode
1100 namespace = dec(inpart.params['namespace'])
1126 namespace = dec(inpart.params['namespace'])
1101 key = dec(inpart.params['key'])
1127 key = dec(inpart.params['key'])
1102 old = dec(inpart.params['old'])
1128 old = dec(inpart.params['old'])
1103 new = dec(inpart.params['new'])
1129 new = dec(inpart.params['new'])
1104 ret = op.repo.pushkey(namespace, key, old, new)
1130 ret = op.repo.pushkey(namespace, key, old, new)
1105 record = {'namespace': namespace,
1131 record = {'namespace': namespace,
1106 'key': key,
1132 'key': key,
1107 'old': old,
1133 'old': old,
1108 'new': new}
1134 'new': new}
1109 op.records.add('pushkey', record)
1135 op.records.add('pushkey', record)
1110 if op.reply is not None:
1136 if op.reply is not None:
1111 rpart = op.reply.newpart('b2x:reply:pushkey')
1137 rpart = op.reply.newpart('b2x:reply:pushkey')
1112 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1138 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1113 rpart.addparam('return', '%i' % ret, mandatory=False)
1139 rpart.addparam('return', '%i' % ret, mandatory=False)
1114
1140
1115 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1141 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
1116 def handlepushkeyreply(op, inpart):
1142 def handlepushkeyreply(op, inpart):
1117 """retrieve the result of a pushkey request"""
1143 """retrieve the result of a pushkey request"""
1118 ret = int(inpart.params['return'])
1144 ret = int(inpart.params['return'])
1119 partid = int(inpart.params['in-reply-to'])
1145 partid = int(inpart.params['in-reply-to'])
1120 op.records.add('pushkey', {'return': ret}, partid)
1146 op.records.add('pushkey', {'return': ret}, partid)
1121
1147
1122 @parthandler('b2x:obsmarkers')
1148 @parthandler('b2x:obsmarkers')
1123 def handleobsmarker(op, inpart):
1149 def handleobsmarker(op, inpart):
1124 """add a stream of obsmarkers to the repo"""
1150 """add a stream of obsmarkers to the repo"""
1125 tr = op.gettransaction()
1151 tr = op.gettransaction()
1126 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1152 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
1127 if new:
1153 if new:
1128 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1154 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1129 op.records.add('obsmarkers', {'new': new})
1155 op.records.add('obsmarkers', {'new': new})
1130 if op.reply is not None:
1156 if op.reply is not None:
1131 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1157 rpart = op.reply.newpart('b2x:reply:obsmarkers')
1132 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1158 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1133 rpart.addparam('new', '%i' % new, mandatory=False)
1159 rpart.addparam('new', '%i' % new, mandatory=False)
1134
1160
1135
1161
1136 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1162 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
1137 def handlepushkeyreply(op, inpart):
1163 def handlepushkeyreply(op, inpart):
1138 """retrieve the result of a pushkey request"""
1164 """retrieve the result of a pushkey request"""
1139 ret = int(inpart.params['new'])
1165 ret = int(inpart.params['new'])
1140 partid = int(inpart.params['in-reply-to'])
1166 partid = int(inpart.params['in-reply-to'])
1141 op.records.add('obsmarkers', {'new': ret}, partid)
1167 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now