bundle2: move handler validation out of processpart...
Durham Goode
r34260:07e4170f default

@@ -1,1932 +1,1939 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is architectured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    A part's parameters may have arbitrary content; the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

        N couples of bytes, where N is the total number of parameters. Each
        couple contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

        A blob of bytes from which each parameter key and value can be
        retrieved using the list of size couples stored in the previous
        field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
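The framing described above (magic string, stream parameters, parts, chunked payloads, end-of-stream marker) can be exercised with a short stdlib-only round trip. This is an illustrative sketch in modern Python, not Mercurial's actual bundler/unbundler: part headers are reduced to opaque byte strings, and `writebundle`/`readbundle` are names invented for the example.

```python
import struct
from io import BytesIO
from urllib.parse import quote, unquote

def writebundle(params, parts):
    out = BytesIO()
    out.write(b'HG20')                            # magic string
    blob = ' '.join('%s=%s' % (quote(k), quote(v))
                    for k, v in params).encode('ascii')
    out.write(struct.pack('>i', len(blob)))       # params size: int32
    out.write(blob)
    for header, payload in parts:
        out.write(struct.pack('>i', len(header)))  # header size: int32
        out.write(header)
        if payload:                               # at most one data chunk
            out.write(struct.pack('>i', len(payload)))
            out.write(payload)
        out.write(struct.pack('>i', 0))           # zero-size chunk ends payload
    out.write(struct.pack('>i', 0))               # empty header: end of stream
    return out.getvalue()

def readbundle(data):
    src = BytesIO(data)
    assert src.read(4) == b'HG20'
    psize, = struct.unpack('>i', src.read(4))
    blob = src.read(psize).decode('ascii')
    params = [tuple(unquote(x) for x in item.split('=', 1))
              for item in blob.split(' ') if item]
    parts = []
    while True:
        hsize, = struct.unpack('>i', src.read(4))
        if hsize == 0:                            # end of stream marker
            break
        header = src.read(hsize)
        payload = b''
        while True:
            csize, = struct.unpack('>i', src.read(4))
            if csize == 0:                        # payload concluded
                break
            payload += src.read(csize)
        parts.append((header, payload))
    return params, parts
```

Round-tripping one part with a mandatory (capitalized) stream parameter shows the layers: `writebundle([('Compression', 'GZ')], [(b'hdr', b'hello')])` decodes back to the same parameters and parts.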

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

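These format strings are ordinary `struct`-module specifiers: `>` forces big-endian, `i`/`I` are signed/unsigned 32-bit integers, `B` an unsigned byte. A few stdlib sanity checks (illustrative only, not part of the original module) make the implied wire sizes concrete:

```python
import struct

assert struct.calcsize('>i') == 4      # header/payload sizes are 4 bytes
assert struct.calcsize('>B') == 1      # part type length fits in one byte
assert struct.calcsize('>I') == 4      # part id is a 32-bit integer
assert struct.calcsize('>i20s') == 24  # phase entry: int32 + 20-byte node

# 'i' being signed is what leaves room for the negative chunksize
# special case mentioned in the module docstring:
assert struct.unpack('>i', struct.pack('>i', -1))[0] == -1
assert struct.pack('>i', 4096) == b'\x00\x00\x10\x00'
```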
def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

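The registration pattern can be sketched standalone. The `handlers` dict and `handlechangegroup` function below are invented for the example (the real module uses `parthandlermapping` and handlers with richer behavior); the point is the lowercased lookup key and the `params` attribute stamped onto the function.

```python
handlers = {}

def parthandler(parttype, params=()):
    def _decorator(func):
        lparttype = parttype.lower()  # enforce lower case matching
        assert lparttype not in handlers
        handlers[lparttype] = func
        func.params = frozenset(params)  # parameter names this handler knows
        return func
    return _decorator

@parthandler('changegroup', ('version',))
def handlechangegroup(op, part):
    return 'applied'

# Matching is case insensitive: the mandatory spelling 'CHANGEGROUP'
# resolves to the same handler once lowercased.
assert handlers['CHANGEGROUP'.lower()] is handlechangegroup
assert handlechangegroup.params == frozenset(('version',))
```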
class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

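A usage sketch of the record-keeping API, with a condensed standalone copy of the class so it runs outside Mercurial (the reply-tracking pieces are omitted):

```python
class unbundlerecords(object):
    """minimal copy: per-category storage plus a chronological sequence"""
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'namespace': 'phases'})
records.add('changegroup', {'return': 0})

# per-category access, and chronological iteration across categories
assert records['changegroup'] == ({'return': 1}, {'return': 0})
assert [cat for cat, obj in records] == ['changegroup', 'pushkey',
                                         'changegroup']
```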
class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.seek(0, 2)
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # If exiting or interrupted, do not attempt to seek the stream in
            # the finally block below. This makes abort faster.
            if (self.current and
                not isinstance(exc, (SystemExit, KeyboardInterrupt))):
                # consume the part content to not corrupt the stream.
                self.current.seek(0, 2)

            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the
            # type of bundle. We should probably clean up or drop this return
            # code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

    return op

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

463 def _gethandler(op, part):
464 status = 'unknown' # used by debug output
465 try:
466 handler = parthandlermapping.get(part.type)
467 if handler is None:
468 status = 'unsupported-type'
469 raise error.BundleUnknownFeatureError(parttype=part.type)
470 indebug(op.ui, 'found a handler for part %r' % part.type)
471 unknownparams = part.mandatorykeys - handler.params
472 if unknownparams:
473 unknownparams = list(unknownparams)
474 unknownparams.sort()
475 status = 'unsupported-params (%s)' % unknownparams
476 raise error.BundleUnknownFeatureError(parttype=part.type,
477 params=unknownparams)
478 status = 'supported'
479 except error.BundleUnknownFeatureError as exc:
480 if part.mandatory: # mandatory parts
481 raise
482 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
483 return # skip to part processing
484 finally:
485 if op.ui.debugflag:
486 msg = ['bundle2-input-part: "%s"' % part.type]
487 if not part.mandatory:
488 msg.append(' (advisory)')
489 nbmp = len(part.mandatorykeys)
490 nbap = len(part.params) - nbmp
491 if nbmp or nbap:
492 msg.append(' (params:')
493 if nbmp:
494 msg.append(' %i mandatory' % nbmp)
495 if nbap:
496 msg.append(' %i advisory' % nbmp)
497 msg.append(')')
498 msg.append(' %s\n' % status)
499 op.ui.debug(''.join(msg))
500
501 return handler
502
def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    try:
        handler = _gethandler(op, part)
        if handler is None:
            return

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam(
                    'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
    finally:
        pass
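# For illustration only: the lookup-and-validation contract implemented by
# _gethandler/_processpart above, as a standalone sketch. The Handler tuple,
# the handler registry dict, and the local exception class are stand-ins
# invented for this example, not Mercurial APIs.

```python
from collections import namedtuple

Handler = namedtuple('Handler', 'func params')

class BundleUnknownFeatureError(Exception):
    pass

def gethandler(handlers, parttype, mandatorykeys, mandatory=True):
    # look the part type up in the handler registry
    handler = handlers.get(parttype)
    try:
        if handler is None:
            raise BundleUnknownFeatureError('unsupported-type: %s' % parttype)
        # every key the sender marked mandatory must be understood
        unknown = sorted(mandatorykeys - handler.params)
        if unknown:
            raise BundleUnknownFeatureError('unsupported-params: %s' % unknown)
    except BundleUnknownFeatureError:
        if mandatory:
            raise       # a mandatory part the receiver cannot handle aborts
        return None     # advisory parts are silently skipped
    return handler

handlers = {'changegroup': Handler(func=None, params=frozenset(['version']))}
# advisory part with an unknown mandatory param: skipped, not fatal
assert gethandler(handlers, 'changegroup', {'version', 'exp'},
                  mandatory=False) is None
# known part with supported params: handler returned
assert gethandler(handlers, 'changegroup', {'version'}) is not None
```

# This mirrors why the refactor pays off: the validation logic can now be
# reused by callers that only want to probe for a handler without running it.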


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
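# For illustration only: the caps wire format described in the docstrings
# above round-trips as shown in this standalone sketch, which uses the
# stdlib urllib.parse in place of Mercurial's urlreq wrapper.

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    # one capability per line; values comma-separated after '='
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        # a bare capability decodes to an empty value list
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': ['1', '2'], 'changegroup': ['01', '02']}
assert decodecaps(encodecaps(caps)) == caps
assert encodecaps({'x': []}) == 'x'
```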

bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
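# For illustration only: the stream layout that getchunks emits above boils
# down to the framing sketched below. The '>i' struct formats are assumed to
# match the _fstreamparamsize/_fpartheadersize definitions earlier in this
# file; emit_stream is a hypothetical helper, not a Mercurial API.

```python
import struct

_fstreamparamsize = '>i'   # assumed: signed big-endian 32-bit
_fpartheadersize = '>i'

def emit_stream(paramblock, parts):
    """Yield the byte chunks of a minimal uncompressed HG20 stream."""
    yield b'HG20'                                   # magic string
    # length-prefixed, space-separated, percent-quoted stream parameters
    yield struct.pack(_fstreamparamsize, len(paramblock))
    if paramblock:
        yield paramblock                            # e.g. b'Compression=GZ'
    for part in parts:                              # already-framed part chunks
        yield part
    yield struct.pack(_fpartheadersize, 0)          # empty header ends the stream

# an empty bundle: magic, zero-length params, end-of-stream marker
assert b''.join(emit_stream(b'', [])) == b'HG20' + struct.pack('>i', 0) * 2
```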


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters from a raw parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory; this function raises a BundleUnknownFeatureError when such
        a parameter is unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        to be interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the next
            # part can be read. But then seek back to the beginning so the
            # code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

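# For illustration only: the size-prefixed chunk framing that _forwardchunks
# relays above can be sketched standalone. The '>i' struct format and the
# flaginterrupt value (-1 here) are assumptions meant to mirror their
# definitions earlier in this file; forward() is a hypothetical helper.

```python
import io
import struct

_fpartheadersize = '>i'  # assumed signed big-endian 32-bit, as elsewhere
flaginterrupt = -1       # assumed special size marking an interrupt chunk

def forward(fp):
    """Copy part/payload chunks until two consecutive empty sizes are seen."""
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack(_fpartheadersize, fp.read(4))[0]
        yield struct.pack(_fpartheadersize, size)
        if size:
            emptycount = 0
        else:
            emptycount += 1
            continue
        if size == flaginterrupt:
            continue
        yield fp.read(size)

# a stream with one 3-byte header, one 2-byte payload, then the two
# consecutive zero sizes that terminate it
raw = (struct.pack('>i', 3) + b'hdr' + struct.pack('>i', 2) + b'ok'
       + struct.pack('>i', 0) * 2)
assert b''.join(forward(io.BytesIO(raw))) == raw
```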
905 class bundlepart(object):
912 class bundlepart(object):
906 """A bundle2 part contains application level payload
913 """A bundle2 part contains application level payload
907
914
908 The part `type` is used to route the part to the application level
915 The part `type` is used to route the part to the application level
909 handler.
916 handler.
910
917
911 The part payload is contained in ``part.data``. It could be raw bytes or a
918 The part payload is contained in ``part.data``. It could be raw bytes or a
912 generator of byte chunks.
919 generator of byte chunks.
913
920
914 You can add parameters to the part using the ``addparam`` method.
921 You can add parameters to the part using the ``addparam`` method.
915 Parameters can be either mandatory (default) or advisory. Remote side
922 Parameters can be either mandatory (default) or advisory. Remote side
916 should be able to safely ignore the advisory ones.
923 should be able to safely ignore the advisory ones.
917
924
918 Both data and parameters cannot be modified after the generation has begun.
925 Both data and parameters cannot be modified after the generation has begun.
919 """
926 """
920
927
921 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
928 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
922 data='', mandatory=True):
929 data='', mandatory=True):
923 validateparttype(parttype)
930 validateparttype(parttype)
924 self.id = None
931 self.id = None
925 self.type = parttype
932 self.type = parttype
926 self._data = data
933 self._data = data
927 self._mandatoryparams = list(mandatoryparams)
934 self._mandatoryparams = list(mandatoryparams)
928 self._advisoryparams = list(advisoryparams)
935 self._advisoryparams = list(advisoryparams)
929 # checking for duplicated entries
936 # checking for duplicated entries
930 self._seenparams = set()
937 self._seenparams = set()
931 for pname, __ in self._mandatoryparams + self._advisoryparams:
938 for pname, __ in self._mandatoryparams + self._advisoryparams:
932 if pname in self._seenparams:
939 if pname in self._seenparams:
933 raise error.ProgrammingError('duplicated params: %s' % pname)
940 raise error.ProgrammingError('duplicated params: %s' % pname)
934 self._seenparams.add(pname)
941 self._seenparams.add(pname)
935 # status of the part's generation:
942 # status of the part's generation:
936 # - None: not started,
943 # - None: not started,
937 # - False: currently generated,
944 # - False: currently generated,
938 # - True: generation done.
945 # - True: generation done.
939 self._generated = None
946 self._generated = None
940 self.mandatory = mandatory
947 self.mandatory = mandatory
941
948
942 def __repr__(self):
949 def __repr__(self):
943 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
950 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
944 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
951 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
945 % (cls, id(self), self.id, self.type, self.mandatory))
952 % (cls, id(self), self.id, self.type, self.mandatory))
946
953
947 def copy(self):
954 def copy(self):
948 """return a copy of the part
955 """return a copy of the part
949
956
950 The new part have the very same content but no partid assigned yet.
957 The new part have the very same content but no partid assigned yet.
951 Parts with generated data cannot be copied."""
958 Parts with generated data cannot be copied."""
952 assert not util.safehasattr(self.data, 'next')
959 assert not util.safehasattr(self.data, 'next')
953 return self.__class__(self.type, self._mandatoryparams,
960 return self.__class__(self.type, self._mandatoryparams,
954 self._advisoryparams, self._data, self.mandatory)
961 self._advisoryparams, self._data, self.mandatory)
955
962
    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.seek(0, 2)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to setup all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

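# Illustrative example (hypothetical record values, not from the original
# source): with two changegroup records whose 'return' values are 3 and 2,
# they contributed 2 and 1 added heads respectively, so changedheads == 3
# and the combined result is 1 + 3 == 4. A single 0 result anywhere
# short-circuits the combination to 0 (failure).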
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate that what was retrieved
      by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

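# The loop above reads fixed-size (phase, node) entries until the payload is
# exhausted. A minimal standalone sketch of that framing, assuming
# _fphasesentry is the big-endian "int32 phase + 20-byte node" format
# ('>i20s') and that allphases covers public/draft/secret (0, 1, 2):
#
#   import io
#   import struct
#
#   _FPHASESENTRY = '>i20s'  # assumed layout of _fphasesentry
#
#   def read_phase_heads(stream, allphases=range(3)):
#       headsbyphase = [[] for _ in allphases]
#       entrysize = struct.calcsize(_FPHASESENTRY)  # 24 bytes per entry
#       while True:
#           entry = stream.read(entrysize)
#           if len(entry) < entrysize:
#               if entry:
#                   raise ValueError('bad phase-heads data')
#               break
#           phase, node = struct.unpack(_FPHASESENTRY, entry)
#           headsbyphase[phase].append(node)
#       return headsbyphase
#
#   payload = (struct.pack(_FPHASESENTRY, 0, b'\x11' * 20) +
#              struct.pack(_FPHASESENTRY, 1, b'\x22' * 20))
#   heads = read_phase_heads(io.BytesIO(payload))

```python
import io
import struct

# Assumed layout of _fphasesentry: big-endian int32 phase + 20-byte node.
_FPHASESENTRY = '>i20s'

def read_phase_heads(stream, allphases=range(3)):
    """Parse (phase, node) entries, mirroring _readphaseheads above."""
    headsbyphase = [[] for _ in allphases]
    entrysize = struct.calcsize(_FPHASESENTRY)  # 24 bytes per entry
    while True:
        entry = stream.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                # a partial trailing entry means the part is corrupt
                raise ValueError('bad phase-heads data')
            break
        phase, node = struct.unpack(_FPHASESENTRY, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

# One public (phase 0) head and one draft (phase 1) head.
payload = (struct.pack(_FPHASESENTRY, 0, b'\x11' * 20) +
           struct.pack(_FPHASESENTRY, 1, b'\x22' * 20))
heads = read_phase_heads(io.BytesIO(payload))
```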
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

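# The hgtagsfnodes payload is a flat, count-free sequence of
# (changeset node, filenode) pairs, each exactly 20 bytes; a short read
# marks the end of the stream. A minimal standalone sketch of that framing
# (the function name is illustrative, not part of this module):

```python
import io

def read_fnode_pairs(stream):
    """Collect (node, fnode) 20-byte pairs until the stream runs short."""
    pairs = []
    while True:
        node = stream.read(20)
        fnode = stream.read(20)
        if len(node) < 20 or len(fnode) < 20:
            # Incomplete trailing data is ignored, as in the handler above.
            break
        pairs.append((node, fnode))
    return pairs

# Two complete pairs followed by nothing: both are recovered.
payload = b'\x01' * 20 + b'\x02' * 20 + b'\x03' * 20 + b'\x04' * 20
pairs = read_fnode_pairs(io.BytesIO(payload))
```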
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
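# The key transformation above (uppercase, then prefix with USERVAR_ so hooks
# can tell pushed variables apart from built-in hook arguments) can be
# exercised on its own. A minimal sketch; the helper name is illustrative,
# not part of this module:

```python
def build_pushvar_hookargs(advisoryparams):
    """Mirror bundle2getvars: uppercase each key and prefix USERVAR_."""
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

# e.g. values supplied via `hg push --pushvars debug=1 --pushvars Reason=hotfix`
args = build_pushvar_hookargs([('debug', '1'), ('Reason', 'hotfix')])
```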