bundle2: move part processing to a separate function...
Durham Goode
r34262:f010097c default
@@ -1,1935 +1,1938 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

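The top-level layout above can be sketched as a reader loop. This is an illustrative sketch, not Mercurial's implementation; the `'HG20'` magic string and the `read_bundle2` helper name are assumptions, while the int32 size fields follow the field descriptions below:

```python
import struct
from io import BytesIO

def read_bundle2(stream):
    """Sketch of reading the top-level bundle2 layout:
    magic string, stream parameters, parts, end-of-stream marker."""
    magic = stream.read(4)
    if magic != b'HG20':  # assumed magic string for this bundle2 version
        raise ValueError('not a bundle2 stream: %r' % magic)
    # stream level parameters: int32 size followed by a parameter blob
    paramssize = struct.unpack('>i', stream.read(4))[0]
    params = stream.read(paramssize)
    parts = []
    while True:
        # each part starts with an int32 header size; 0 is the end marker
        headersize = struct.unpack('>i', stream.read(4))[0]
        if headersize == 0:
            break
        parts.append(stream.read(headersize))
        # payload chunks: int32 size + data, terminated by a zero size chunk
        while True:
            chunksize = struct.unpack('>i', stream.read(4))[0]
            if chunksize == 0:
                break
            stream.read(chunksize)
    return params, parts
```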
The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with value
  are stored in the form `<name>=<value>`. Both name and value are urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  complex usage.
- Textual data allows easy human inspection of a bundle2 header in case of
  trouble.

Any application level options MUST go into a bundle2 part instead.
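A minimal sketch of the textual parameter encoding described above (space separated, urlquoted `name=value` pairs, capitalized first letter marking a parameter mandatory). The helper names are illustrative, not Mercurial's own:

```python
from urllib.parse import quote, unquote

def encodeparams(params):
    """Serialize stream level parameters: space separated, urlquoted."""
    chunks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('parameter name must start with a letter')
        entry = quote(name, safe='')
        if value:
            entry += '=' + quote(value, safe='')
        chunks.append(entry)
    return ' '.join(chunks)

def decodeparams(blob):
    """Parse the blob back into (name, value, mandatory) tuples."""
    result = []
    for entry in blob.split(' ') if blob else []:
        name, sep, value = entry.partition('=')
        name, value = unquote(name), unquote(value)
        # a capitalized first letter marks the parameter as mandatory
        result.append((name, value, name[0].isupper()))
    return result
```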

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level handler
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    Part parameters may have arbitrary content; the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count: 1 byte, number of advisory parameters

    :param-sizes:

        N pairs of bytes, where N is the total number of parameters. Each
        pair contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

        A blob of bytes from which each parameter key and value can be
        retrieved using the list of size pairs stored in the previous
        field.

    Mandatory parameters come first, then the advisory ones.

    Each parameter's key MUST be unique within the part.
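The header layout above maps directly onto `struct` packing. A hedged sketch (the `packpartheader` helper is illustrative, not part of the module; field order follows the description above):

```python
import struct

def packpartheader(parttype, partid, mandatory=(), advisory=()):
    """Pack a part header: typesize, parttype, partid, then parameters."""
    header = struct.pack('>B', len(parttype)) + parttype  # :typesize: + :parttype:
    header += struct.pack('>I', partid)                   # :partid: unsigned 32-bit
    params = list(mandatory) + list(advisory)             # mandatory come first
    header += struct.pack('>BB', len(mandatory), len(advisory))
    for key, value in params:                             # <param-sizes>: one byte pair each
        header += struct.pack('>BB', len(key), len(value))
    for key, value in params:                             # <param-data>: keys and values packed
        header += key + value
    return header
```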

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
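Consuming a payload as described (int32-sized chunks, a zero size chunk terminates, negative sizes reserved) can be sketched as a generator; this is illustrative only:

```python
import struct
from io import BytesIO

def iterchunks(stream):
    """Yield payload chunks until the zero size chunk terminator."""
    while True:
        chunksize = struct.unpack('>i', stream.read(4))[0]
        if chunksize == 0:
            break  # a zero size chunk concludes the payload
        if chunksize < 0:
            # negative sizes are reserved for special case processing
            raise NotImplementedError('special chunk: %d' % chunksize)
        yield stream.read(chunksize)
```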

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are registered
for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care to
preserve.
"""
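The dispatch rules above (case-insensitive lookup, with case deciding mandatory vs advisory) can be sketched as follows; the names here are illustrative rather than the module's own:

```python
handlers = {}  # lower-cased part type -> handler function

def dispatchpart(parttype, payload):
    """Route a part per the rules above: the lookup is case insensitive,
    and any uppercase character makes the part mandatory."""
    handler = handlers.get(parttype.lower())
    mandatory = parttype != parttype.lower()
    if handler is None:
        if mandatory:
            # unknown mandatory part: processing must abort
            raise KeyError('unknown mandatory part: %s' % parttype)
        return None  # advisory part with no handler: silently ignored
    return handler(payload)
```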

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

_fphasesentry = '>i20s'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
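A quick illustration of the record container's behavior, using a trimmed standalone copy of the same interface (reply tracking omitted for brevity):

```python
class unbundlerecords(object):
    """Minimal standalone copy of the records container, for illustration."""
    def __init__(self):
        self._categories = {}
        self._sequences = []
    def add(self, category, entry):
        # entries are grouped by category and also kept in insertion order
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))
    def __iter__(self):
        return iter(self._sequences)
    def __len__(self):
        return len(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'some text')
records.add('changegroup', {'return': 0})
```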

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.seek(0, 2)
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        if exc:
            # If exiting or interrupted, do not attempt to seek the stream in
            # the finally block below. This makes abort faster.
            if (self.current and
                not isinstance(exc, (SystemExit, KeyboardInterrupt))):
                # consume the part content to not corrupt the stream.
                self.current.seek(0, 2)

            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

456 def _processchangegroup(op, cg, tr, source, url, **kwargs):
459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
457 ret = cg.apply(op.repo, tr, source, url, **kwargs)
460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
458 op.records.add('changegroup', {
461 op.records.add('changegroup', {
459 'return': ret,
462 'return': ret,
460 })
463 })
461 return ret
464 return ret
462
465
463 def _gethandler(op, part):
466 def _gethandler(op, part):
464 status = 'unknown' # used by debug output
467 status = 'unknown' # used by debug output
465 try:
468 try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %r' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % unknownparams
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                 params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside of _gethandler's try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

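The two helpers above are inverses of each other: one capability per line, values percent-encoded and comma-joined. A standalone round-trip sketch, using `urllib.parse` in place of Mercurial's `urlreq` wrapper and working on `str` rather than bytes (both assumptions for this example; the names `encode_caps`/`decode_caps` are hypothetical):

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    # one "name=v1,v2" line per capability, percent-encoded
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decode_caps(blob):
    # inverse: a bare name decodes to an empty value list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Note that a capability without values survives the round trip as an empty list, matching the "values are always a list" contract in the docstring.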
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters, and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually create and add if you need better
        control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


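The stream-parameter block built by `_paramchunk` above is just space-separated, percent-encoded `name` or `name=value` tokens. A standalone sketch of that encoding, using `urllib.parse.quote` in place of `urlreq` and a hypothetical function name:

```python
from urllib.parse import quote

def encode_stream_params(params):
    # params is a list of (name, value) pairs; value may be None,
    # in which case only the bare name is emitted
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)
```

Percent-encoding keeps the space separator unambiguous even when a parameter value itself contains spaces.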
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

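The dispatch in `getunbundler` hinges on the 4-byte magic string: the first two bytes name the format family, the next two its version, which is looked up in `formatmap`. A minimal standalone sketch (the `unbundlerfor` name and the stubbed `formatmap` value are hypothetical; the real table maps `'20'` to the `unbundle20` class):

```python
formatmap = {'20': 'unbundle20'}  # version -> unbundler (stubbed for the sketch)

def unbundlerfor(magicstring):
    # split the 4-byte magic: b'HG' + 2-byte version
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise ValueError('not a Mercurial bundle')
    cls = formatmap.get(version)
    if cls is None:
        raise ValueError('unknown bundle version %s' % version)
    return cls
```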
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """unquote and process all stream level parameters

        Returns the parameters as a sorted dict."""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

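`_processallparams` above is the decoding counterpart of the bundler's `_paramchunk`: split on spaces, split each token on the first `=`, percent-decode, and treat a bare name as a value of `None`. A standalone sketch under those assumptions (hypothetical name, `urllib.parse` in place of `urlreq`, no per-parameter handlers):

```python
from urllib.parse import unquote

def parse_stream_params(paramsblock):
    # space-separated, percent-encoded "name" or "name=value" tokens
    params = {}
    for token in paramsblock.split(' '):
        parts = [unquote(i) for i in token.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)  # bare name: parameter without a value
        params[parts[0]] = parts[1]
    return params
```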
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and will cause this function to raise an error when unknown.

        Note: no options are currently supported. Any input will either be
        ignored or cause a failure.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
            assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


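The loop in `_forwardchunks` leans on one framing property: every header and payload chunk is prefixed by a signed big-endian 32-bit size, a size of 0 terminates the current sequence, and two consecutive zeroes terminate the whole stream. A standalone sketch of walking that framing over an in-memory buffer (hypothetical `countparts` name; negative sizes such as the interrupt flag are assumed to carry no payload here):

```python
import struct

_fmt = '>i'  # signed big-endian 32-bit, matching _fpartheadersize/_fpayloadsize

def countparts(stream):
    # return the sizes of the non-empty chunks; two consecutive
    # zero-size chunks mark the end of the stream
    pos = 0
    emptycount = 0
    sizes = []
    while emptycount < 2:
        (size,) = struct.unpack_from(_fmt, stream, pos)
        pos += 4
        if size:
            emptycount = 0
        else:
            emptycount += 1
            continue
        if size < 0:
            continue  # flag value (e.g. interrupt): no payload follows
        sizes.append(size)
        pos += size  # skip over the chunk body
    return sizes
```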
    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the next
            # part can be read. But then seek back to the beginning so the
            # code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

908 class bundlepart(object):
911 class bundlepart(object):
909 """A bundle2 part contains application level payload
912 """A bundle2 part contains application level payload
910
913
911 The part `type` is used to route the part to the application level
914 The part `type` is used to route the part to the application level
912 handler.
915 handler.
913
916
914 The part payload is contained in ``part.data``. It could be raw bytes or a
917 The part payload is contained in ``part.data``. It could be raw bytes or a
915 generator of byte chunks.
918 generator of byte chunks.
916
919
917 You can add parameters to the part using the ``addparam`` method.
920 You can add parameters to the part using the ``addparam`` method.
918 Parameters can be either mandatory (default) or advisory. Remote side
921 Parameters can be either mandatory (default) or advisory. Remote side
919 should be able to safely ignore the advisory ones.
922 should be able to safely ignore the advisory ones.
920
923
921 Both data and parameters cannot be modified after the generation has begun.
924 Both data and parameters cannot be modified after the generation has begun.
922 """
925 """
923
926
924 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
925 data='', mandatory=True):
928 data='', mandatory=True):
926 validateparttype(parttype)
929 validateparttype(parttype)
927 self.id = None
930 self.id = None
928 self.type = parttype
931 self.type = parttype
929 self._data = data
932 self._data = data
930 self._mandatoryparams = list(mandatoryparams)
933 self._mandatoryparams = list(mandatoryparams)
931 self._advisoryparams = list(advisoryparams)
934 self._advisoryparams = list(advisoryparams)
932 # checking for duplicated entries
935 # checking for duplicated entries
933 self._seenparams = set()
936 self._seenparams = set()
934 for pname, __ in self._mandatoryparams + self._advisoryparams:
937 for pname, __ in self._mandatoryparams + self._advisoryparams:
935 if pname in self._seenparams:
938 if pname in self._seenparams:
936 raise error.ProgrammingError('duplicated params: %s' % pname)
939 raise error.ProgrammingError('duplicated params: %s' % pname)
937 self._seenparams.add(pname)
940 self._seenparams.add(pname)
938 # status of the part's generation:
941 # status of the part's generation:
939 # - None: not started,
942 # - None: not started,
940 # - False: currently generated,
943 # - False: currently generated,
941 # - True: generation done.
944 # - True: generation done.
942 self._generated = None
945 self._generated = None
943 self.mandatory = mandatory
946 self.mandatory = mandatory
944
947
945 def __repr__(self):
948 def __repr__(self):
946 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
947 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
948 % (cls, id(self), self.id, self.type, self.mandatory))
951 % (cls, id(self), self.id, self.type, self.mandatory))
949
952
950 def copy(self):
953 def copy(self):
951 """return a copy of the part
954 """return a copy of the part
952
955
953 The new part have the very same content but no partid assigned yet.
956 The new part have the very same content but no partid assigned yet.
954 Parts with generated data cannot be copied."""
957 Parts with generated data cannot be copied."""
955 assert not util.safehasattr(self.data, 'next')
958 assert not util.safehasattr(self.data, 'next')
956 return self.__class__(self.type, self._mandatoryparams,
959 return self.__class__(self.type, self._mandatoryparams,
957 self._advisoryparams, self._data, self.mandatory)
960 self._advisoryparams, self._data, self.mandatory)
958
961
    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

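For reference, the header layout emitted above can be sketched as a standalone function. This is an illustrative sketch, not part of bundle2.py: the struct format strings used here ('>B' for the type size, '>I' for the part id, '>BB' for the parameter counts) are assumed to match the module-level `_fparttypesize`, `_fpartid` and `_fpartparamcount` constants.

```python
# Illustrative sketch of the part header layout produced by getchunks().
# Assumption: '>B', '>I' and '>BB' match the module-level format constants.
import struct

def sketch_partheader(parttype, partid, manpar, advpar):
    # type size, type string, part id
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid)]
    # mandatory/advisory parameter counts
    header.append(struct.pack('>BB', len(manpar), len(advpar)))
    # key and value sizes for every parameter, mandatory first
    sizes = []
    for key, value in manpar + advpar:
        sizes.extend([len(key), len(value)])
    header.append(struct.pack('>%dB' % len(sizes), *sizes))
    # the keys and values themselves follow the size table
    for key, value in manpar + advpar:
        header.extend([key, value])
    return b''.join(header)
```

For a hypothetical 'CHANGEGROUP' part with one mandatory parameter ('version', '02'), this produces a 29-byte header: 1 byte of type size, 11 bytes of type, 4 bytes of id, 2 bytes of counts, 2 bytes of sizes, then the 9 bytes of key/value data.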
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.seek(0, 2)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

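To make the index arithmetic above concrete, here is a self-contained sketch of the same lookup over a hypothetical chunk index; the (payload offset, file offset) tuples mirror what `_payloadchunks` records in `_chunkindex`, but the data is made up for illustration.

```python
# Illustrative sketch (hypothetical data, not part of bundle2.py): map a
# payload position back to a chunk number plus an offset inside that chunk,
# given (payload offset, file offset) tuples like those in _chunkindex.
def sketch_findchunk(chunkindex, pos):
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # position is exactly at a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # position falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```

With an index of three chunks starting at payload offsets 0, 4096 and 8192, position 5000 resolves to chunk 1 at internal offset 904.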
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
            return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

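A quick usage sketch for the capability helpers above. The caps dict here is a made-up example shaped like what a decoded capabilities blob would contain; the extraction logic is repeated so the sketch is self-contained.

```python
# Illustrative sketch (hypothetical caps dict, not part of bundle2.py):
# obsmarkersversion pulls the numeric versions out of 'V<n>' capability
# strings advertised under the 'obsmarkers' key.
def sketch_obsmarkersversion(caps):
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

caps = {'HG20': (), 'obsmarkers': ('V0', 'V1')}  # hypothetical peer caps
```

A peer advertising 'V0' and 'V1' yields the supported version list [0, 1]; a peer with no 'obsmarkers' capability yields an empty list.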
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

1449 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1452 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1450 # We should eventually reconcile this logic with the one behind
1453 # We should eventually reconcile this logic with the one behind
1451 # 'exchange.getbundle2partsgenerator'.
1454 # 'exchange.getbundle2partsgenerator'.
1452 #
1455 #
1453 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1456 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1454 # different right now. So we keep them separated for now for the sake of
1457 # different right now. So we keep them separated for now for the sake of
1455 # simplicity.
1458 # simplicity.
1456
1459
1457 # we always want a changegroup in such bundle
1460 # we always want a changegroup in such bundle
1458 cgversion = opts.get('cg.version')
1461 cgversion = opts.get('cg.version')
1459 if cgversion is None:
1462 if cgversion is None:
1460 cgversion = changegroup.safeversion(repo)
1463 cgversion = changegroup.safeversion(repo)
1461 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1464 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1462 part = bundler.newpart('changegroup', data=cg.getchunks())
1465 part = bundler.newpart('changegroup', data=cg.getchunks())
1463 part.addparam('version', cg.version)
1466 part.addparam('version', cg.version)
1464 if 'clcount' in cg.extras:
1467 if 'clcount' in cg.extras:
1465 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1468 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1466 mandatory=False)
1469 mandatory=False)
1467 if opts.get('phases') and repo.revs('%ln and secret()',
1470 if opts.get('phases') and repo.revs('%ln and secret()',
1468 outgoing.missingheads):
1471 outgoing.missingheads):
1469 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1472 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1470
1473
1471 addparttagsfnodescache(repo, bundler, outgoing)
1474 addparttagsfnodescache(repo, bundler, outgoing)
1472
1475
1473 if opts.get('obsolescence', False):
1476 if opts.get('obsolescence', False):
1474 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1477 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1475 buildobsmarkerspart(bundler, obsmarkers)
1478 buildobsmarkerspart(bundler, obsmarkers)
1476
1479
1477 if opts.get('phases', False):
1480 if opts.get('phases', False):
1478 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1481 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1479 phasedata = []
1482 phasedata = []
1480 for phase in phases.allphases:
1483 for phase in phases.allphases:
1481 for head in headsbyphase[phase]:
1484 for head in headsbyphase[phase]:
1482 phasedata.append(_pack(_fphasesentry, phase, head))
1485 phasedata.append(_pack(_fphasesentry, phase, head))
1483 bundler.newpart('phase-heads', data=''.join(phasedata))
1486 bundler.newpart('phase-heads', data=''.join(phasedata))
1484
1487
1485 def addparttagsfnodescache(repo, bundler, outgoing):
1488 def addparttagsfnodescache(repo, bundler, outgoing):
1486 # we include the tags fnode cache for the bundle changeset
1489 # we include the tags fnode cache for the bundle changeset
1487 # (as an optional parts)
1490 # (as an optional parts)
1488 cache = tags.hgtagsfnodescache(repo.unfiltered())
1491 cache = tags.hgtagsfnodescache(repo.unfiltered())
1489 chunks = []
1492 chunks = []
1490
1493
1491 # .hgtags fnodes are only relevant for head changesets. While we could
1494 # .hgtags fnodes are only relevant for head changesets. While we could
1492 # transfer values for all known nodes, there will likely be little to
1495 # transfer values for all known nodes, there will likely be little to
1493 # no benefit.
1496 # no benefit.
1494 #
1497 #
1495 # We don't bother using a generator to produce output data because
1498 # We don't bother using a generator to produce output data because
1496 # a) we only have 40 bytes per head and even esoteric numbers of heads
1499 # a) we only have 40 bytes per head and even esoteric numbers of heads
1497 # consume little memory (1M heads is 40MB) b) we don't want to send the
1500 # consume little memory (1M heads is 40MB) b) we don't want to send the
1498 # part if we don't have entries and knowing if we have entries requires
1501 # part if we don't have entries and knowing if we have entries requires
1499 # cache lookups.
1502 # cache lookups.
1500 for node in outgoing.missingheads:
1503 for node in outgoing.missingheads:
1501 # Don't compute missing, as this may slow down serving.
1504 # Don't compute missing, as this may slow down serving.
1502 fnode = cache.getfnode(node, computemissing=False)
1505 fnode = cache.getfnode(node, computemissing=False)
1503 if fnode is not None:
1506 if fnode is not None:
1504 chunks.extend([node, fnode])
1507 chunks.extend([node, fnode])
1505
1508
1506 if chunks:
1509 if chunks:
1507 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1510 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1508
1511
1509 def buildobsmarkerspart(bundler, markers):
1512 def buildobsmarkerspart(bundler, markers):
1510 """add an obsmarker part to the bundler with <markers>
1513 """add an obsmarker part to the bundler with <markers>
1511
1514
1512 No part is created if markers is empty.
1515 No part is created if markers is empty.
1513 Raises ValueError if the bundler doesn't support any known obsmarker format.
1516 Raises ValueError if the bundler doesn't support any known obsmarker format.
1514 """
1517 """
1515 if not markers:
1518 if not markers:
1516 return None
1519 return None
1517
1520
1518 remoteversions = obsmarkersversion(bundler.capabilities)
1521 remoteversions = obsmarkersversion(bundler.capabilities)
1519 version = obsolete.commonversion(remoteversions)
1522 version = obsolete.commonversion(remoteversions)
1520 if version is None:
1523 if version is None:
1521 raise ValueError('bundler does not support common obsmarker format')
1524 raise ValueError('bundler does not support common obsmarker format')
1522 stream = obsolete.encodemarkers(markers, True, version=version)
1525 stream = obsolete.encodemarkers(markers, True, version=version)
1523 return bundler.newpart('obsmarkers', data=stream)
1526 return bundler.newpart('obsmarkers', data=stream)
1524
1527
1525 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1528 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1526 compopts=None):
1529 compopts=None):
1527 """Write a bundle file and return its filename.
1530 """Write a bundle file and return its filename.
1528
1531
1529 Existing files will not be overwritten.
1532 Existing files will not be overwritten.
1530 If no filename is specified, a temporary file is created.
1533 If no filename is specified, a temporary file is created.
1531 bz2 compression can be turned off.
1534 bz2 compression can be turned off.
1532 The bundle file will be deleted in case of errors.
1535 The bundle file will be deleted in case of errors.
1533 """
1536 """
1534
1537
1535 if bundletype == "HG20":
1538 if bundletype == "HG20":
1536 bundle = bundle20(ui)
1539 bundle = bundle20(ui)
1537 bundle.setcompression(compression, compopts)
1540 bundle.setcompression(compression, compopts)
1538 part = bundle.newpart('changegroup', data=cg.getchunks())
1541 part = bundle.newpart('changegroup', data=cg.getchunks())
1539 part.addparam('version', cg.version)
1542 part.addparam('version', cg.version)
1540 if 'clcount' in cg.extras:
1543 if 'clcount' in cg.extras:
1541 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1544 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1542 mandatory=False)
1545 mandatory=False)
1543 chunkiter = bundle.getchunks()
1546 chunkiter = bundle.getchunks()
1544 else:
1547 else:
1545 # compression argument is only for the bundle2 case
1548 # compression argument is only for the bundle2 case
1546 assert compression is None
1549 assert compression is None
1547 if cg.version != '01':
1550 if cg.version != '01':
1548 raise error.Abort(_('old bundle types only supports v1 '
1551 raise error.Abort(_('old bundle types only supports v1 '
1549 'changegroups'))
1552 'changegroups'))
1550 header, comp = bundletypes[bundletype]
1553 header, comp = bundletypes[bundletype]
1551 if comp not in util.compengines.supportedbundletypes:
1554 if comp not in util.compengines.supportedbundletypes:
1552 raise error.Abort(_('unknown stream compression type: %s')
1555 raise error.Abort(_('unknown stream compression type: %s')
1553 % comp)
1556 % comp)
1554 compengine = util.compengines.forbundletype(comp)
1557 compengine = util.compengines.forbundletype(comp)
1555 def chunkiter():
1558 def chunkiter():
1556 yield header
1559 yield header
1557 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1560 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1558 yield chunk
1561 yield chunk
1559 chunkiter = chunkiter()
1562 chunkiter = chunkiter()
1560
1563
1561 # parse the changegroup data, otherwise we will block
1564 # parse the changegroup data, otherwise we will block
1562 # in case of sshrepo because we don't know the end of the stream
1565 # in case of sshrepo because we don't know the end of the stream
1563 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1566 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1564
1567
1565 def combinechangegroupresults(op):
1568 def combinechangegroupresults(op):
1566 """logic to combine 0 or more addchangegroup results into one"""
1569 """logic to combine 0 or more addchangegroup results into one"""
1567 results = [r.get('return', 0)
1570 results = [r.get('return', 0)
1568 for r in op.records['changegroup']]
1571 for r in op.records['changegroup']]
1569 changedheads = 0
1572 changedheads = 0
1570 result = 1
1573 result = 1
1571 for ret in results:
1574 for ret in results:
1572 # If any changegroup result is 0, return 0
1575 # If any changegroup result is 0, return 0
1573 if ret == 0:
1576 if ret == 0:
1574 result = 0
1577 result = 0
1575 break
1578 break
1576 if ret < -1:
1579 if ret < -1:
1577 changedheads += ret + 1
1580 changedheads += ret + 1
1578 elif ret > 1:
1581 elif ret > 1:
1579 changedheads += ret - 1
1582 changedheads += ret - 1
1580 if changedheads > 0:
1583 if changedheads > 0:
1581 result = 1 + changedheads
1584 result = 1 + changedheads
1582 elif changedheads < 0:
1585 elif changedheads < 0:
1583 result = -1 + changedheads
1586 result = -1 + changedheads
1584 return result
1587 return result
1585
1588
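The head-count arithmetic in `combinechangegroupresults` can be exercised in isolation. This is a minimal sketch of the same logic over a plain list of return codes (the `op.records` plumbing is omitted); `combine` is a hypothetical name for the example:

```python
def combine(results):
    # any 0 means a changegroup failed, so the combined result is 0;
    # otherwise accumulate head-count deltas: ret > 1 added heads,
    # ret < -1 removed heads, and 1/-1 are neutral
    changedheads = 0
    for ret in results:
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1

# combine([]) == 1; combine([0, 5]) == 0; combine([3, -2]) == 2
```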
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what was
      retrieved by the client matches the server's knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate that what was retrieved
      by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

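Both head-check handlers read a flat stream of fixed-size 20-byte binary node hashes. A self-contained sketch of that read loop over an in-memory stream (the payload bytes are illustrative, not real node hashes):

```python
import io

def readnodes(stream):
    # consume 20-byte records until the stream is exhausted
    nodes = []
    chunk = stream.read(20)
    while len(chunk) == 20:
        nodes.append(chunk)
        chunk = stream.read(20)
    # a trailing partial record would indicate a truncated part
    assert not chunk
    return nodes

payload = b'a' * 20 + b'b' * 20
# readnodes(io.BytesIO(payload)) yields the two 20-byte records
```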
1721 @parthandler('check:updated-heads')
1724 @parthandler('check:updated-heads')
1722 def handlecheckupdatedheads(op, inpart):
1725 def handlecheckupdatedheads(op, inpart):
1723 """check for race on the heads touched by a push
1726 """check for race on the heads touched by a push
1724
1727
1725 This is similar to 'check:heads' but focus on the heads actually updated
1728 This is similar to 'check:heads' but focus on the heads actually updated
1726 during the push. If other activities happen on unrelated heads, it is
1729 during the push. If other activities happen on unrelated heads, it is
1727 ignored.
1730 ignored.
1728
1731
1729 This allow server with high traffic to avoid push contention as long as
1732 This allow server with high traffic to avoid push contention as long as
1730 unrelated parts of the graph are involved."""
1733 unrelated parts of the graph are involved."""
1731 h = inpart.read(20)
1734 h = inpart.read(20)
1732 heads = []
1735 heads = []
1733 while len(h) == 20:
1736 while len(h) == 20:
1734 heads.append(h)
1737 heads.append(h)
1735 h = inpart.read(20)
1738 h = inpart.read(20)
1736 assert not h
1739 assert not h
1737 # trigger a transaction so that we are guaranteed to have the lock now.
1740 # trigger a transaction so that we are guaranteed to have the lock now.
1738 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1741 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1739 op.gettransaction()
1742 op.gettransaction()
1740
1743
1741 currentheads = set()
1744 currentheads = set()
1742 for ls in op.repo.branchmap().itervalues():
1745 for ls in op.repo.branchmap().itervalues():
1743 currentheads.update(ls)
1746 currentheads.update(ls)
1744
1747
1745 for h in heads:
1748 for h in heads:
1746 if h not in currentheads:
1749 if h not in currentheads:
1747 raise error.PushRaced('repository changed while pushing - '
1750 raise error.PushRaced('repository changed while pushing - '
1748 'please try again')
1751 'please try again')
1749
1752
1750 @parthandler('output')
1753 @parthandler('output')
1751 def handleoutput(op, inpart):
1754 def handleoutput(op, inpart):
1752 """forward output captured on the server to the client"""
1755 """forward output captured on the server to the client"""
1753 for line in inpart.read().splitlines():
1756 for line in inpart.read().splitlines():
1754 op.ui.status(_('remote: %s\n') % line)
1757 op.ui.status(_('remote: %s\n') % line)
1755
1758
1756 @parthandler('replycaps')
1759 @parthandler('replycaps')
1757 def handlereplycaps(op, inpart):
1760 def handlereplycaps(op, inpart):
1758 """Notify that a reply bundle should be created
1761 """Notify that a reply bundle should be created
1759
1762
1760 The payload contains the capabilities information for the reply"""
1763 The payload contains the capabilities information for the reply"""
1761 caps = decodecaps(inpart.read())
1764 caps = decodecaps(inpart.read())
1762 if op.reply is None:
1765 if op.reply is None:
1763 op.reply = bundle20(op.ui, caps)
1766 op.reply = bundle20(op.ui, caps)
1764
1767
1765 class AbortFromPart(error.Abort):
1768 class AbortFromPart(error.Abort):
1766 """Sub-class of Abort that denotes an error from a bundle2 part."""
1769 """Sub-class of Abort that denotes an error from a bundle2 part."""
1767
1770
1768 @parthandler('error:abort', ('message', 'hint'))
1771 @parthandler('error:abort', ('message', 'hint'))
1769 def handleerrorabort(op, inpart):
1772 def handleerrorabort(op, inpart):
1770 """Used to transmit abort error over the wire"""
1773 """Used to transmit abort error over the wire"""
1771 raise AbortFromPart(inpart.params['message'],
1774 raise AbortFromPart(inpart.params['message'],
1772 hint=inpart.params.get('hint'))
1775 hint=inpart.params.get('hint'))
1773
1776
1774 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1777 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1775 'in-reply-to'))
1778 'in-reply-to'))
1776 def handleerrorpushkey(op, inpart):
1779 def handleerrorpushkey(op, inpart):
1777 """Used to transmit failure of a mandatory pushkey over the wire"""
1780 """Used to transmit failure of a mandatory pushkey over the wire"""
1778 kwargs = {}
1781 kwargs = {}
1779 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1782 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1780 value = inpart.params.get(name)
1783 value = inpart.params.get(name)
1781 if value is not None:
1784 if value is not None:
1782 kwargs[name] = value
1785 kwargs[name] = value
1783 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1786 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1784
1787
1785 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1788 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1786 def handleerrorunsupportedcontent(op, inpart):
1789 def handleerrorunsupportedcontent(op, inpart):
1787 """Used to transmit unknown content error over the wire"""
1790 """Used to transmit unknown content error over the wire"""
1788 kwargs = {}
1791 kwargs = {}
1789 parttype = inpart.params.get('parttype')
1792 parttype = inpart.params.get('parttype')
1790 if parttype is not None:
1793 if parttype is not None:
1791 kwargs['parttype'] = parttype
1794 kwargs['parttype'] = parttype
1792 params = inpart.params.get('params')
1795 params = inpart.params.get('params')
1793 if params is not None:
1796 if params is not None:
1794 kwargs['params'] = params.split('\0')
1797 kwargs['params'] = params.split('\0')
1795
1798
1796 raise error.BundleUnknownFeatureError(**kwargs)
1799 raise error.BundleUnknownFeatureError(**kwargs)
1797
1800
1798 @parthandler('error:pushraced', ('message',))
1801 @parthandler('error:pushraced', ('message',))
1799 def handleerrorpushraced(op, inpart):
1802 def handleerrorpushraced(op, inpart):
1800 """Used to transmit push race error over the wire"""
1803 """Used to transmit push race error over the wire"""
1801 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1804 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1802
1805
1803 @parthandler('listkeys', ('namespace',))
1806 @parthandler('listkeys', ('namespace',))
1804 def handlelistkeys(op, inpart):
1807 def handlelistkeys(op, inpart):
1805 """retrieve pushkey namespace content stored in a bundle2"""
1808 """retrieve pushkey namespace content stored in a bundle2"""
1806 namespace = inpart.params['namespace']
1809 namespace = inpart.params['namespace']
1807 r = pushkey.decodekeys(inpart.read())
1810 r = pushkey.decodekeys(inpart.read())
1808 op.records.add('listkeys', (namespace, r))
1811 op.records.add('listkeys', (namespace, r))
1809
1812
1810 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1813 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1811 def handlepushkey(op, inpart):
1814 def handlepushkey(op, inpart):
1812 """process a pushkey request"""
1815 """process a pushkey request"""
1813 dec = pushkey.decode
1816 dec = pushkey.decode
1814 namespace = dec(inpart.params['namespace'])
1817 namespace = dec(inpart.params['namespace'])
1815 key = dec(inpart.params['key'])
1818 key = dec(inpart.params['key'])
1816 old = dec(inpart.params['old'])
1819 old = dec(inpart.params['old'])
1817 new = dec(inpart.params['new'])
1820 new = dec(inpart.params['new'])
1818 # Grab the transaction to ensure that we have the lock before performing the
1821 # Grab the transaction to ensure that we have the lock before performing the
1819 # pushkey.
1822 # pushkey.
1820 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1823 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1821 op.gettransaction()
1824 op.gettransaction()
1822 ret = op.repo.pushkey(namespace, key, old, new)
1825 ret = op.repo.pushkey(namespace, key, old, new)
1823 record = {'namespace': namespace,
1826 record = {'namespace': namespace,
1824 'key': key,
1827 'key': key,
1825 'old': old,
1828 'old': old,
1826 'new': new}
1829 'new': new}
1827 op.records.add('pushkey', record)
1830 op.records.add('pushkey', record)
1828 if op.reply is not None:
1831 if op.reply is not None:
1829 rpart = op.reply.newpart('reply:pushkey')
1832 rpart = op.reply.newpart('reply:pushkey')
1830 rpart.addparam(
1833 rpart.addparam(
1831 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1834 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1832 rpart.addparam('return', '%i' % ret, mandatory=False)
1835 rpart.addparam('return', '%i' % ret, mandatory=False)
1833 if inpart.mandatory and not ret:
1836 if inpart.mandatory and not ret:
1834 kwargs = {}
1837 kwargs = {}
1835 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1838 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1836 if key in inpart.params:
1839 if key in inpart.params:
1837 kwargs[key] = inpart.params[key]
1840 kwargs[key] = inpart.params[key]
1838 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1841 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1839
1842
def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

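`_readphaseheads` decodes a stream of fixed-size entries, treating a short read as end-of-stream and a partial entry as corruption. A self-contained sketch of the same loop, assuming a big-endian 32-bit phase number followed by a 20-byte binary node (the actual `_fphasesentry` format is defined elsewhere in this module):

```python
import io
import struct

# Assumed entry layout: big-endian int phase + 20-byte binary node.
ENTRYFMT = '>i20s'
ENTRYSIZE = struct.calcsize(ENTRYFMT)  # 24 bytes per entry

def readphaseheads(stream, numphases=3):
    headsbyphase = [[] for _ in range(numphases)]
    while True:
        entry = stream.read(ENTRYSIZE)
        if len(entry) < ENTRYSIZE:
            if entry:
                # A partial trailing entry means the payload is truncated.
                raise ValueError('bad phase-heads payload')
            break  # clean end of stream
        phase, node = struct.unpack(ENTRYFMT, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

payload = (struct.pack(ENTRYFMT, 0, b'\x11' * 20)
           + struct.pack(ENTRYFMT, 2, b'\x22' * 20))
heads = readphaseheads(io.BytesIO(payload))
```

The fixed-size framing means no length prefix is needed: the reader can tell a complete entry from a truncated one purely by how many bytes came back.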
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

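Every handler in this file self-registers through the `@parthandler` decorator, which maps a part type to its function and records which parameters the handler understands. A minimal sketch of that registration pattern (hypothetical names, loosely modeled on this module, not its exact code):

```python
# Registry mapping lowercased part type -> handler function.
parthandlermapping = {}

def parthandler(parttype, params=()):
    """Decorator registering a function as handler for a bundle2 part type.

    The known parameter names are attached to the function so generic code
    can reject a part carrying a mandatory parameter the handler ignores.
    """
    def _decorator(func):
        lparttype = parttype.lower()
        assert lparttype not in parthandlermapping, 'duplicate handler'
        func.params = frozenset(params)
        parthandlermapping[lparttype] = func
        return func
    return _decorator

@parthandler('reply:demo', ('return', 'in-reply-to'))
def handledemoreply(op, inpart):
    # A toy handler: inpart stands in for a real part object here.
    return int(inpart['return'])
```

Dispatch then becomes a single dictionary lookup on the part type, and unknown mandatory parts can be detected before any payload is consumed.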
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

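The `hgtagsfnodes` payload is framed the same way as the phase-heads part: no length prefix, just concatenated fixed-width values, with a short read terminating the loop. A standalone sketch of that pair-reading loop (hypothetical helper name, same stopping rule as the handler above):

```python
import io

NODELEN = 20  # binary sha1 length used for changeset nodes and filenodes

def readfnodepairs(stream):
    """Yield (node, fnode) pairs from a stream of concatenated 20-byte
    values, stopping at the first incomplete pair, as the hgtagsfnodes
    handler above does."""
    while True:
        node = stream.read(NODELEN)
        fnode = stream.read(NODELEN)
        if len(node) < NODELEN or len(fnode) < NODELEN:
            # Incomplete trailing data is ignored rather than fatal here.
            break
        yield node, fnode

# One complete pair followed by a stray lone node (dropped on purpose).
payload = b'\xaa' * 20 + b'\xbb' * 20 + b'\xcc' * 20
pairs = list(readfnodepairs(io.BytesIO(payload)))
```

Unlike the phase-heads reader, trailing partial data is merely logged and skipped, since the fnodes cache is an optimization and a short payload costs only a cache miss.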
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
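The `USERVAR_` prefixing above is a small but important trust boundary: hooks can distinguish client-supplied variables from arguments the server set itself. A sketch of just that transformation (hypothetical helper name, mirroring the loop in `bundle2getvars`):

```python
def buildhookargs(advisoryparams):
    """Prefix pushed variables with USERVAR_ so hooks can tell them apart
    from environment set by the server (as bundle2getvars does above)."""
    hookargs = {}
    for key, value in advisoryparams:
        # Uppercase first, then prefix, matching the handler's order.
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

# advisoryparams arrive as (name, value) string pairs from --pushvar.
args = buildhookargs([('debug', '1'), ('BYPASS_REVIEW', 'true')])
```

Because only advisory parameters are read, a client sending unexpected variables cannot make the part processing fail; a server that has not opted in via `push.pushvars.server` simply ignores the part's contents.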