##// END OF EJS Templates
bundle2: immediate exit for ctrl+c (issue5692)...
Durham Goode -
r34638:5f79f5f8 default
parent child Browse files
Show More
@@ -1,1923 +1,1923 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import errno
150 import errno
151 import re
151 import re
152 import string
152 import string
153 import struct
153 import struct
154 import sys
154 import sys
155
155
156 from .i18n import _
156 from .i18n import _
157 from . import (
157 from . import (
158 changegroup,
158 changegroup,
159 error,
159 error,
160 obsolete,
160 obsolete,
161 phases,
161 phases,
162 pushkey,
162 pushkey,
163 pycompat,
163 pycompat,
164 tags,
164 tags,
165 url,
165 url,
166 util,
166 util,
167 )
167 )
168
168
169 urlerr = util.urlerr
169 urlerr = util.urlerr
170 urlreq = util.urlreq
170 urlreq = util.urlreq
171
171
172 _pack = struct.pack
172 _pack = struct.pack
173 _unpack = struct.unpack
173 _unpack = struct.unpack
174
174
175 _fstreamparamsize = '>i'
175 _fstreamparamsize = '>i'
176 _fpartheadersize = '>i'
176 _fpartheadersize = '>i'
177 _fparttypesize = '>B'
177 _fparttypesize = '>B'
178 _fpartid = '>I'
178 _fpartid = '>I'
179 _fpayloadsize = '>i'
179 _fpayloadsize = '>i'
180 _fpartparamcount = '>BB'
180 _fpartparamcount = '>BB'
181
181
182 preferedchunksize = 4096
182 preferedchunksize = 4096
183
183
184 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
185
185
186 def outdebug(ui, message):
186 def outdebug(ui, message):
187 """debug regarding output stream (bundling)"""
187 """debug regarding output stream (bundling)"""
188 if ui.configbool('devel', 'bundle2.debug'):
188 if ui.configbool('devel', 'bundle2.debug'):
189 ui.debug('bundle2-output: %s\n' % message)
189 ui.debug('bundle2-output: %s\n' % message)
190
190
191 def indebug(ui, message):
191 def indebug(ui, message):
192 """debug on input stream (unbundling)"""
192 """debug on input stream (unbundling)"""
193 if ui.configbool('devel', 'bundle2.debug'):
193 if ui.configbool('devel', 'bundle2.debug'):
194 ui.debug('bundle2-input: %s\n' % message)
194 ui.debug('bundle2-input: %s\n' % message)
195
195
196 def validateparttype(parttype):
196 def validateparttype(parttype):
197 """raise ValueError if a parttype contains invalid character"""
197 """raise ValueError if a parttype contains invalid character"""
198 if _parttypeforbidden.search(parttype):
198 if _parttypeforbidden.search(parttype):
199 raise ValueError(parttype)
199 raise ValueError(parttype)
200
200
201 def _makefpartparamsizes(nbparams):
201 def _makefpartparamsizes(nbparams):
202 """return a struct format to read part parameter sizes
202 """return a struct format to read part parameter sizes
203
203
204 The number parameters is variable so we need to build that format
204 The number parameters is variable so we need to build that format
205 dynamically.
205 dynamically.
206 """
206 """
207 return '>'+('BB'*nbparams)
207 return '>'+('BB'*nbparams)
208
208
209 parthandlermapping = {}
209 parthandlermapping = {}
210
210
211 def parthandler(parttype, params=()):
211 def parthandler(parttype, params=()):
212 """decorator that register a function as a bundle2 part handler
212 """decorator that register a function as a bundle2 part handler
213
213
214 eg::
214 eg::
215
215
216 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
217 def myparttypehandler(...):
217 def myparttypehandler(...):
218 '''process a part of type "my part".'''
218 '''process a part of type "my part".'''
219 ...
219 ...
220 """
220 """
221 validateparttype(parttype)
221 validateparttype(parttype)
222 def _decorator(func):
222 def _decorator(func):
223 lparttype = parttype.lower() # enforce lower case matching.
223 lparttype = parttype.lower() # enforce lower case matching.
224 assert lparttype not in parthandlermapping
224 assert lparttype not in parthandlermapping
225 parthandlermapping[lparttype] = func
225 parthandlermapping[lparttype] = func
226 func.params = frozenset(params)
226 func.params = frozenset(params)
227 return func
227 return func
228 return _decorator
228 return _decorator
229
229
230 class unbundlerecords(object):
230 class unbundlerecords(object):
231 """keep record of what happens during and unbundle
231 """keep record of what happens during and unbundle
232
232
233 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 New records are added using `records.add('cat', obj)`. Where 'cat' is a
234 category of record and obj is an arbitrary object.
234 category of record and obj is an arbitrary object.
235
235
236 `records['cat']` will return all entries of this category 'cat'.
236 `records['cat']` will return all entries of this category 'cat'.
237
237
238 Iterating on the object itself will yield `('category', obj)` tuples
238 Iterating on the object itself will yield `('category', obj)` tuples
239 for all entries.
239 for all entries.
240
240
241 All iterations happens in chronological order.
241 All iterations happens in chronological order.
242 """
242 """
243
243
244 def __init__(self):
244 def __init__(self):
245 self._categories = {}
245 self._categories = {}
246 self._sequences = []
246 self._sequences = []
247 self._replies = {}
247 self._replies = {}
248
248
249 def add(self, category, entry, inreplyto=None):
249 def add(self, category, entry, inreplyto=None):
250 """add a new record of a given category.
250 """add a new record of a given category.
251
251
252 The entry can then be retrieved in the list returned by
252 The entry can then be retrieved in the list returned by
253 self['category']."""
253 self['category']."""
254 self._categories.setdefault(category, []).append(entry)
254 self._categories.setdefault(category, []).append(entry)
255 self._sequences.append((category, entry))
255 self._sequences.append((category, entry))
256 if inreplyto is not None:
256 if inreplyto is not None:
257 self.getreplies(inreplyto).add(category, entry)
257 self.getreplies(inreplyto).add(category, entry)
258
258
259 def getreplies(self, partid):
259 def getreplies(self, partid):
260 """get the records that are replies to a specific part"""
260 """get the records that are replies to a specific part"""
261 return self._replies.setdefault(partid, unbundlerecords())
261 return self._replies.setdefault(partid, unbundlerecords())
262
262
263 def __getitem__(self, cat):
263 def __getitem__(self, cat):
264 return tuple(self._categories.get(cat, ()))
264 return tuple(self._categories.get(cat, ()))
265
265
266 def __iter__(self):
266 def __iter__(self):
267 return iter(self._sequences)
267 return iter(self._sequences)
268
268
269 def __len__(self):
269 def __len__(self):
270 return len(self._sequences)
270 return len(self._sequences)
271
271
272 def __nonzero__(self):
272 def __nonzero__(self):
273 return bool(self._sequences)
273 return bool(self._sequences)
274
274
275 __bool__ = __nonzero__
275 __bool__ = __nonzero__
276
276
277 class bundleoperation(object):
277 class bundleoperation(object):
278 """an object that represents a single bundling process
278 """an object that represents a single bundling process
279
279
280 Its purpose is to carry unbundle-related objects and states.
280 Its purpose is to carry unbundle-related objects and states.
281
281
282 A new object should be created at the beginning of each bundle processing.
282 A new object should be created at the beginning of each bundle processing.
283 The object is to be returned by the processing function.
283 The object is to be returned by the processing function.
284
284
285 The object has very little content now it will ultimately contain:
285 The object has very little content now it will ultimately contain:
286 * an access to the repo the bundle is applied to,
286 * an access to the repo the bundle is applied to,
287 * a ui object,
287 * a ui object,
288 * a way to retrieve a transaction to add changes to the repo,
288 * a way to retrieve a transaction to add changes to the repo,
289 * a way to record the result of processing each part,
289 * a way to record the result of processing each part,
290 * a way to construct a bundle response when applicable.
290 * a way to construct a bundle response when applicable.
291 """
291 """
292
292
293 def __init__(self, repo, transactiongetter, captureoutput=True):
293 def __init__(self, repo, transactiongetter, captureoutput=True):
294 self.repo = repo
294 self.repo = repo
295 self.ui = repo.ui
295 self.ui = repo.ui
296 self.records = unbundlerecords()
296 self.records = unbundlerecords()
297 self.reply = None
297 self.reply = None
298 self.captureoutput = captureoutput
298 self.captureoutput = captureoutput
299 self.hookargs = {}
299 self.hookargs = {}
300 self._gettransaction = transactiongetter
300 self._gettransaction = transactiongetter
301
301
302 def gettransaction(self):
302 def gettransaction(self):
303 transaction = self._gettransaction()
303 transaction = self._gettransaction()
304
304
305 if self.hookargs:
305 if self.hookargs:
306 # the ones added to the transaction supercede those added
306 # the ones added to the transaction supercede those added
307 # to the operation.
307 # to the operation.
308 self.hookargs.update(transaction.hookargs)
308 self.hookargs.update(transaction.hookargs)
309 transaction.hookargs = self.hookargs
309 transaction.hookargs = self.hookargs
310
310
311 # mark the hookargs as flushed. further attempts to add to
311 # mark the hookargs as flushed. further attempts to add to
312 # hookargs will result in an abort.
312 # hookargs will result in an abort.
313 self.hookargs = None
313 self.hookargs = None
314
314
315 return transaction
315 return transaction
316
316
317 def addhookargs(self, hookargs):
317 def addhookargs(self, hookargs):
318 if self.hookargs is None:
318 if self.hookargs is None:
319 raise error.ProgrammingError('attempted to add hookargs to '
319 raise error.ProgrammingError('attempted to add hookargs to '
320 'operation after transaction started')
320 'operation after transaction started')
321 self.hookargs.update(hookargs)
321 self.hookargs.update(hookargs)
322
322
323 class TransactionUnavailable(RuntimeError):
323 class TransactionUnavailable(RuntimeError):
324 pass
324 pass
325
325
326 def _notransaction():
326 def _notransaction():
327 """default method to get a transaction while processing a bundle
327 """default method to get a transaction while processing a bundle
328
328
329 Raise an exception to highlight the fact that no transaction was expected
329 Raise an exception to highlight the fact that no transaction was expected
330 to be created"""
330 to be created"""
331 raise TransactionUnavailable()
331 raise TransactionUnavailable()
332
332
333 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
333 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
334 # transform me into unbundler.apply() as soon as the freeze is lifted
334 # transform me into unbundler.apply() as soon as the freeze is lifted
335 if isinstance(unbundler, unbundle20):
335 if isinstance(unbundler, unbundle20):
336 tr.hookargs['bundle2'] = '1'
336 tr.hookargs['bundle2'] = '1'
337 if source is not None and 'source' not in tr.hookargs:
337 if source is not None and 'source' not in tr.hookargs:
338 tr.hookargs['source'] = source
338 tr.hookargs['source'] = source
339 if url is not None and 'url' not in tr.hookargs:
339 if url is not None and 'url' not in tr.hookargs:
340 tr.hookargs['url'] = url
340 tr.hookargs['url'] = url
341 return processbundle(repo, unbundler, lambda: tr)
341 return processbundle(repo, unbundler, lambda: tr)
342 else:
342 else:
343 # the transactiongetter won't be used, but we might as well set it
343 # the transactiongetter won't be used, but we might as well set it
344 op = bundleoperation(repo, lambda: tr)
344 op = bundleoperation(repo, lambda: tr)
345 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
345 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
346 return op
346 return op
347
347
348 class partiterator(object):
348 class partiterator(object):
349 def __init__(self, repo, op, unbundler):
349 def __init__(self, repo, op, unbundler):
350 self.repo = repo
350 self.repo = repo
351 self.op = op
351 self.op = op
352 self.unbundler = unbundler
352 self.unbundler = unbundler
353 self.iterator = None
353 self.iterator = None
354 self.count = 0
354 self.count = 0
355 self.current = None
355 self.current = None
356
356
357 def __enter__(self):
357 def __enter__(self):
358 def func():
358 def func():
359 itr = enumerate(self.unbundler.iterparts())
359 itr = enumerate(self.unbundler.iterparts())
360 for count, p in itr:
360 for count, p in itr:
361 self.count = count
361 self.count = count
362 self.current = p
362 self.current = p
363 yield p
363 yield p
364 p.seek(0, 2)
364 p.seek(0, 2)
365 self.current = None
365 self.current = None
366 self.iterator = func()
366 self.iterator = func()
367 return self.iterator
367 return self.iterator
368
368
369 def __exit__(self, type, exc, tb):
369 def __exit__(self, type, exc, tb):
370 if not self.iterator:
370 if not self.iterator:
371 return
371 return
372
372
373 if exc:
373 # Only gracefully abort in a normal exception situation. User aborts
374 # If exiting or interrupted, do not attempt to seek the stream in
374 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
375 # the finally block below. This makes abort faster.
375 # and should not gracefully cleanup.
376 if (self.current and
376 if isinstance(exc, Exception):
377 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
378 # consume the part content to not corrupt the stream.
379 self.current.seek(0, 2)
380
381 # Any exceptions seeking to the end of the bundle at this point are
377 # Any exceptions seeking to the end of the bundle at this point are
382 # almost certainly related to the underlying stream being bad.
378 # almost certainly related to the underlying stream being bad.
383 # And, chances are that the exception we're handling is related to
379 # And, chances are that the exception we're handling is related to
384 # getting in that bad state. So, we swallow the seeking error and
380 # getting in that bad state. So, we swallow the seeking error and
385 # re-raise the original error.
381 # re-raise the original error.
386 seekerror = False
382 seekerror = False
387 try:
383 try:
384 if self.current:
385 # consume the part content to not corrupt the stream.
386 self.current.seek(0, 2)
387
388 for part in self.iterator:
388 for part in self.iterator:
389 # consume the bundle content
389 # consume the bundle content
390 part.seek(0, 2)
390 part.seek(0, 2)
391 except Exception:
391 except Exception:
392 seekerror = True
392 seekerror = True
393
393
394 # Small hack to let caller code distinguish exceptions from bundle2
394 # Small hack to let caller code distinguish exceptions from bundle2
395 # processing from processing the old format. This is mostly needed
395 # processing from processing the old format. This is mostly needed
396 # to handle different return codes to unbundle according to the type
396 # to handle different return codes to unbundle according to the type
397 # of bundle. We should probably clean up or drop this return code
397 # of bundle. We should probably clean up or drop this return code
398 # craziness in a future version.
398 # craziness in a future version.
399 exc.duringunbundle2 = True
399 exc.duringunbundle2 = True
400 salvaged = []
400 salvaged = []
401 replycaps = None
401 replycaps = None
402 if self.op.reply is not None:
402 if self.op.reply is not None:
403 salvaged = self.op.reply.salvageoutput()
403 salvaged = self.op.reply.salvageoutput()
404 replycaps = self.op.reply.capabilities
404 replycaps = self.op.reply.capabilities
405 exc._replycaps = replycaps
405 exc._replycaps = replycaps
406 exc._bundle2salvagedoutput = salvaged
406 exc._bundle2salvagedoutput = salvaged
407
407
408 # Re-raising from a variable loses the original stack. So only use
408 # Re-raising from a variable loses the original stack. So only use
409 # that form if we need to.
409 # that form if we need to.
410 if seekerror:
410 if seekerror:
411 raise exc
411 raise exc
412
412
413 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
413 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
414 self.count)
414 self.count)
415
415
416 def processbundle(repo, unbundler, transactiongetter=None, op=None):
416 def processbundle(repo, unbundler, transactiongetter=None, op=None):
417 """This function process a bundle, apply effect to/from a repo
417 """This function process a bundle, apply effect to/from a repo
418
418
419 It iterates over each part then searches for and uses the proper handling
419 It iterates over each part then searches for and uses the proper handling
420 code to process the part. Parts are processed in order.
420 code to process the part. Parts are processed in order.
421
421
422 Unknown Mandatory part will abort the process.
422 Unknown Mandatory part will abort the process.
423
423
424 It is temporarily possible to provide a prebuilt bundleoperation to the
424 It is temporarily possible to provide a prebuilt bundleoperation to the
425 function. This is used to ensure output is properly propagated in case of
425 function. This is used to ensure output is properly propagated in case of
426 an error during the unbundling. This output capturing part will likely be
426 an error during the unbundling. This output capturing part will likely be
427 reworked and this ability will probably go away in the process.
427 reworked and this ability will probably go away in the process.
428 """
428 """
429 if op is None:
429 if op is None:
430 if transactiongetter is None:
430 if transactiongetter is None:
431 transactiongetter = _notransaction
431 transactiongetter = _notransaction
432 op = bundleoperation(repo, transactiongetter)
432 op = bundleoperation(repo, transactiongetter)
433 # todo:
433 # todo:
434 # - replace this is a init function soon.
434 # - replace this is a init function soon.
435 # - exception catching
435 # - exception catching
436 unbundler.params
436 unbundler.params
437 if repo.ui.debugflag:
437 if repo.ui.debugflag:
438 msg = ['bundle2-input-bundle:']
438 msg = ['bundle2-input-bundle:']
439 if unbundler.params:
439 if unbundler.params:
440 msg.append(' %i params' % len(unbundler.params))
440 msg.append(' %i params' % len(unbundler.params))
441 if op._gettransaction is None or op._gettransaction is _notransaction:
441 if op._gettransaction is None or op._gettransaction is _notransaction:
442 msg.append(' no-transaction')
442 msg.append(' no-transaction')
443 else:
443 else:
444 msg.append(' with-transaction')
444 msg.append(' with-transaction')
445 msg.append('\n')
445 msg.append('\n')
446 repo.ui.debug(''.join(msg))
446 repo.ui.debug(''.join(msg))
447
447
448 processparts(repo, op, unbundler)
448 processparts(repo, op, unbundler)
449
449
450 return op
450 return op
451
451
452 def processparts(repo, op, unbundler):
452 def processparts(repo, op, unbundler):
453 with partiterator(repo, op, unbundler) as parts:
453 with partiterator(repo, op, unbundler) as parts:
454 for part in parts:
454 for part in parts:
455 _processpart(op, part)
455 _processpart(op, part)
456
456
457 def _processchangegroup(op, cg, tr, source, url, **kwargs):
457 def _processchangegroup(op, cg, tr, source, url, **kwargs):
458 ret = cg.apply(op.repo, tr, source, url, **kwargs)
458 ret = cg.apply(op.repo, tr, source, url, **kwargs)
459 op.records.add('changegroup', {
459 op.records.add('changegroup', {
460 'return': ret,
460 'return': ret,
461 })
461 })
462 return ret
462 return ret
463
463
464 def _gethandler(op, part):
464 def _gethandler(op, part):
465 status = 'unknown' # used by debug output
465 status = 'unknown' # used by debug output
466 try:
466 try:
467 handler = parthandlermapping.get(part.type)
467 handler = parthandlermapping.get(part.type)
468 if handler is None:
468 if handler is None:
469 status = 'unsupported-type'
469 status = 'unsupported-type'
470 raise error.BundleUnknownFeatureError(parttype=part.type)
470 raise error.BundleUnknownFeatureError(parttype=part.type)
471 indebug(op.ui, 'found a handler for part %s' % part.type)
471 indebug(op.ui, 'found a handler for part %s' % part.type)
472 unknownparams = part.mandatorykeys - handler.params
472 unknownparams = part.mandatorykeys - handler.params
473 if unknownparams:
473 if unknownparams:
474 unknownparams = list(unknownparams)
474 unknownparams = list(unknownparams)
475 unknownparams.sort()
475 unknownparams.sort()
476 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
476 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
477 raise error.BundleUnknownFeatureError(parttype=part.type,
477 raise error.BundleUnknownFeatureError(parttype=part.type,
478 params=unknownparams)
478 params=unknownparams)
479 status = 'supported'
479 status = 'supported'
480 except error.BundleUnknownFeatureError as exc:
480 except error.BundleUnknownFeatureError as exc:
481 if part.mandatory: # mandatory parts
481 if part.mandatory: # mandatory parts
482 raise
482 raise
483 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
483 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
484 return # skip to part processing
484 return # skip to part processing
485 finally:
485 finally:
486 if op.ui.debugflag:
486 if op.ui.debugflag:
487 msg = ['bundle2-input-part: "%s"' % part.type]
487 msg = ['bundle2-input-part: "%s"' % part.type]
488 if not part.mandatory:
488 if not part.mandatory:
489 msg.append(' (advisory)')
489 msg.append(' (advisory)')
490 nbmp = len(part.mandatorykeys)
490 nbmp = len(part.mandatorykeys)
491 nbap = len(part.params) - nbmp
491 nbap = len(part.params) - nbmp
492 if nbmp or nbap:
492 if nbmp or nbap:
493 msg.append(' (params:')
493 msg.append(' (params:')
494 if nbmp:
494 if nbmp:
495 msg.append(' %i mandatory' % nbmp)
495 msg.append(' %i mandatory' % nbmp)
496 if nbap:
496 if nbap:
497 msg.append(' %i advisory' % nbmp)
497 msg.append(' %i advisory' % nbmp)
498 msg.append(')')
498 msg.append(')')
499 msg.append(' %s\n' % status)
499 msg.append(' %s\n' % status)
500 op.ui.debug(''.join(msg))
500 op.ui.debug(''.join(msg))
501
501
502 return handler
502 return handler
503
503
504 def _processpart(op, part):
504 def _processpart(op, part):
505 """process a single part from a bundle
505 """process a single part from a bundle
506
506
507 The part is guaranteed to have been fully consumed when the function exits
507 The part is guaranteed to have been fully consumed when the function exits
508 (even if an exception is raised)."""
508 (even if an exception is raised)."""
509 handler = _gethandler(op, part)
509 handler = _gethandler(op, part)
510 if handler is None:
510 if handler is None:
511 return
511 return
512
512
513 # handler is called outside the above try block so that we don't
513 # handler is called outside the above try block so that we don't
514 # risk catching KeyErrors from anything other than the
514 # risk catching KeyErrors from anything other than the
515 # parthandlermapping lookup (any KeyError raised by handler()
515 # parthandlermapping lookup (any KeyError raised by handler()
516 # itself represents a defect of a different variety).
516 # itself represents a defect of a different variety).
517 output = None
517 output = None
518 if op.captureoutput and op.reply is not None:
518 if op.captureoutput and op.reply is not None:
519 op.ui.pushbuffer(error=True, subproc=True)
519 op.ui.pushbuffer(error=True, subproc=True)
520 output = ''
520 output = ''
521 try:
521 try:
522 handler(op, part)
522 handler(op, part)
523 finally:
523 finally:
524 if output is not None:
524 if output is not None:
525 output = op.ui.popbuffer()
525 output = op.ui.popbuffer()
526 if output:
526 if output:
527 outpart = op.reply.newpart('output', data=output,
527 outpart = op.reply.newpart('output', data=output,
528 mandatory=False)
528 mandatory=False)
529 outpart.addparam(
529 outpart.addparam(
530 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
530 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
531
531
532 def decodecaps(blob):
532 def decodecaps(blob):
533 """decode a bundle2 caps bytes blob into a dictionary
533 """decode a bundle2 caps bytes blob into a dictionary
534
534
535 The blob is a list of capabilities (one per line)
535 The blob is a list of capabilities (one per line)
536 Capabilities may have values using a line of the form::
536 Capabilities may have values using a line of the form::
537
537
538 capability=value1,value2,value3
538 capability=value1,value2,value3
539
539
540 The values are always a list."""
540 The values are always a list."""
541 caps = {}
541 caps = {}
542 for line in blob.splitlines():
542 for line in blob.splitlines():
543 if not line:
543 if not line:
544 continue
544 continue
545 if '=' not in line:
545 if '=' not in line:
546 key, vals = line, ()
546 key, vals = line, ()
547 else:
547 else:
548 key, vals = line.split('=', 1)
548 key, vals = line.split('=', 1)
549 vals = vals.split(',')
549 vals = vals.split(',')
550 key = urlreq.unquote(key)
550 key = urlreq.unquote(key)
551 vals = [urlreq.unquote(v) for v in vals]
551 vals = [urlreq.unquote(v) for v in vals]
552 caps[key] = vals
552 caps[key] = vals
553 return caps
553 return caps
554
554
555 def encodecaps(caps):
555 def encodecaps(caps):
556 """encode a bundle2 caps dictionary into a bytes blob"""
556 """encode a bundle2 caps dictionary into a bytes blob"""
557 chunks = []
557 chunks = []
558 for ca in sorted(caps):
558 for ca in sorted(caps):
559 vals = caps[ca]
559 vals = caps[ca]
560 ca = urlreq.quote(ca)
560 ca = urlreq.quote(ca)
561 vals = [urlreq.quote(v) for v in vals]
561 vals = [urlreq.quote(v) for v in vals]
562 if vals:
562 if vals:
563 ca = "%s=%s" % (ca, ','.join(vals))
563 ca = "%s=%s" % (ca, ','.join(vals))
564 chunks.append(ca)
564 chunks.append(ca)
565 return '\n'.join(chunks)
565 return '\n'.join(chunks)
566
566
567 bundletypes = {
567 bundletypes = {
568 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
568 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
569 # since the unification ssh accepts a header but there
569 # since the unification ssh accepts a header but there
570 # is no capability signaling it.
570 # is no capability signaling it.
571 "HG20": (), # special-cased below
571 "HG20": (), # special-cased below
572 "HG10UN": ("HG10UN", 'UN'),
572 "HG10UN": ("HG10UN", 'UN'),
573 "HG10BZ": ("HG10", 'BZ'),
573 "HG10BZ": ("HG10", 'BZ'),
574 "HG10GZ": ("HG10GZ", 'GZ'),
574 "HG10GZ": ("HG10GZ", 'GZ'),
575 }
575 }
576
576
577 # hgweb uses this list to communicate its preferred type
577 # hgweb uses this list to communicate its preferred type
578 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
578 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
579
579
580 class bundle20(object):
580 class bundle20(object):
581 """represent an outgoing bundle2 container
581 """represent an outgoing bundle2 container
582
582
583 Use the `addparam` method to add stream level parameter. and `newpart` to
583 Use the `addparam` method to add stream level parameter. and `newpart` to
584 populate it. Then call `getchunks` to retrieve all the binary chunks of
584 populate it. Then call `getchunks` to retrieve all the binary chunks of
585 data that compose the bundle2 container."""
585 data that compose the bundle2 container."""
586
586
587 _magicstring = 'HG20'
587 _magicstring = 'HG20'
588
588
589 def __init__(self, ui, capabilities=()):
589 def __init__(self, ui, capabilities=()):
590 self.ui = ui
590 self.ui = ui
591 self._params = []
591 self._params = []
592 self._parts = []
592 self._parts = []
593 self.capabilities = dict(capabilities)
593 self.capabilities = dict(capabilities)
594 self._compengine = util.compengines.forbundletype('UN')
594 self._compengine = util.compengines.forbundletype('UN')
595 self._compopts = None
595 self._compopts = None
596
596
597 def setcompression(self, alg, compopts=None):
597 def setcompression(self, alg, compopts=None):
598 """setup core part compression to <alg>"""
598 """setup core part compression to <alg>"""
599 if alg in (None, 'UN'):
599 if alg in (None, 'UN'):
600 return
600 return
601 assert not any(n.lower() == 'compression' for n, v in self._params)
601 assert not any(n.lower() == 'compression' for n, v in self._params)
602 self.addparam('Compression', alg)
602 self.addparam('Compression', alg)
603 self._compengine = util.compengines.forbundletype(alg)
603 self._compengine = util.compengines.forbundletype(alg)
604 self._compopts = compopts
604 self._compopts = compopts
605
605
606 @property
606 @property
607 def nbparts(self):
607 def nbparts(self):
608 """total number of parts added to the bundler"""
608 """total number of parts added to the bundler"""
609 return len(self._parts)
609 return len(self._parts)
610
610
611 # methods used to defines the bundle2 content
611 # methods used to defines the bundle2 content
612 def addparam(self, name, value=None):
612 def addparam(self, name, value=None):
613 """add a stream level parameter"""
613 """add a stream level parameter"""
614 if not name:
614 if not name:
615 raise ValueError(r'empty parameter name')
615 raise ValueError(r'empty parameter name')
616 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
616 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
617 raise ValueError(r'non letter first character: %s' % name)
617 raise ValueError(r'non letter first character: %s' % name)
618 self._params.append((name, value))
618 self._params.append((name, value))
619
619
620 def addpart(self, part):
620 def addpart(self, part):
621 """add a new part to the bundle2 container
621 """add a new part to the bundle2 container
622
622
623 Parts contains the actual applicative payload."""
623 Parts contains the actual applicative payload."""
624 assert part.id is None
624 assert part.id is None
625 part.id = len(self._parts) # very cheap counter
625 part.id = len(self._parts) # very cheap counter
626 self._parts.append(part)
626 self._parts.append(part)
627
627
628 def newpart(self, typeid, *args, **kwargs):
628 def newpart(self, typeid, *args, **kwargs):
629 """create a new part and add it to the containers
629 """create a new part and add it to the containers
630
630
631 As the part is directly added to the containers. For now, this means
631 As the part is directly added to the containers. For now, this means
632 that any failure to properly initialize the part after calling
632 that any failure to properly initialize the part after calling
633 ``newpart`` should result in a failure of the whole bundling process.
633 ``newpart`` should result in a failure of the whole bundling process.
634
634
635 You can still fall back to manually create and add if you need better
635 You can still fall back to manually create and add if you need better
636 control."""
636 control."""
637 part = bundlepart(typeid, *args, **kwargs)
637 part = bundlepart(typeid, *args, **kwargs)
638 self.addpart(part)
638 self.addpart(part)
639 return part
639 return part
640
640
641 # methods used to generate the bundle2 stream
641 # methods used to generate the bundle2 stream
642 def getchunks(self):
642 def getchunks(self):
643 if self.ui.debugflag:
643 if self.ui.debugflag:
644 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
644 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
645 if self._params:
645 if self._params:
646 msg.append(' (%i params)' % len(self._params))
646 msg.append(' (%i params)' % len(self._params))
647 msg.append(' %i parts total\n' % len(self._parts))
647 msg.append(' %i parts total\n' % len(self._parts))
648 self.ui.debug(''.join(msg))
648 self.ui.debug(''.join(msg))
649 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
649 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
650 yield self._magicstring
650 yield self._magicstring
651 param = self._paramchunk()
651 param = self._paramchunk()
652 outdebug(self.ui, 'bundle parameter: %s' % param)
652 outdebug(self.ui, 'bundle parameter: %s' % param)
653 yield _pack(_fstreamparamsize, len(param))
653 yield _pack(_fstreamparamsize, len(param))
654 if param:
654 if param:
655 yield param
655 yield param
656 for chunk in self._compengine.compressstream(self._getcorechunk(),
656 for chunk in self._compengine.compressstream(self._getcorechunk(),
657 self._compopts):
657 self._compopts):
658 yield chunk
658 yield chunk
659
659
660 def _paramchunk(self):
660 def _paramchunk(self):
661 """return a encoded version of all stream parameters"""
661 """return a encoded version of all stream parameters"""
662 blocks = []
662 blocks = []
663 for par, value in self._params:
663 for par, value in self._params:
664 par = urlreq.quote(par)
664 par = urlreq.quote(par)
665 if value is not None:
665 if value is not None:
666 value = urlreq.quote(value)
666 value = urlreq.quote(value)
667 par = '%s=%s' % (par, value)
667 par = '%s=%s' % (par, value)
668 blocks.append(par)
668 blocks.append(par)
669 return ' '.join(blocks)
669 return ' '.join(blocks)
670
670
671 def _getcorechunk(self):
671 def _getcorechunk(self):
672 """yield chunk for the core part of the bundle
672 """yield chunk for the core part of the bundle
673
673
674 (all but headers and parameters)"""
674 (all but headers and parameters)"""
675 outdebug(self.ui, 'start of parts')
675 outdebug(self.ui, 'start of parts')
676 for part in self._parts:
676 for part in self._parts:
677 outdebug(self.ui, 'bundle part: "%s"' % part.type)
677 outdebug(self.ui, 'bundle part: "%s"' % part.type)
678 for chunk in part.getchunks(ui=self.ui):
678 for chunk in part.getchunks(ui=self.ui):
679 yield chunk
679 yield chunk
680 outdebug(self.ui, 'end of bundle')
680 outdebug(self.ui, 'end of bundle')
681 yield _pack(_fpartheadersize, 0)
681 yield _pack(_fpartheadersize, 0)
682
682
683
683
684 def salvageoutput(self):
684 def salvageoutput(self):
685 """return a list with a copy of all output parts in the bundle
685 """return a list with a copy of all output parts in the bundle
686
686
687 This is meant to be used during error handling to make sure we preserve
687 This is meant to be used during error handling to make sure we preserve
688 server output"""
688 server output"""
689 salvaged = []
689 salvaged = []
690 for part in self._parts:
690 for part in self._parts:
691 if part.type.startswith('output'):
691 if part.type.startswith('output'):
692 salvaged.append(part.copy())
692 salvaged.append(part.copy())
693 return salvaged
693 return salvaged
694
694
695
695
696 class unpackermixin(object):
696 class unpackermixin(object):
697 """A mixin to extract bytes and struct data from a stream"""
697 """A mixin to extract bytes and struct data from a stream"""
698
698
699 def __init__(self, fp):
699 def __init__(self, fp):
700 self._fp = fp
700 self._fp = fp
701
701
702 def _unpack(self, format):
702 def _unpack(self, format):
703 """unpack this struct format from the stream
703 """unpack this struct format from the stream
704
704
705 This method is meant for internal usage by the bundle2 protocol only.
705 This method is meant for internal usage by the bundle2 protocol only.
706 They directly manipulate the low level stream including bundle2 level
706 They directly manipulate the low level stream including bundle2 level
707 instruction.
707 instruction.
708
708
709 Do not use it to implement higher-level logic or methods."""
709 Do not use it to implement higher-level logic or methods."""
710 data = self._readexact(struct.calcsize(format))
710 data = self._readexact(struct.calcsize(format))
711 return _unpack(format, data)
711 return _unpack(format, data)
712
712
713 def _readexact(self, size):
713 def _readexact(self, size):
714 """read exactly <size> bytes from the stream
714 """read exactly <size> bytes from the stream
715
715
716 This method is meant for internal usage by the bundle2 protocol only.
716 This method is meant for internal usage by the bundle2 protocol only.
717 They directly manipulate the low level stream including bundle2 level
717 They directly manipulate the low level stream including bundle2 level
718 instruction.
718 instruction.
719
719
720 Do not use it to implement higher-level logic or methods."""
720 Do not use it to implement higher-level logic or methods."""
721 return changegroup.readexactly(self._fp, size)
721 return changegroup.readexactly(self._fp, size)
722
722
723 def getunbundler(ui, fp, magicstring=None):
723 def getunbundler(ui, fp, magicstring=None):
724 """return a valid unbundler object for a given magicstring"""
724 """return a valid unbundler object for a given magicstring"""
725 if magicstring is None:
725 if magicstring is None:
726 magicstring = changegroup.readexactly(fp, 4)
726 magicstring = changegroup.readexactly(fp, 4)
727 magic, version = magicstring[0:2], magicstring[2:4]
727 magic, version = magicstring[0:2], magicstring[2:4]
728 if magic != 'HG':
728 if magic != 'HG':
729 ui.debug(
729 ui.debug(
730 "error: invalid magic: %r (version %r), should be 'HG'\n"
730 "error: invalid magic: %r (version %r), should be 'HG'\n"
731 % (magic, version))
731 % (magic, version))
732 raise error.Abort(_('not a Mercurial bundle'))
732 raise error.Abort(_('not a Mercurial bundle'))
733 unbundlerclass = formatmap.get(version)
733 unbundlerclass = formatmap.get(version)
734 if unbundlerclass is None:
734 if unbundlerclass is None:
735 raise error.Abort(_('unknown bundle version %s') % version)
735 raise error.Abort(_('unknown bundle version %s') % version)
736 unbundler = unbundlerclass(ui, fp)
736 unbundler = unbundlerclass(ui, fp)
737 indebug(ui, 'start processing of %s stream' % magicstring)
737 indebug(ui, 'start processing of %s stream' % magicstring)
738 return unbundler
738 return unbundler
739
739
740 class unbundle20(unpackermixin):
740 class unbundle20(unpackermixin):
741 """interpret a bundle2 stream
741 """interpret a bundle2 stream
742
742
743 This class is fed with a binary stream and yields parts through its
743 This class is fed with a binary stream and yields parts through its
744 `iterparts` methods."""
744 `iterparts` methods."""
745
745
746 _magicstring = 'HG20'
746 _magicstring = 'HG20'
747
747
748 def __init__(self, ui, fp):
748 def __init__(self, ui, fp):
749 """If header is specified, we do not read it out of the stream."""
749 """If header is specified, we do not read it out of the stream."""
750 self.ui = ui
750 self.ui = ui
751 self._compengine = util.compengines.forbundletype('UN')
751 self._compengine = util.compengines.forbundletype('UN')
752 self._compressed = None
752 self._compressed = None
753 super(unbundle20, self).__init__(fp)
753 super(unbundle20, self).__init__(fp)
754
754
755 @util.propertycache
755 @util.propertycache
756 def params(self):
756 def params(self):
757 """dictionary of stream level parameters"""
757 """dictionary of stream level parameters"""
758 indebug(self.ui, 'reading bundle2 stream parameters')
758 indebug(self.ui, 'reading bundle2 stream parameters')
759 params = {}
759 params = {}
760 paramssize = self._unpack(_fstreamparamsize)[0]
760 paramssize = self._unpack(_fstreamparamsize)[0]
761 if paramssize < 0:
761 if paramssize < 0:
762 raise error.BundleValueError('negative bundle param size: %i'
762 raise error.BundleValueError('negative bundle param size: %i'
763 % paramssize)
763 % paramssize)
764 if paramssize:
764 if paramssize:
765 params = self._readexact(paramssize)
765 params = self._readexact(paramssize)
766 params = self._processallparams(params)
766 params = self._processallparams(params)
767 return params
767 return params
768
768
769 def _processallparams(self, paramsblock):
769 def _processallparams(self, paramsblock):
770 """"""
770 """"""
771 params = util.sortdict()
771 params = util.sortdict()
772 for p in paramsblock.split(' '):
772 for p in paramsblock.split(' '):
773 p = p.split('=', 1)
773 p = p.split('=', 1)
774 p = [urlreq.unquote(i) for i in p]
774 p = [urlreq.unquote(i) for i in p]
775 if len(p) < 2:
775 if len(p) < 2:
776 p.append(None)
776 p.append(None)
777 self._processparam(*p)
777 self._processparam(*p)
778 params[p[0]] = p[1]
778 params[p[0]] = p[1]
779 return params
779 return params
780
780
781
781
782 def _processparam(self, name, value):
782 def _processparam(self, name, value):
783 """process a parameter, applying its effect if needed
783 """process a parameter, applying its effect if needed
784
784
785 Parameter starting with a lower case letter are advisory and will be
785 Parameter starting with a lower case letter are advisory and will be
786 ignored when unknown. Those starting with an upper case letter are
786 ignored when unknown. Those starting with an upper case letter are
787 mandatory and will this function will raise a KeyError when unknown.
787 mandatory and will this function will raise a KeyError when unknown.
788
788
789 Note: no option are currently supported. Any input will be either
789 Note: no option are currently supported. Any input will be either
790 ignored or failing.
790 ignored or failing.
791 """
791 """
792 if not name:
792 if not name:
793 raise ValueError(r'empty parameter name')
793 raise ValueError(r'empty parameter name')
794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
794 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
795 raise ValueError(r'non letter first character: %s' % name)
795 raise ValueError(r'non letter first character: %s' % name)
796 try:
796 try:
797 handler = b2streamparamsmap[name.lower()]
797 handler = b2streamparamsmap[name.lower()]
798 except KeyError:
798 except KeyError:
799 if name[0:1].islower():
799 if name[0:1].islower():
800 indebug(self.ui, "ignoring unknown parameter %s" % name)
800 indebug(self.ui, "ignoring unknown parameter %s" % name)
801 else:
801 else:
802 raise error.BundleUnknownFeatureError(params=(name,))
802 raise error.BundleUnknownFeatureError(params=(name,))
803 else:
803 else:
804 handler(self, name, value)
804 handler(self, name, value)
805
805
806 def _forwardchunks(self):
806 def _forwardchunks(self):
807 """utility to transfer a bundle2 as binary
807 """utility to transfer a bundle2 as binary
808
808
809 This is made necessary by the fact the 'getbundle' command over 'ssh'
809 This is made necessary by the fact the 'getbundle' command over 'ssh'
810 have no way to know then the reply end, relying on the bundle to be
810 have no way to know then the reply end, relying on the bundle to be
811 interpreted to know its end. This is terrible and we are sorry, but we
811 interpreted to know its end. This is terrible and we are sorry, but we
812 needed to move forward to get general delta enabled.
812 needed to move forward to get general delta enabled.
813 """
813 """
814 yield self._magicstring
814 yield self._magicstring
815 assert 'params' not in vars(self)
815 assert 'params' not in vars(self)
816 paramssize = self._unpack(_fstreamparamsize)[0]
816 paramssize = self._unpack(_fstreamparamsize)[0]
817 if paramssize < 0:
817 if paramssize < 0:
818 raise error.BundleValueError('negative bundle param size: %i'
818 raise error.BundleValueError('negative bundle param size: %i'
819 % paramssize)
819 % paramssize)
820 yield _pack(_fstreamparamsize, paramssize)
820 yield _pack(_fstreamparamsize, paramssize)
821 if paramssize:
821 if paramssize:
822 params = self._readexact(paramssize)
822 params = self._readexact(paramssize)
823 self._processallparams(params)
823 self._processallparams(params)
824 yield params
824 yield params
825 assert self._compengine.bundletype == 'UN'
825 assert self._compengine.bundletype == 'UN'
826 # From there, payload might need to be decompressed
826 # From there, payload might need to be decompressed
827 self._fp = self._compengine.decompressorreader(self._fp)
827 self._fp = self._compengine.decompressorreader(self._fp)
828 emptycount = 0
828 emptycount = 0
829 while emptycount < 2:
829 while emptycount < 2:
830 # so we can brainlessly loop
830 # so we can brainlessly loop
831 assert _fpartheadersize == _fpayloadsize
831 assert _fpartheadersize == _fpayloadsize
832 size = self._unpack(_fpartheadersize)[0]
832 size = self._unpack(_fpartheadersize)[0]
833 yield _pack(_fpartheadersize, size)
833 yield _pack(_fpartheadersize, size)
834 if size:
834 if size:
835 emptycount = 0
835 emptycount = 0
836 else:
836 else:
837 emptycount += 1
837 emptycount += 1
838 continue
838 continue
839 if size == flaginterrupt:
839 if size == flaginterrupt:
840 continue
840 continue
841 elif size < 0:
841 elif size < 0:
842 raise error.BundleValueError('negative chunk size: %i')
842 raise error.BundleValueError('negative chunk size: %i')
843 yield self._readexact(size)
843 yield self._readexact(size)
844
844
845
845
846 def iterparts(self):
846 def iterparts(self):
847 """yield all parts contained in the stream"""
847 """yield all parts contained in the stream"""
848 # make sure param have been loaded
848 # make sure param have been loaded
849 self.params
849 self.params
850 # From there, payload need to be decompressed
850 # From there, payload need to be decompressed
851 self._fp = self._compengine.decompressorreader(self._fp)
851 self._fp = self._compengine.decompressorreader(self._fp)
852 indebug(self.ui, 'start extraction of bundle2 parts')
852 indebug(self.ui, 'start extraction of bundle2 parts')
853 headerblock = self._readpartheader()
853 headerblock = self._readpartheader()
854 while headerblock is not None:
854 while headerblock is not None:
855 part = unbundlepart(self.ui, headerblock, self._fp)
855 part = unbundlepart(self.ui, headerblock, self._fp)
856 yield part
856 yield part
857 # Seek to the end of the part to force it's consumption so the next
857 # Seek to the end of the part to force it's consumption so the next
858 # part can be read. But then seek back to the beginning so the
858 # part can be read. But then seek back to the beginning so the
859 # code consuming this generator has a part that starts at 0.
859 # code consuming this generator has a part that starts at 0.
860 part.seek(0, 2)
860 part.seek(0, 2)
861 part.seek(0)
861 part.seek(0)
862 headerblock = self._readpartheader()
862 headerblock = self._readpartheader()
863 indebug(self.ui, 'end of bundle2 stream')
863 indebug(self.ui, 'end of bundle2 stream')
864
864
865 def _readpartheader(self):
865 def _readpartheader(self):
866 """reads a part header size and return the bytes blob
866 """reads a part header size and return the bytes blob
867
867
868 returns None if empty"""
868 returns None if empty"""
869 headersize = self._unpack(_fpartheadersize)[0]
869 headersize = self._unpack(_fpartheadersize)[0]
870 if headersize < 0:
870 if headersize < 0:
871 raise error.BundleValueError('negative part header size: %i'
871 raise error.BundleValueError('negative part header size: %i'
872 % headersize)
872 % headersize)
873 indebug(self.ui, 'part header size: %i' % headersize)
873 indebug(self.ui, 'part header size: %i' % headersize)
874 if headersize:
874 if headersize:
875 return self._readexact(headersize)
875 return self._readexact(headersize)
876 return None
876 return None
877
877
878 def compressed(self):
878 def compressed(self):
879 self.params # load params
879 self.params # load params
880 return self._compressed
880 return self._compressed
881
881
882 def close(self):
882 def close(self):
883 """close underlying file"""
883 """close underlying file"""
884 if util.safehasattr(self._fp, 'close'):
884 if util.safehasattr(self._fp, 'close'):
885 return self._fp.close()
885 return self._fp.close()
886
886
887 formatmap = {'20': unbundle20}
887 formatmap = {'20': unbundle20}
888
888
889 b2streamparamsmap = {}
889 b2streamparamsmap = {}
890
890
891 def b2streamparamhandler(name):
891 def b2streamparamhandler(name):
892 """register a handler for a stream level parameter"""
892 """register a handler for a stream level parameter"""
893 def decorator(func):
893 def decorator(func):
894 assert name not in formatmap
894 assert name not in formatmap
895 b2streamparamsmap[name] = func
895 b2streamparamsmap[name] = func
896 return func
896 return func
897 return decorator
897 return decorator
898
898
899 @b2streamparamhandler('compression')
899 @b2streamparamhandler('compression')
900 def processcompression(unbundler, param, value):
900 def processcompression(unbundler, param, value):
901 """read compression parameter and install payload decompression"""
901 """read compression parameter and install payload decompression"""
902 if value not in util.compengines.supportedbundletypes:
902 if value not in util.compengines.supportedbundletypes:
903 raise error.BundleUnknownFeatureError(params=(param,),
903 raise error.BundleUnknownFeatureError(params=(param,),
904 values=(value,))
904 values=(value,))
905 unbundler._compengine = util.compengines.forbundletype(value)
905 unbundler._compengine = util.compengines.forbundletype(value)
906 if value is not None:
906 if value is not None:
907 unbundler._compressed = True
907 unbundler._compressed = True
908
908
909 class bundlepart(object):
909 class bundlepart(object):
910 """A bundle2 part contains application level payload
910 """A bundle2 part contains application level payload
911
911
912 The part `type` is used to route the part to the application level
912 The part `type` is used to route the part to the application level
913 handler.
913 handler.
914
914
915 The part payload is contained in ``part.data``. It could be raw bytes or a
915 The part payload is contained in ``part.data``. It could be raw bytes or a
916 generator of byte chunks.
916 generator of byte chunks.
917
917
918 You can add parameters to the part using the ``addparam`` method.
918 You can add parameters to the part using the ``addparam`` method.
919 Parameters can be either mandatory (default) or advisory. Remote side
919 Parameters can be either mandatory (default) or advisory. Remote side
920 should be able to safely ignore the advisory ones.
920 should be able to safely ignore the advisory ones.
921
921
922 Both data and parameters cannot be modified after the generation has begun.
922 Both data and parameters cannot be modified after the generation has begun.
923 """
923 """
924
924
925 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
925 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
926 data='', mandatory=True):
926 data='', mandatory=True):
927 validateparttype(parttype)
927 validateparttype(parttype)
928 self.id = None
928 self.id = None
929 self.type = parttype
929 self.type = parttype
930 self._data = data
930 self._data = data
931 self._mandatoryparams = list(mandatoryparams)
931 self._mandatoryparams = list(mandatoryparams)
932 self._advisoryparams = list(advisoryparams)
932 self._advisoryparams = list(advisoryparams)
933 # checking for duplicated entries
933 # checking for duplicated entries
934 self._seenparams = set()
934 self._seenparams = set()
935 for pname, __ in self._mandatoryparams + self._advisoryparams:
935 for pname, __ in self._mandatoryparams + self._advisoryparams:
936 if pname in self._seenparams:
936 if pname in self._seenparams:
937 raise error.ProgrammingError('duplicated params: %s' % pname)
937 raise error.ProgrammingError('duplicated params: %s' % pname)
938 self._seenparams.add(pname)
938 self._seenparams.add(pname)
939 # status of the part's generation:
939 # status of the part's generation:
940 # - None: not started,
940 # - None: not started,
941 # - False: currently generated,
941 # - False: currently generated,
942 # - True: generation done.
942 # - True: generation done.
943 self._generated = None
943 self._generated = None
944 self.mandatory = mandatory
944 self.mandatory = mandatory
945
945
946 def __repr__(self):
946 def __repr__(self):
947 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
947 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
948 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
948 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
949 % (cls, id(self), self.id, self.type, self.mandatory))
949 % (cls, id(self), self.id, self.type, self.mandatory))
950
950
951 def copy(self):
951 def copy(self):
952 """return a copy of the part
952 """return a copy of the part
953
953
954 The new part have the very same content but no partid assigned yet.
954 The new part have the very same content but no partid assigned yet.
955 Parts with generated data cannot be copied."""
955 Parts with generated data cannot be copied."""
956 assert not util.safehasattr(self.data, 'next')
956 assert not util.safehasattr(self.data, 'next')
957 return self.__class__(self.type, self._mandatoryparams,
957 return self.__class__(self.type, self._mandatoryparams,
958 self._advisoryparams, self._data, self.mandatory)
958 self._advisoryparams, self._data, self.mandatory)
959
959
960 # methods used to defines the part content
960 # methods used to defines the part content
961 @property
961 @property
962 def data(self):
962 def data(self):
963 return self._data
963 return self._data
964
964
965 @data.setter
965 @data.setter
966 def data(self, data):
966 def data(self, data):
967 if self._generated is not None:
967 if self._generated is not None:
968 raise error.ReadOnlyPartError('part is being generated')
968 raise error.ReadOnlyPartError('part is being generated')
969 self._data = data
969 self._data = data
970
970
971 @property
971 @property
972 def mandatoryparams(self):
972 def mandatoryparams(self):
973 # make it an immutable tuple to force people through ``addparam``
973 # make it an immutable tuple to force people through ``addparam``
974 return tuple(self._mandatoryparams)
974 return tuple(self._mandatoryparams)
975
975
976 @property
976 @property
977 def advisoryparams(self):
977 def advisoryparams(self):
978 # make it an immutable tuple to force people through ``addparam``
978 # make it an immutable tuple to force people through ``addparam``
979 return tuple(self._advisoryparams)
979 return tuple(self._advisoryparams)
980
980
981 def addparam(self, name, value='', mandatory=True):
981 def addparam(self, name, value='', mandatory=True):
982 """add a parameter to the part
982 """add a parameter to the part
983
983
984 If 'mandatory' is set to True, the remote handler must claim support
984 If 'mandatory' is set to True, the remote handler must claim support
985 for this parameter or the unbundling will be aborted.
985 for this parameter or the unbundling will be aborted.
986
986
987 The 'name' and 'value' cannot exceed 255 bytes each.
987 The 'name' and 'value' cannot exceed 255 bytes each.
988 """
988 """
989 if self._generated is not None:
989 if self._generated is not None:
990 raise error.ReadOnlyPartError('part is being generated')
990 raise error.ReadOnlyPartError('part is being generated')
991 if name in self._seenparams:
991 if name in self._seenparams:
992 raise ValueError('duplicated params: %s' % name)
992 raise ValueError('duplicated params: %s' % name)
993 self._seenparams.add(name)
993 self._seenparams.add(name)
994 params = self._advisoryparams
994 params = self._advisoryparams
995 if mandatory:
995 if mandatory:
996 params = self._mandatoryparams
996 params = self._mandatoryparams
997 params.append((name, value))
997 params.append((name, value))
998
998
999 # methods used to generates the bundle2 stream
999 # methods used to generates the bundle2 stream
1000 def getchunks(self, ui):
1000 def getchunks(self, ui):
1001 if self._generated is not None:
1001 if self._generated is not None:
1002 raise error.ProgrammingError('part can only be consumed once')
1002 raise error.ProgrammingError('part can only be consumed once')
1003 self._generated = False
1003 self._generated = False
1004
1004
1005 if ui.debugflag:
1005 if ui.debugflag:
1006 msg = ['bundle2-output-part: "%s"' % self.type]
1006 msg = ['bundle2-output-part: "%s"' % self.type]
1007 if not self.mandatory:
1007 if not self.mandatory:
1008 msg.append(' (advisory)')
1008 msg.append(' (advisory)')
1009 nbmp = len(self.mandatoryparams)
1009 nbmp = len(self.mandatoryparams)
1010 nbap = len(self.advisoryparams)
1010 nbap = len(self.advisoryparams)
1011 if nbmp or nbap:
1011 if nbmp or nbap:
1012 msg.append(' (params:')
1012 msg.append(' (params:')
1013 if nbmp:
1013 if nbmp:
1014 msg.append(' %i mandatory' % nbmp)
1014 msg.append(' %i mandatory' % nbmp)
1015 if nbap:
1015 if nbap:
1016 msg.append(' %i advisory' % nbmp)
1016 msg.append(' %i advisory' % nbmp)
1017 msg.append(')')
1017 msg.append(')')
1018 if not self.data:
1018 if not self.data:
1019 msg.append(' empty payload')
1019 msg.append(' empty payload')
1020 elif (util.safehasattr(self.data, 'next')
1020 elif (util.safehasattr(self.data, 'next')
1021 or util.safehasattr(self.data, '__next__')):
1021 or util.safehasattr(self.data, '__next__')):
1022 msg.append(' streamed payload')
1022 msg.append(' streamed payload')
1023 else:
1023 else:
1024 msg.append(' %i bytes payload' % len(self.data))
1024 msg.append(' %i bytes payload' % len(self.data))
1025 msg.append('\n')
1025 msg.append('\n')
1026 ui.debug(''.join(msg))
1026 ui.debug(''.join(msg))
1027
1027
1028 #### header
1028 #### header
1029 if self.mandatory:
1029 if self.mandatory:
1030 parttype = self.type.upper()
1030 parttype = self.type.upper()
1031 else:
1031 else:
1032 parttype = self.type.lower()
1032 parttype = self.type.lower()
1033 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1033 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1034 ## parttype
1034 ## parttype
1035 header = [_pack(_fparttypesize, len(parttype)),
1035 header = [_pack(_fparttypesize, len(parttype)),
1036 parttype, _pack(_fpartid, self.id),
1036 parttype, _pack(_fpartid, self.id),
1037 ]
1037 ]
1038 ## parameters
1038 ## parameters
1039 # count
1039 # count
1040 manpar = self.mandatoryparams
1040 manpar = self.mandatoryparams
1041 advpar = self.advisoryparams
1041 advpar = self.advisoryparams
1042 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1042 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1043 # size
1043 # size
1044 parsizes = []
1044 parsizes = []
1045 for key, value in manpar:
1045 for key, value in manpar:
1046 parsizes.append(len(key))
1046 parsizes.append(len(key))
1047 parsizes.append(len(value))
1047 parsizes.append(len(value))
1048 for key, value in advpar:
1048 for key, value in advpar:
1049 parsizes.append(len(key))
1049 parsizes.append(len(key))
1050 parsizes.append(len(value))
1050 parsizes.append(len(value))
1051 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1051 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1052 header.append(paramsizes)
1052 header.append(paramsizes)
1053 # key, value
1053 # key, value
1054 for key, value in manpar:
1054 for key, value in manpar:
1055 header.append(key)
1055 header.append(key)
1056 header.append(value)
1056 header.append(value)
1057 for key, value in advpar:
1057 for key, value in advpar:
1058 header.append(key)
1058 header.append(key)
1059 header.append(value)
1059 header.append(value)
1060 ## finalize header
1060 ## finalize header
1061 try:
1061 try:
1062 headerchunk = ''.join(header)
1062 headerchunk = ''.join(header)
1063 except TypeError:
1063 except TypeError:
1064 raise TypeError(r'Found a non-bytes trying to '
1064 raise TypeError(r'Found a non-bytes trying to '
1065 r'build bundle part header: %r' % header)
1065 r'build bundle part header: %r' % header)
1066 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1066 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1067 yield _pack(_fpartheadersize, len(headerchunk))
1067 yield _pack(_fpartheadersize, len(headerchunk))
1068 yield headerchunk
1068 yield headerchunk
1069 ## payload
1069 ## payload
1070 try:
1070 try:
1071 for chunk in self._payloadchunks():
1071 for chunk in self._payloadchunks():
1072 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1072 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1073 yield _pack(_fpayloadsize, len(chunk))
1073 yield _pack(_fpayloadsize, len(chunk))
1074 yield chunk
1074 yield chunk
1075 except GeneratorExit:
1075 except GeneratorExit:
1076 # GeneratorExit means that nobody is listening for our
1076 # GeneratorExit means that nobody is listening for our
1077 # results anyway, so just bail quickly rather than trying
1077 # results anyway, so just bail quickly rather than trying
1078 # to produce an error part.
1078 # to produce an error part.
1079 ui.debug('bundle2-generatorexit\n')
1079 ui.debug('bundle2-generatorexit\n')
1080 raise
1080 raise
1081 except BaseException as exc:
1081 except BaseException as exc:
1082 bexc = util.forcebytestr(exc)
1082 bexc = util.forcebytestr(exc)
1083 # backup exception data for later
1083 # backup exception data for later
1084 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1084 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1085 % bexc)
1085 % bexc)
1086 tb = sys.exc_info()[2]
1086 tb = sys.exc_info()[2]
1087 msg = 'unexpected error: %s' % bexc
1087 msg = 'unexpected error: %s' % bexc
1088 interpart = bundlepart('error:abort', [('message', msg)],
1088 interpart = bundlepart('error:abort', [('message', msg)],
1089 mandatory=False)
1089 mandatory=False)
1090 interpart.id = 0
1090 interpart.id = 0
1091 yield _pack(_fpayloadsize, -1)
1091 yield _pack(_fpayloadsize, -1)
1092 for chunk in interpart.getchunks(ui=ui):
1092 for chunk in interpart.getchunks(ui=ui):
1093 yield chunk
1093 yield chunk
1094 outdebug(ui, 'closing payload chunk')
1094 outdebug(ui, 'closing payload chunk')
1095 # abort current part payload
1095 # abort current part payload
1096 yield _pack(_fpayloadsize, 0)
1096 yield _pack(_fpayloadsize, 0)
1097 pycompat.raisewithtb(exc, tb)
1097 pycompat.raisewithtb(exc, tb)
1098 # end of payload
1098 # end of payload
1099 outdebug(ui, 'closing payload chunk')
1099 outdebug(ui, 'closing payload chunk')
1100 yield _pack(_fpayloadsize, 0)
1100 yield _pack(_fpayloadsize, 0)
1101 self._generated = True
1101 self._generated = True
1102
1102
1103 def _payloadchunks(self):
1103 def _payloadchunks(self):
1104 """yield chunks of a the part payload
1104 """yield chunks of a the part payload
1105
1105
1106 Exists to handle the different methods to provide data to a part."""
1106 Exists to handle the different methods to provide data to a part."""
1107 # we only support fixed size data now.
1107 # we only support fixed size data now.
1108 # This will be improved in the future.
1108 # This will be improved in the future.
1109 if (util.safehasattr(self.data, 'next')
1109 if (util.safehasattr(self.data, 'next')
1110 or util.safehasattr(self.data, '__next__')):
1110 or util.safehasattr(self.data, '__next__')):
1111 buff = util.chunkbuffer(self.data)
1111 buff = util.chunkbuffer(self.data)
1112 chunk = buff.read(preferedchunksize)
1112 chunk = buff.read(preferedchunksize)
1113 while chunk:
1113 while chunk:
1114 yield chunk
1114 yield chunk
1115 chunk = buff.read(preferedchunksize)
1115 chunk = buff.read(preferedchunksize)
1116 elif len(self.data):
1116 elif len(self.data):
1117 yield self.data
1117 yield self.data
1118
1118
1119
1119
1120 flaginterrupt = -1
1120 flaginterrupt = -1
1121
1121
1122 class interrupthandler(unpackermixin):
1122 class interrupthandler(unpackermixin):
1123 """read one part and process it with restricted capability
1123 """read one part and process it with restricted capability
1124
1124
1125 This allows to transmit exception raised on the producer size during part
1125 This allows to transmit exception raised on the producer size during part
1126 iteration while the consumer is reading a part.
1126 iteration while the consumer is reading a part.
1127
1127
1128 Part processed in this manner only have access to a ui object,"""
1128 Part processed in this manner only have access to a ui object,"""
1129
1129
1130 def __init__(self, ui, fp):
1130 def __init__(self, ui, fp):
1131 super(interrupthandler, self).__init__(fp)
1131 super(interrupthandler, self).__init__(fp)
1132 self.ui = ui
1132 self.ui = ui
1133
1133
1134 def _readpartheader(self):
1134 def _readpartheader(self):
1135 """reads a part header size and return the bytes blob
1135 """reads a part header size and return the bytes blob
1136
1136
1137 returns None if empty"""
1137 returns None if empty"""
1138 headersize = self._unpack(_fpartheadersize)[0]
1138 headersize = self._unpack(_fpartheadersize)[0]
1139 if headersize < 0:
1139 if headersize < 0:
1140 raise error.BundleValueError('negative part header size: %i'
1140 raise error.BundleValueError('negative part header size: %i'
1141 % headersize)
1141 % headersize)
1142 indebug(self.ui, 'part header size: %i\n' % headersize)
1142 indebug(self.ui, 'part header size: %i\n' % headersize)
1143 if headersize:
1143 if headersize:
1144 return self._readexact(headersize)
1144 return self._readexact(headersize)
1145 return None
1145 return None
1146
1146
1147 def __call__(self):
1147 def __call__(self):
1148
1148
1149 self.ui.debug('bundle2-input-stream-interrupt:'
1149 self.ui.debug('bundle2-input-stream-interrupt:'
1150 ' opening out of band context\n')
1150 ' opening out of band context\n')
1151 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1151 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1152 headerblock = self._readpartheader()
1152 headerblock = self._readpartheader()
1153 if headerblock is None:
1153 if headerblock is None:
1154 indebug(self.ui, 'no part found during interruption.')
1154 indebug(self.ui, 'no part found during interruption.')
1155 return
1155 return
1156 part = unbundlepart(self.ui, headerblock, self._fp)
1156 part = unbundlepart(self.ui, headerblock, self._fp)
1157 op = interruptoperation(self.ui)
1157 op = interruptoperation(self.ui)
1158 hardabort = False
1158 hardabort = False
1159 try:
1159 try:
1160 _processpart(op, part)
1160 _processpart(op, part)
1161 except (SystemExit, KeyboardInterrupt):
1161 except (SystemExit, KeyboardInterrupt):
1162 hardabort = True
1162 hardabort = True
1163 raise
1163 raise
1164 finally:
1164 finally:
1165 if not hardabort:
1165 if not hardabort:
1166 part.seek(0, 2)
1166 part.seek(0, 2)
1167 self.ui.debug('bundle2-input-stream-interrupt:'
1167 self.ui.debug('bundle2-input-stream-interrupt:'
1168 ' closing out of band context\n')
1168 ' closing out of band context\n')
1169
1169
1170 class interruptoperation(object):
1170 class interruptoperation(object):
1171 """A limited operation to be use by part handler during interruption
1171 """A limited operation to be use by part handler during interruption
1172
1172
1173 It only have access to an ui object.
1173 It only have access to an ui object.
1174 """
1174 """
1175
1175
1176 def __init__(self, ui):
1176 def __init__(self, ui):
1177 self.ui = ui
1177 self.ui = ui
1178 self.reply = None
1178 self.reply = None
1179 self.captureoutput = False
1179 self.captureoutput = False
1180
1180
1181 @property
1181 @property
1182 def repo(self):
1182 def repo(self):
1183 raise error.ProgrammingError('no repo access from stream interruption')
1183 raise error.ProgrammingError('no repo access from stream interruption')
1184
1184
1185 def gettransaction(self):
1185 def gettransaction(self):
1186 raise TransactionUnavailable('no repo access from stream interruption')
1186 raise TransactionUnavailable('no repo access from stream interruption')
1187
1187
1188 class unbundlepart(unpackermixin):
1188 class unbundlepart(unpackermixin):
1189 """a bundle part read from a bundle"""
1189 """a bundle part read from a bundle"""
1190
1190
1191 def __init__(self, ui, header, fp):
1191 def __init__(self, ui, header, fp):
1192 super(unbundlepart, self).__init__(fp)
1192 super(unbundlepart, self).__init__(fp)
1193 self._seekable = (util.safehasattr(fp, 'seek') and
1193 self._seekable = (util.safehasattr(fp, 'seek') and
1194 util.safehasattr(fp, 'tell'))
1194 util.safehasattr(fp, 'tell'))
1195 self.ui = ui
1195 self.ui = ui
1196 # unbundle state attr
1196 # unbundle state attr
1197 self._headerdata = header
1197 self._headerdata = header
1198 self._headeroffset = 0
1198 self._headeroffset = 0
1199 self._initialized = False
1199 self._initialized = False
1200 self.consumed = False
1200 self.consumed = False
1201 # part data
1201 # part data
1202 self.id = None
1202 self.id = None
1203 self.type = None
1203 self.type = None
1204 self.mandatoryparams = None
1204 self.mandatoryparams = None
1205 self.advisoryparams = None
1205 self.advisoryparams = None
1206 self.params = None
1206 self.params = None
1207 self.mandatorykeys = ()
1207 self.mandatorykeys = ()
1208 self._payloadstream = None
1208 self._payloadstream = None
1209 self._readheader()
1209 self._readheader()
1210 self._mandatory = None
1210 self._mandatory = None
1211 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1211 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1212 self._pos = 0
1212 self._pos = 0
1213
1213
1214 def _fromheader(self, size):
1214 def _fromheader(self, size):
1215 """return the next <size> byte from the header"""
1215 """return the next <size> byte from the header"""
1216 offset = self._headeroffset
1216 offset = self._headeroffset
1217 data = self._headerdata[offset:(offset + size)]
1217 data = self._headerdata[offset:(offset + size)]
1218 self._headeroffset = offset + size
1218 self._headeroffset = offset + size
1219 return data
1219 return data
1220
1220
1221 def _unpackheader(self, format):
1221 def _unpackheader(self, format):
1222 """read given format from header
1222 """read given format from header
1223
1223
1224 This automatically compute the size of the format to read."""
1224 This automatically compute the size of the format to read."""
1225 data = self._fromheader(struct.calcsize(format))
1225 data = self._fromheader(struct.calcsize(format))
1226 return _unpack(format, data)
1226 return _unpack(format, data)
1227
1227
1228 def _initparams(self, mandatoryparams, advisoryparams):
1228 def _initparams(self, mandatoryparams, advisoryparams):
1229 """internal function to setup all logic related parameters"""
1229 """internal function to setup all logic related parameters"""
1230 # make it read only to prevent people touching it by mistake.
1230 # make it read only to prevent people touching it by mistake.
1231 self.mandatoryparams = tuple(mandatoryparams)
1231 self.mandatoryparams = tuple(mandatoryparams)
1232 self.advisoryparams = tuple(advisoryparams)
1232 self.advisoryparams = tuple(advisoryparams)
1233 # user friendly UI
1233 # user friendly UI
1234 self.params = util.sortdict(self.mandatoryparams)
1234 self.params = util.sortdict(self.mandatoryparams)
1235 self.params.update(self.advisoryparams)
1235 self.params.update(self.advisoryparams)
1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1237
1237
1238 def _payloadchunks(self, chunknum=0):
1238 def _payloadchunks(self, chunknum=0):
1239 '''seek to specified chunk and start yielding data'''
1239 '''seek to specified chunk and start yielding data'''
1240 if len(self._chunkindex) == 0:
1240 if len(self._chunkindex) == 0:
1241 assert chunknum == 0, 'Must start with chunk 0'
1241 assert chunknum == 0, 'Must start with chunk 0'
1242 self._chunkindex.append((0, self._tellfp()))
1242 self._chunkindex.append((0, self._tellfp()))
1243 else:
1243 else:
1244 assert chunknum < len(self._chunkindex), \
1244 assert chunknum < len(self._chunkindex), \
1245 'Unknown chunk %d' % chunknum
1245 'Unknown chunk %d' % chunknum
1246 self._seekfp(self._chunkindex[chunknum][1])
1246 self._seekfp(self._chunkindex[chunknum][1])
1247
1247
1248 pos = self._chunkindex[chunknum][0]
1248 pos = self._chunkindex[chunknum][0]
1249 payloadsize = self._unpack(_fpayloadsize)[0]
1249 payloadsize = self._unpack(_fpayloadsize)[0]
1250 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1250 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1251 while payloadsize:
1251 while payloadsize:
1252 if payloadsize == flaginterrupt:
1252 if payloadsize == flaginterrupt:
1253 # interruption detection, the handler will now read a
1253 # interruption detection, the handler will now read a
1254 # single part and process it.
1254 # single part and process it.
1255 interrupthandler(self.ui, self._fp)()
1255 interrupthandler(self.ui, self._fp)()
1256 elif payloadsize < 0:
1256 elif payloadsize < 0:
1257 msg = 'negative payload chunk size: %i' % payloadsize
1257 msg = 'negative payload chunk size: %i' % payloadsize
1258 raise error.BundleValueError(msg)
1258 raise error.BundleValueError(msg)
1259 else:
1259 else:
1260 result = self._readexact(payloadsize)
1260 result = self._readexact(payloadsize)
1261 chunknum += 1
1261 chunknum += 1
1262 pos += payloadsize
1262 pos += payloadsize
1263 if chunknum == len(self._chunkindex):
1263 if chunknum == len(self._chunkindex):
1264 self._chunkindex.append((pos, self._tellfp()))
1264 self._chunkindex.append((pos, self._tellfp()))
1265 yield result
1265 yield result
1266 payloadsize = self._unpack(_fpayloadsize)[0]
1266 payloadsize = self._unpack(_fpayloadsize)[0]
1267 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1267 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1268
1268
1269 def _findchunk(self, pos):
1269 def _findchunk(self, pos):
1270 '''for a given payload position, return a chunk number and offset'''
1270 '''for a given payload position, return a chunk number and offset'''
1271 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1271 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1272 if ppos == pos:
1272 if ppos == pos:
1273 return chunk, 0
1273 return chunk, 0
1274 elif ppos > pos:
1274 elif ppos > pos:
1275 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1275 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1276 raise ValueError('Unknown chunk')
1276 raise ValueError('Unknown chunk')
1277
1277
1278 def _readheader(self):
1278 def _readheader(self):
1279 """read the header and setup the object"""
1279 """read the header and setup the object"""
1280 typesize = self._unpackheader(_fparttypesize)[0]
1280 typesize = self._unpackheader(_fparttypesize)[0]
1281 self.type = self._fromheader(typesize)
1281 self.type = self._fromheader(typesize)
1282 indebug(self.ui, 'part type: "%s"' % self.type)
1282 indebug(self.ui, 'part type: "%s"' % self.type)
1283 self.id = self._unpackheader(_fpartid)[0]
1283 self.id = self._unpackheader(_fpartid)[0]
1284 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1284 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1285 # extract mandatory bit from type
1285 # extract mandatory bit from type
1286 self.mandatory = (self.type != self.type.lower())
1286 self.mandatory = (self.type != self.type.lower())
1287 self.type = self.type.lower()
1287 self.type = self.type.lower()
1288 ## reading parameters
1288 ## reading parameters
1289 # param count
1289 # param count
1290 mancount, advcount = self._unpackheader(_fpartparamcount)
1290 mancount, advcount = self._unpackheader(_fpartparamcount)
1291 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1291 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1292 # param size
1292 # param size
1293 fparamsizes = _makefpartparamsizes(mancount + advcount)
1293 fparamsizes = _makefpartparamsizes(mancount + advcount)
1294 paramsizes = self._unpackheader(fparamsizes)
1294 paramsizes = self._unpackheader(fparamsizes)
1295 # make it a list of couple again
1295 # make it a list of couple again
1296 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1296 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1297 # split mandatory from advisory
1297 # split mandatory from advisory
1298 mansizes = paramsizes[:mancount]
1298 mansizes = paramsizes[:mancount]
1299 advsizes = paramsizes[mancount:]
1299 advsizes = paramsizes[mancount:]
1300 # retrieve param value
1300 # retrieve param value
1301 manparams = []
1301 manparams = []
1302 for key, value in mansizes:
1302 for key, value in mansizes:
1303 manparams.append((self._fromheader(key), self._fromheader(value)))
1303 manparams.append((self._fromheader(key), self._fromheader(value)))
1304 advparams = []
1304 advparams = []
1305 for key, value in advsizes:
1305 for key, value in advsizes:
1306 advparams.append((self._fromheader(key), self._fromheader(value)))
1306 advparams.append((self._fromheader(key), self._fromheader(value)))
1307 self._initparams(manparams, advparams)
1307 self._initparams(manparams, advparams)
1308 ## part payload
1308 ## part payload
1309 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1309 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1310 # we read the data, tell it
1310 # we read the data, tell it
1311 self._initialized = True
1311 self._initialized = True
1312
1312
1313 def read(self, size=None):
1313 def read(self, size=None):
1314 """read payload data"""
1314 """read payload data"""
1315 if not self._initialized:
1315 if not self._initialized:
1316 self._readheader()
1316 self._readheader()
1317 if size is None:
1317 if size is None:
1318 data = self._payloadstream.read()
1318 data = self._payloadstream.read()
1319 else:
1319 else:
1320 data = self._payloadstream.read(size)
1320 data = self._payloadstream.read(size)
1321 self._pos += len(data)
1321 self._pos += len(data)
1322 if size is None or len(data) < size:
1322 if size is None or len(data) < size:
1323 if not self.consumed and self._pos:
1323 if not self.consumed and self._pos:
1324 self.ui.debug('bundle2-input-part: total payload size %i\n'
1324 self.ui.debug('bundle2-input-part: total payload size %i\n'
1325 % self._pos)
1325 % self._pos)
1326 self.consumed = True
1326 self.consumed = True
1327 return data
1327 return data
1328
1328
1329 def tell(self):
1329 def tell(self):
1330 return self._pos
1330 return self._pos
1331
1331
1332 def seek(self, offset, whence=0):
1332 def seek(self, offset, whence=0):
1333 if whence == 0:
1333 if whence == 0:
1334 newpos = offset
1334 newpos = offset
1335 elif whence == 1:
1335 elif whence == 1:
1336 newpos = self._pos + offset
1336 newpos = self._pos + offset
1337 elif whence == 2:
1337 elif whence == 2:
1338 if not self.consumed:
1338 if not self.consumed:
1339 self.read()
1339 self.read()
1340 newpos = self._chunkindex[-1][0] - offset
1340 newpos = self._chunkindex[-1][0] - offset
1341 else:
1341 else:
1342 raise ValueError('Unknown whence value: %r' % (whence,))
1342 raise ValueError('Unknown whence value: %r' % (whence,))
1343
1343
1344 if newpos > self._chunkindex[-1][0] and not self.consumed:
1344 if newpos > self._chunkindex[-1][0] and not self.consumed:
1345 self.read()
1345 self.read()
1346 if not 0 <= newpos <= self._chunkindex[-1][0]:
1346 if not 0 <= newpos <= self._chunkindex[-1][0]:
1347 raise ValueError('Offset out of range')
1347 raise ValueError('Offset out of range')
1348
1348
1349 if self._pos != newpos:
1349 if self._pos != newpos:
1350 chunk, internaloffset = self._findchunk(newpos)
1350 chunk, internaloffset = self._findchunk(newpos)
1351 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1351 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1352 adjust = self.read(internaloffset)
1352 adjust = self.read(internaloffset)
1353 if len(adjust) != internaloffset:
1353 if len(adjust) != internaloffset:
1354 raise error.Abort(_('Seek failed\n'))
1354 raise error.Abort(_('Seek failed\n'))
1355 self._pos = newpos
1355 self._pos = newpos
1356
1356
1357 def _seekfp(self, offset, whence=0):
1357 def _seekfp(self, offset, whence=0):
1358 """move the underlying file pointer
1358 """move the underlying file pointer
1359
1359
1360 This method is meant for internal usage by the bundle2 protocol only.
1360 This method is meant for internal usage by the bundle2 protocol only.
1361 They directly manipulate the low level stream including bundle2 level
1361 They directly manipulate the low level stream including bundle2 level
1362 instruction.
1362 instruction.
1363
1363
1364 Do not use it to implement higher-level logic or methods."""
1364 Do not use it to implement higher-level logic or methods."""
1365 if self._seekable:
1365 if self._seekable:
1366 return self._fp.seek(offset, whence)
1366 return self._fp.seek(offset, whence)
1367 else:
1367 else:
1368 raise NotImplementedError(_('File pointer is not seekable'))
1368 raise NotImplementedError(_('File pointer is not seekable'))
1369
1369
1370 def _tellfp(self):
1370 def _tellfp(self):
1371 """return the file offset, or None if file is not seekable
1371 """return the file offset, or None if file is not seekable
1372
1372
1373 This method is meant for internal usage by the bundle2 protocol only.
1373 This method is meant for internal usage by the bundle2 protocol only.
1374 They directly manipulate the low level stream including bundle2 level
1374 They directly manipulate the low level stream including bundle2 level
1375 instruction.
1375 instruction.
1376
1376
1377 Do not use it to implement higher-level logic or methods."""
1377 Do not use it to implement higher-level logic or methods."""
1378 if self._seekable:
1378 if self._seekable:
1379 try:
1379 try:
1380 return self._fp.tell()
1380 return self._fp.tell()
1381 except IOError as e:
1381 except IOError as e:
1382 if e.errno == errno.ESPIPE:
1382 if e.errno == errno.ESPIPE:
1383 self._seekable = False
1383 self._seekable = False
1384 else:
1384 else:
1385 raise
1385 raise
1386 return None
1386 return None
1387
1387
1388 # These are only the static capabilities.
1388 # These are only the static capabilities.
1389 # Check the 'getrepocaps' function for the rest.
1389 # Check the 'getrepocaps' function for the rest.
1390 capabilities = {'HG20': (),
1390 capabilities = {'HG20': (),
1391 'error': ('abort', 'unsupportedcontent', 'pushraced',
1391 'error': ('abort', 'unsupportedcontent', 'pushraced',
1392 'pushkey'),
1392 'pushkey'),
1393 'listkeys': (),
1393 'listkeys': (),
1394 'pushkey': (),
1394 'pushkey': (),
1395 'digests': tuple(sorted(util.DIGESTS.keys())),
1395 'digests': tuple(sorted(util.DIGESTS.keys())),
1396 'remote-changegroup': ('http', 'https'),
1396 'remote-changegroup': ('http', 'https'),
1397 'hgtagsfnodes': (),
1397 'hgtagsfnodes': (),
1398 'phases': ('heads',),
1398 'phases': ('heads',),
1399 }
1399 }
1400
1400
1401 def getrepocaps(repo, allowpushback=False):
1401 def getrepocaps(repo, allowpushback=False):
1402 """return the bundle2 capabilities for a given repo
1402 """return the bundle2 capabilities for a given repo
1403
1403
1404 Exists to allow extensions (like evolution) to mutate the capabilities.
1404 Exists to allow extensions (like evolution) to mutate the capabilities.
1405 """
1405 """
1406 caps = capabilities.copy()
1406 caps = capabilities.copy()
1407 caps['changegroup'] = tuple(sorted(
1407 caps['changegroup'] = tuple(sorted(
1408 changegroup.supportedincomingversions(repo)))
1408 changegroup.supportedincomingversions(repo)))
1409 if obsolete.isenabled(repo, obsolete.exchangeopt):
1409 if obsolete.isenabled(repo, obsolete.exchangeopt):
1410 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1410 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1411 caps['obsmarkers'] = supportedformat
1411 caps['obsmarkers'] = supportedformat
1412 if allowpushback:
1412 if allowpushback:
1413 caps['pushback'] = ()
1413 caps['pushback'] = ()
1414 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1414 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1415 if cpmode == 'check-related':
1415 if cpmode == 'check-related':
1416 caps['checkheads'] = ('related',)
1416 caps['checkheads'] = ('related',)
1417 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1417 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1418 caps.pop('phases')
1418 caps.pop('phases')
1419 return caps
1419 return caps
1420
1420
1421 def bundle2caps(remote):
1421 def bundle2caps(remote):
1422 """return the bundle capabilities of a peer as dict"""
1422 """return the bundle capabilities of a peer as dict"""
1423 raw = remote.capable('bundle2')
1423 raw = remote.capable('bundle2')
1424 if not raw and raw != '':
1424 if not raw and raw != '':
1425 return {}
1425 return {}
1426 capsblob = urlreq.unquote(remote.capable('bundle2'))
1426 capsblob = urlreq.unquote(remote.capable('bundle2'))
1427 return decodecaps(capsblob)
1427 return decodecaps(capsblob)
1428
1428
1429 def obsmarkersversion(caps):
1429 def obsmarkersversion(caps):
1430 """extract the list of supported obsmarkers versions from a bundle2caps dict
1430 """extract the list of supported obsmarkers versions from a bundle2caps dict
1431 """
1431 """
1432 obscaps = caps.get('obsmarkers', ())
1432 obscaps = caps.get('obsmarkers', ())
1433 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1433 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1434
1434
1435 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1435 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1436 vfs=None, compression=None, compopts=None):
1436 vfs=None, compression=None, compopts=None):
1437 if bundletype.startswith('HG10'):
1437 if bundletype.startswith('HG10'):
1438 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1438 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1439 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1439 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1440 compression=compression, compopts=compopts)
1440 compression=compression, compopts=compopts)
1441 elif not bundletype.startswith('HG20'):
1441 elif not bundletype.startswith('HG20'):
1442 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1442 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1443
1443
1444 caps = {}
1444 caps = {}
1445 if 'obsolescence' in opts:
1445 if 'obsolescence' in opts:
1446 caps['obsmarkers'] = ('V1',)
1446 caps['obsmarkers'] = ('V1',)
1447 bundle = bundle20(ui, caps)
1447 bundle = bundle20(ui, caps)
1448 bundle.setcompression(compression, compopts)
1448 bundle.setcompression(compression, compopts)
1449 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1449 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1450 chunkiter = bundle.getchunks()
1450 chunkiter = bundle.getchunks()
1451
1451
1452 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1452 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1453
1453
1454 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1454 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1455 # We should eventually reconcile this logic with the one behind
1455 # We should eventually reconcile this logic with the one behind
1456 # 'exchange.getbundle2partsgenerator'.
1456 # 'exchange.getbundle2partsgenerator'.
1457 #
1457 #
1458 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1458 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1459 # different right now. So we keep them separated for now for the sake of
1459 # different right now. So we keep them separated for now for the sake of
1460 # simplicity.
1460 # simplicity.
1461
1461
1462 # we always want a changegroup in such bundle
1462 # we always want a changegroup in such bundle
1463 cgversion = opts.get('cg.version')
1463 cgversion = opts.get('cg.version')
1464 if cgversion is None:
1464 if cgversion is None:
1465 cgversion = changegroup.safeversion(repo)
1465 cgversion = changegroup.safeversion(repo)
1466 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1466 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1467 part = bundler.newpart('changegroup', data=cg.getchunks())
1467 part = bundler.newpart('changegroup', data=cg.getchunks())
1468 part.addparam('version', cg.version)
1468 part.addparam('version', cg.version)
1469 if 'clcount' in cg.extras:
1469 if 'clcount' in cg.extras:
1470 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1470 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1471 mandatory=False)
1471 mandatory=False)
1472 if opts.get('phases') and repo.revs('%ln and secret()',
1472 if opts.get('phases') and repo.revs('%ln and secret()',
1473 outgoing.missingheads):
1473 outgoing.missingheads):
1474 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1474 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1475
1475
1476 addparttagsfnodescache(repo, bundler, outgoing)
1476 addparttagsfnodescache(repo, bundler, outgoing)
1477
1477
1478 if opts.get('obsolescence', False):
1478 if opts.get('obsolescence', False):
1479 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1479 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1480 buildobsmarkerspart(bundler, obsmarkers)
1480 buildobsmarkerspart(bundler, obsmarkers)
1481
1481
1482 if opts.get('phases', False):
1482 if opts.get('phases', False):
1483 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1483 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1484 phasedata = phases.binaryencode(headsbyphase)
1484 phasedata = phases.binaryencode(headsbyphase)
1485 bundler.newpart('phase-heads', data=phasedata)
1485 bundler.newpart('phase-heads', data=phasedata)
1486
1486
1487 def addparttagsfnodescache(repo, bundler, outgoing):
1487 def addparttagsfnodescache(repo, bundler, outgoing):
1488 # we include the tags fnode cache for the bundle changeset
1488 # we include the tags fnode cache for the bundle changeset
1489 # (as an optional parts)
1489 # (as an optional parts)
1490 cache = tags.hgtagsfnodescache(repo.unfiltered())
1490 cache = tags.hgtagsfnodescache(repo.unfiltered())
1491 chunks = []
1491 chunks = []
1492
1492
1493 # .hgtags fnodes are only relevant for head changesets. While we could
1493 # .hgtags fnodes are only relevant for head changesets. While we could
1494 # transfer values for all known nodes, there will likely be little to
1494 # transfer values for all known nodes, there will likely be little to
1495 # no benefit.
1495 # no benefit.
1496 #
1496 #
1497 # We don't bother using a generator to produce output data because
1497 # We don't bother using a generator to produce output data because
1498 # a) we only have 40 bytes per head and even esoteric numbers of heads
1498 # a) we only have 40 bytes per head and even esoteric numbers of heads
1499 # consume little memory (1M heads is 40MB) b) we don't want to send the
1499 # consume little memory (1M heads is 40MB) b) we don't want to send the
1500 # part if we don't have entries and knowing if we have entries requires
1500 # part if we don't have entries and knowing if we have entries requires
1501 # cache lookups.
1501 # cache lookups.
1502 for node in outgoing.missingheads:
1502 for node in outgoing.missingheads:
1503 # Don't compute missing, as this may slow down serving.
1503 # Don't compute missing, as this may slow down serving.
1504 fnode = cache.getfnode(node, computemissing=False)
1504 fnode = cache.getfnode(node, computemissing=False)
1505 if fnode is not None:
1505 if fnode is not None:
1506 chunks.extend([node, fnode])
1506 chunks.extend([node, fnode])
1507
1507
1508 if chunks:
1508 if chunks:
1509 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1509 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1510
1510
1511 def buildobsmarkerspart(bundler, markers):
1511 def buildobsmarkerspart(bundler, markers):
1512 """add an obsmarker part to the bundler with <markers>
1512 """add an obsmarker part to the bundler with <markers>
1513
1513
1514 No part is created if markers is empty.
1514 No part is created if markers is empty.
1515 Raises ValueError if the bundler doesn't support any known obsmarker format.
1515 Raises ValueError if the bundler doesn't support any known obsmarker format.
1516 """
1516 """
1517 if not markers:
1517 if not markers:
1518 return None
1518 return None
1519
1519
1520 remoteversions = obsmarkersversion(bundler.capabilities)
1520 remoteversions = obsmarkersversion(bundler.capabilities)
1521 version = obsolete.commonversion(remoteversions)
1521 version = obsolete.commonversion(remoteversions)
1522 if version is None:
1522 if version is None:
1523 raise ValueError('bundler does not support common obsmarker format')
1523 raise ValueError('bundler does not support common obsmarker format')
1524 stream = obsolete.encodemarkers(markers, True, version=version)
1524 stream = obsolete.encodemarkers(markers, True, version=version)
1525 return bundler.newpart('obsmarkers', data=stream)
1525 return bundler.newpart('obsmarkers', data=stream)
1526
1526
1527 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1527 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1528 compopts=None):
1528 compopts=None):
1529 """Write a bundle file and return its filename.
1529 """Write a bundle file and return its filename.
1530
1530
1531 Existing files will not be overwritten.
1531 Existing files will not be overwritten.
1532 If no filename is specified, a temporary file is created.
1532 If no filename is specified, a temporary file is created.
1533 bz2 compression can be turned off.
1533 bz2 compression can be turned off.
1534 The bundle file will be deleted in case of errors.
1534 The bundle file will be deleted in case of errors.
1535 """
1535 """
1536
1536
1537 if bundletype == "HG20":
1537 if bundletype == "HG20":
1538 bundle = bundle20(ui)
1538 bundle = bundle20(ui)
1539 bundle.setcompression(compression, compopts)
1539 bundle.setcompression(compression, compopts)
1540 part = bundle.newpart('changegroup', data=cg.getchunks())
1540 part = bundle.newpart('changegroup', data=cg.getchunks())
1541 part.addparam('version', cg.version)
1541 part.addparam('version', cg.version)
1542 if 'clcount' in cg.extras:
1542 if 'clcount' in cg.extras:
1543 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1543 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1544 mandatory=False)
1544 mandatory=False)
1545 chunkiter = bundle.getchunks()
1545 chunkiter = bundle.getchunks()
1546 else:
1546 else:
1547 # compression argument is only for the bundle2 case
1547 # compression argument is only for the bundle2 case
1548 assert compression is None
1548 assert compression is None
1549 if cg.version != '01':
1549 if cg.version != '01':
1550 raise error.Abort(_('old bundle types only supports v1 '
1550 raise error.Abort(_('old bundle types only supports v1 '
1551 'changegroups'))
1551 'changegroups'))
1552 header, comp = bundletypes[bundletype]
1552 header, comp = bundletypes[bundletype]
1553 if comp not in util.compengines.supportedbundletypes:
1553 if comp not in util.compengines.supportedbundletypes:
1554 raise error.Abort(_('unknown stream compression type: %s')
1554 raise error.Abort(_('unknown stream compression type: %s')
1555 % comp)
1555 % comp)
1556 compengine = util.compengines.forbundletype(comp)
1556 compengine = util.compengines.forbundletype(comp)
1557 def chunkiter():
1557 def chunkiter():
1558 yield header
1558 yield header
1559 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1559 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1560 yield chunk
1560 yield chunk
1561 chunkiter = chunkiter()
1561 chunkiter = chunkiter()
1562
1562
1563 # parse the changegroup data, otherwise we will block
1563 # parse the changegroup data, otherwise we will block
1564 # in case of sshrepo because we don't know the end of the stream
1564 # in case of sshrepo because we don't know the end of the stream
1565 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1565 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1566
1566
1567 def combinechangegroupresults(op):
1567 def combinechangegroupresults(op):
1568 """logic to combine 0 or more addchangegroup results into one"""
1568 """logic to combine 0 or more addchangegroup results into one"""
1569 results = [r.get('return', 0)
1569 results = [r.get('return', 0)
1570 for r in op.records['changegroup']]
1570 for r in op.records['changegroup']]
1571 changedheads = 0
1571 changedheads = 0
1572 result = 1
1572 result = 1
1573 for ret in results:
1573 for ret in results:
1574 # If any changegroup result is 0, return 0
1574 # If any changegroup result is 0, return 0
1575 if ret == 0:
1575 if ret == 0:
1576 result = 0
1576 result = 0
1577 break
1577 break
1578 if ret < -1:
1578 if ret < -1:
1579 changedheads += ret + 1
1579 changedheads += ret + 1
1580 elif ret > 1:
1580 elif ret > 1:
1581 changedheads += ret - 1
1581 changedheads += ret - 1
1582 if changedheads > 0:
1582 if changedheads > 0:
1583 result = 1 + changedheads
1583 result = 1 + changedheads
1584 elif changedheads < 0:
1584 elif changedheads < 0:
1585 result = -1 + changedheads
1585 result = -1 + changedheads
1586 return result
1586 return result
1587
1587
1588 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1588 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1589 'targetphase'))
1589 'targetphase'))
1590 def handlechangegroup(op, inpart):
1590 def handlechangegroup(op, inpart):
1591 """apply a changegroup part on the repo
1591 """apply a changegroup part on the repo
1592
1592
1593 This is a very early implementation that will massive rework before being
1593 This is a very early implementation that will massive rework before being
1594 inflicted to any end-user.
1594 inflicted to any end-user.
1595 """
1595 """
1596 tr = op.gettransaction()
1596 tr = op.gettransaction()
1597 unpackerversion = inpart.params.get('version', '01')
1597 unpackerversion = inpart.params.get('version', '01')
1598 # We should raise an appropriate exception here
1598 # We should raise an appropriate exception here
1599 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1599 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1600 # the source and url passed here are overwritten by the one contained in
1600 # the source and url passed here are overwritten by the one contained in
1601 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1601 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1602 nbchangesets = None
1602 nbchangesets = None
1603 if 'nbchanges' in inpart.params:
1603 if 'nbchanges' in inpart.params:
1604 nbchangesets = int(inpart.params.get('nbchanges'))
1604 nbchangesets = int(inpart.params.get('nbchanges'))
1605 if ('treemanifest' in inpart.params and
1605 if ('treemanifest' in inpart.params and
1606 'treemanifest' not in op.repo.requirements):
1606 'treemanifest' not in op.repo.requirements):
1607 if len(op.repo.changelog) != 0:
1607 if len(op.repo.changelog) != 0:
1608 raise error.Abort(_(
1608 raise error.Abort(_(
1609 "bundle contains tree manifests, but local repo is "
1609 "bundle contains tree manifests, but local repo is "
1610 "non-empty and does not use tree manifests"))
1610 "non-empty and does not use tree manifests"))
1611 op.repo.requirements.add('treemanifest')
1611 op.repo.requirements.add('treemanifest')
1612 op.repo._applyopenerreqs()
1612 op.repo._applyopenerreqs()
1613 op.repo._writerequirements()
1613 op.repo._writerequirements()
1614 extrakwargs = {}
1614 extrakwargs = {}
1615 targetphase = inpart.params.get('targetphase')
1615 targetphase = inpart.params.get('targetphase')
1616 if targetphase is not None:
1616 if targetphase is not None:
1617 extrakwargs['targetphase'] = int(targetphase)
1617 extrakwargs['targetphase'] = int(targetphase)
1618 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1618 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1619 expectedtotal=nbchangesets, **extrakwargs)
1619 expectedtotal=nbchangesets, **extrakwargs)
1620 if op.reply is not None:
1620 if op.reply is not None:
1621 # This is definitely not the final form of this
1621 # This is definitely not the final form of this
1622 # return. But one need to start somewhere.
1622 # return. But one need to start somewhere.
1623 part = op.reply.newpart('reply:changegroup', mandatory=False)
1623 part = op.reply.newpart('reply:changegroup', mandatory=False)
1624 part.addparam(
1624 part.addparam(
1625 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1625 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1626 part.addparam('return', '%i' % ret, mandatory=False)
1626 part.addparam('return', '%i' % ret, mandatory=False)
1627 assert not inpart.read()
1627 assert not inpart.read()
1628
1628
1629 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1629 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1630 ['digest:%s' % k for k in util.DIGESTS.keys()])
1630 ['digest:%s' % k for k in util.DIGESTS.keys()])
1631 @parthandler('remote-changegroup', _remotechangegroupparams)
1631 @parthandler('remote-changegroup', _remotechangegroupparams)
1632 def handleremotechangegroup(op, inpart):
1632 def handleremotechangegroup(op, inpart):
1633 """apply a bundle10 on the repo, given an url and validation information
1633 """apply a bundle10 on the repo, given an url and validation information
1634
1634
1635 All the information about the remote bundle to import are given as
1635 All the information about the remote bundle to import are given as
1636 parameters. The parameters include:
1636 parameters. The parameters include:
1637 - url: the url to the bundle10.
1637 - url: the url to the bundle10.
1638 - size: the bundle10 file size. It is used to validate what was
1638 - size: the bundle10 file size. It is used to validate what was
1639 retrieved by the client matches the server knowledge about the bundle.
1639 retrieved by the client matches the server knowledge about the bundle.
1640 - digests: a space separated list of the digest types provided as
1640 - digests: a space separated list of the digest types provided as
1641 parameters.
1641 parameters.
1642 - digest:<digest-type>: the hexadecimal representation of the digest with
1642 - digest:<digest-type>: the hexadecimal representation of the digest with
1643 that name. Like the size, it is used to validate what was retrieved by
1643 that name. Like the size, it is used to validate what was retrieved by
1644 the client matches what the server knows about the bundle.
1644 the client matches what the server knows about the bundle.
1645
1645
1646 When multiple digest types are given, all of them are checked.
1646 When multiple digest types are given, all of them are checked.
1647 """
1647 """
1648 try:
1648 try:
1649 raw_url = inpart.params['url']
1649 raw_url = inpart.params['url']
1650 except KeyError:
1650 except KeyError:
1651 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1651 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1652 parsed_url = util.url(raw_url)
1652 parsed_url = util.url(raw_url)
1653 if parsed_url.scheme not in capabilities['remote-changegroup']:
1653 if parsed_url.scheme not in capabilities['remote-changegroup']:
1654 raise error.Abort(_('remote-changegroup does not support %s urls') %
1654 raise error.Abort(_('remote-changegroup does not support %s urls') %
1655 parsed_url.scheme)
1655 parsed_url.scheme)
1656
1656
1657 try:
1657 try:
1658 size = int(inpart.params['size'])
1658 size = int(inpart.params['size'])
1659 except ValueError:
1659 except ValueError:
1660 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1660 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1661 % 'size')
1661 % 'size')
1662 except KeyError:
1662 except KeyError:
1663 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1663 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1664
1664
1665 digests = {}
1665 digests = {}
1666 for typ in inpart.params.get('digests', '').split():
1666 for typ in inpart.params.get('digests', '').split():
1667 param = 'digest:%s' % typ
1667 param = 'digest:%s' % typ
1668 try:
1668 try:
1669 value = inpart.params[param]
1669 value = inpart.params[param]
1670 except KeyError:
1670 except KeyError:
1671 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1671 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1672 param)
1672 param)
1673 digests[typ] = value
1673 digests[typ] = value
1674
1674
1675 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1675 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1676
1676
1677 tr = op.gettransaction()
1677 tr = op.gettransaction()
1678 from . import exchange
1678 from . import exchange
1679 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1679 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1680 if not isinstance(cg, changegroup.cg1unpacker):
1680 if not isinstance(cg, changegroup.cg1unpacker):
1681 raise error.Abort(_('%s: not a bundle version 1.0') %
1681 raise error.Abort(_('%s: not a bundle version 1.0') %
1682 util.hidepassword(raw_url))
1682 util.hidepassword(raw_url))
1683 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1683 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1684 if op.reply is not None:
1684 if op.reply is not None:
1685 # This is definitely not the final form of this
1685 # This is definitely not the final form of this
1686 # return. But one need to start somewhere.
1686 # return. But one need to start somewhere.
1687 part = op.reply.newpart('reply:changegroup')
1687 part = op.reply.newpart('reply:changegroup')
1688 part.addparam(
1688 part.addparam(
1689 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1689 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1690 part.addparam('return', '%i' % ret, mandatory=False)
1690 part.addparam('return', '%i' % ret, mandatory=False)
1691 try:
1691 try:
1692 real_part.validate()
1692 real_part.validate()
1693 except error.Abort as e:
1693 except error.Abort as e:
1694 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1694 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1695 (util.hidepassword(raw_url), str(e)))
1695 (util.hidepassword(raw_url), str(e)))
1696 assert not inpart.read()
1696 assert not inpart.read()
1697
1697
1698 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1698 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1699 def handlereplychangegroup(op, inpart):
1699 def handlereplychangegroup(op, inpart):
1700 ret = int(inpart.params['return'])
1700 ret = int(inpart.params['return'])
1701 replyto = int(inpart.params['in-reply-to'])
1701 replyto = int(inpart.params['in-reply-to'])
1702 op.records.add('changegroup', {'return': ret}, replyto)
1702 op.records.add('changegroup', {'return': ret}, replyto)
1703
1703
1704 @parthandler('check:heads')
1704 @parthandler('check:heads')
1705 def handlecheckheads(op, inpart):
1705 def handlecheckheads(op, inpart):
1706 """check that head of the repo did not change
1706 """check that head of the repo did not change
1707
1707
1708 This is used to detect a push race when using unbundle.
1708 This is used to detect a push race when using unbundle.
1709 This replaces the "heads" argument of unbundle."""
1709 This replaces the "heads" argument of unbundle."""
1710 h = inpart.read(20)
1710 h = inpart.read(20)
1711 heads = []
1711 heads = []
1712 while len(h) == 20:
1712 while len(h) == 20:
1713 heads.append(h)
1713 heads.append(h)
1714 h = inpart.read(20)
1714 h = inpart.read(20)
1715 assert not h
1715 assert not h
1716 # Trigger a transaction so that we are guaranteed to have the lock now.
1716 # Trigger a transaction so that we are guaranteed to have the lock now.
1717 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1717 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1718 op.gettransaction()
1718 op.gettransaction()
1719 if sorted(heads) != sorted(op.repo.heads()):
1719 if sorted(heads) != sorted(op.repo.heads()):
1720 raise error.PushRaced('repository changed while pushing - '
1720 raise error.PushRaced('repository changed while pushing - '
1721 'please try again')
1721 'please try again')
1722
1722
1723 @parthandler('check:updated-heads')
1723 @parthandler('check:updated-heads')
1724 def handlecheckupdatedheads(op, inpart):
1724 def handlecheckupdatedheads(op, inpart):
1725 """check for race on the heads touched by a push
1725 """check for race on the heads touched by a push
1726
1726
1727 This is similar to 'check:heads' but focus on the heads actually updated
1727 This is similar to 'check:heads' but focus on the heads actually updated
1728 during the push. If other activities happen on unrelated heads, it is
1728 during the push. If other activities happen on unrelated heads, it is
1729 ignored.
1729 ignored.
1730
1730
1731 This allow server with high traffic to avoid push contention as long as
1731 This allow server with high traffic to avoid push contention as long as
1732 unrelated parts of the graph are involved."""
1732 unrelated parts of the graph are involved."""
1733 h = inpart.read(20)
1733 h = inpart.read(20)
1734 heads = []
1734 heads = []
1735 while len(h) == 20:
1735 while len(h) == 20:
1736 heads.append(h)
1736 heads.append(h)
1737 h = inpart.read(20)
1737 h = inpart.read(20)
1738 assert not h
1738 assert not h
1739 # trigger a transaction so that we are guaranteed to have the lock now.
1739 # trigger a transaction so that we are guaranteed to have the lock now.
1740 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1740 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1741 op.gettransaction()
1741 op.gettransaction()
1742
1742
1743 currentheads = set()
1743 currentheads = set()
1744 for ls in op.repo.branchmap().itervalues():
1744 for ls in op.repo.branchmap().itervalues():
1745 currentheads.update(ls)
1745 currentheads.update(ls)
1746
1746
1747 for h in heads:
1747 for h in heads:
1748 if h not in currentheads:
1748 if h not in currentheads:
1749 raise error.PushRaced('repository changed while pushing - '
1749 raise error.PushRaced('repository changed while pushing - '
1750 'please try again')
1750 'please try again')
1751
1751
1752 @parthandler('output')
1752 @parthandler('output')
1753 def handleoutput(op, inpart):
1753 def handleoutput(op, inpart):
1754 """forward output captured on the server to the client"""
1754 """forward output captured on the server to the client"""
1755 for line in inpart.read().splitlines():
1755 for line in inpart.read().splitlines():
1756 op.ui.status(_('remote: %s\n') % line)
1756 op.ui.status(_('remote: %s\n') % line)
1757
1757
1758 @parthandler('replycaps')
1758 @parthandler('replycaps')
1759 def handlereplycaps(op, inpart):
1759 def handlereplycaps(op, inpart):
1760 """Notify that a reply bundle should be created
1760 """Notify that a reply bundle should be created
1761
1761
1762 The payload contains the capabilities information for the reply"""
1762 The payload contains the capabilities information for the reply"""
1763 caps = decodecaps(inpart.read())
1763 caps = decodecaps(inpart.read())
1764 if op.reply is None:
1764 if op.reply is None:
1765 op.reply = bundle20(op.ui, caps)
1765 op.reply = bundle20(op.ui, caps)
1766
1766
1767 class AbortFromPart(error.Abort):
1767 class AbortFromPart(error.Abort):
1768 """Sub-class of Abort that denotes an error from a bundle2 part."""
1768 """Sub-class of Abort that denotes an error from a bundle2 part."""
1769
1769
1770 @parthandler('error:abort', ('message', 'hint'))
1770 @parthandler('error:abort', ('message', 'hint'))
1771 def handleerrorabort(op, inpart):
1771 def handleerrorabort(op, inpart):
1772 """Used to transmit abort error over the wire"""
1772 """Used to transmit abort error over the wire"""
1773 raise AbortFromPart(inpart.params['message'],
1773 raise AbortFromPart(inpart.params['message'],
1774 hint=inpart.params.get('hint'))
1774 hint=inpart.params.get('hint'))
1775
1775
1776 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1776 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1777 'in-reply-to'))
1777 'in-reply-to'))
1778 def handleerrorpushkey(op, inpart):
1778 def handleerrorpushkey(op, inpart):
1779 """Used to transmit failure of a mandatory pushkey over the wire"""
1779 """Used to transmit failure of a mandatory pushkey over the wire"""
1780 kwargs = {}
1780 kwargs = {}
1781 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1781 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1782 value = inpart.params.get(name)
1782 value = inpart.params.get(name)
1783 if value is not None:
1783 if value is not None:
1784 kwargs[name] = value
1784 kwargs[name] = value
1785 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1785 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1786
1786
1787 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1787 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1788 def handleerrorunsupportedcontent(op, inpart):
1788 def handleerrorunsupportedcontent(op, inpart):
1789 """Used to transmit unknown content error over the wire"""
1789 """Used to transmit unknown content error over the wire"""
1790 kwargs = {}
1790 kwargs = {}
1791 parttype = inpart.params.get('parttype')
1791 parttype = inpart.params.get('parttype')
1792 if parttype is not None:
1792 if parttype is not None:
1793 kwargs['parttype'] = parttype
1793 kwargs['parttype'] = parttype
1794 params = inpart.params.get('params')
1794 params = inpart.params.get('params')
1795 if params is not None:
1795 if params is not None:
1796 kwargs['params'] = params.split('\0')
1796 kwargs['params'] = params.split('\0')
1797
1797
1798 raise error.BundleUnknownFeatureError(**kwargs)
1798 raise error.BundleUnknownFeatureError(**kwargs)
1799
1799
1800 @parthandler('error:pushraced', ('message',))
1800 @parthandler('error:pushraced', ('message',))
1801 def handleerrorpushraced(op, inpart):
1801 def handleerrorpushraced(op, inpart):
1802 """Used to transmit push race error over the wire"""
1802 """Used to transmit push race error over the wire"""
1803 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1803 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1804
1804
1805 @parthandler('listkeys', ('namespace',))
1805 @parthandler('listkeys', ('namespace',))
1806 def handlelistkeys(op, inpart):
1806 def handlelistkeys(op, inpart):
1807 """retrieve pushkey namespace content stored in a bundle2"""
1807 """retrieve pushkey namespace content stored in a bundle2"""
1808 namespace = inpart.params['namespace']
1808 namespace = inpart.params['namespace']
1809 r = pushkey.decodekeys(inpart.read())
1809 r = pushkey.decodekeys(inpart.read())
1810 op.records.add('listkeys', (namespace, r))
1810 op.records.add('listkeys', (namespace, r))
1811
1811
1812 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1812 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1813 def handlepushkey(op, inpart):
1813 def handlepushkey(op, inpart):
1814 """process a pushkey request"""
1814 """process a pushkey request"""
1815 dec = pushkey.decode
1815 dec = pushkey.decode
1816 namespace = dec(inpart.params['namespace'])
1816 namespace = dec(inpart.params['namespace'])
1817 key = dec(inpart.params['key'])
1817 key = dec(inpart.params['key'])
1818 old = dec(inpart.params['old'])
1818 old = dec(inpart.params['old'])
1819 new = dec(inpart.params['new'])
1819 new = dec(inpart.params['new'])
1820 # Grab the transaction to ensure that we have the lock before performing the
1820 # Grab the transaction to ensure that we have the lock before performing the
1821 # pushkey.
1821 # pushkey.
1822 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1822 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1823 op.gettransaction()
1823 op.gettransaction()
1824 ret = op.repo.pushkey(namespace, key, old, new)
1824 ret = op.repo.pushkey(namespace, key, old, new)
1825 record = {'namespace': namespace,
1825 record = {'namespace': namespace,
1826 'key': key,
1826 'key': key,
1827 'old': old,
1827 'old': old,
1828 'new': new}
1828 'new': new}
1829 op.records.add('pushkey', record)
1829 op.records.add('pushkey', record)
1830 if op.reply is not None:
1830 if op.reply is not None:
1831 rpart = op.reply.newpart('reply:pushkey')
1831 rpart = op.reply.newpart('reply:pushkey')
1832 rpart.addparam(
1832 rpart.addparam(
1833 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1833 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1834 rpart.addparam('return', '%i' % ret, mandatory=False)
1834 rpart.addparam('return', '%i' % ret, mandatory=False)
1835 if inpart.mandatory and not ret:
1835 if inpart.mandatory and not ret:
1836 kwargs = {}
1836 kwargs = {}
1837 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1837 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1838 if key in inpart.params:
1838 if key in inpart.params:
1839 kwargs[key] = inpart.params[key]
1839 kwargs[key] = inpart.params[key]
1840 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1840 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1841
1841
1842 @parthandler('phase-heads')
1842 @parthandler('phase-heads')
1843 def handlephases(op, inpart):
1843 def handlephases(op, inpart):
1844 """apply phases from bundle part to repo"""
1844 """apply phases from bundle part to repo"""
1845 headsbyphase = phases.binarydecode(inpart)
1845 headsbyphase = phases.binarydecode(inpart)
1846 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1846 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1847
1847
1848 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1848 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1849 def handlepushkeyreply(op, inpart):
1849 def handlepushkeyreply(op, inpart):
1850 """retrieve the result of a pushkey request"""
1850 """retrieve the result of a pushkey request"""
1851 ret = int(inpart.params['return'])
1851 ret = int(inpart.params['return'])
1852 partid = int(inpart.params['in-reply-to'])
1852 partid = int(inpart.params['in-reply-to'])
1853 op.records.add('pushkey', {'return': ret}, partid)
1853 op.records.add('pushkey', {'return': ret}, partid)
1854
1854
1855 @parthandler('obsmarkers')
1855 @parthandler('obsmarkers')
1856 def handleobsmarker(op, inpart):
1856 def handleobsmarker(op, inpart):
1857 """add a stream of obsmarkers to the repo"""
1857 """add a stream of obsmarkers to the repo"""
1858 tr = op.gettransaction()
1858 tr = op.gettransaction()
1859 markerdata = inpart.read()
1859 markerdata = inpart.read()
1860 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1860 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1861 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1861 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1862 % len(markerdata))
1862 % len(markerdata))
1863 # The mergemarkers call will crash if marker creation is not enabled.
1863 # The mergemarkers call will crash if marker creation is not enabled.
1864 # we want to avoid this if the part is advisory.
1864 # we want to avoid this if the part is advisory.
1865 if not inpart.mandatory and op.repo.obsstore.readonly:
1865 if not inpart.mandatory and op.repo.obsstore.readonly:
1866 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1866 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1867 return
1867 return
1868 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1868 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1869 op.repo.invalidatevolatilesets()
1869 op.repo.invalidatevolatilesets()
1870 if new:
1870 if new:
1871 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1871 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1872 op.records.add('obsmarkers', {'new': new})
1872 op.records.add('obsmarkers', {'new': new})
1873 if op.reply is not None:
1873 if op.reply is not None:
1874 rpart = op.reply.newpart('reply:obsmarkers')
1874 rpart = op.reply.newpart('reply:obsmarkers')
1875 rpart.addparam(
1875 rpart.addparam(
1876 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1876 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1877 rpart.addparam('new', '%i' % new, mandatory=False)
1877 rpart.addparam('new', '%i' % new, mandatory=False)
1878
1878
1879
1879
1880 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1880 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1881 def handleobsmarkerreply(op, inpart):
1881 def handleobsmarkerreply(op, inpart):
1882 """retrieve the result of a pushkey request"""
1882 """retrieve the result of a pushkey request"""
1883 ret = int(inpart.params['new'])
1883 ret = int(inpart.params['new'])
1884 partid = int(inpart.params['in-reply-to'])
1884 partid = int(inpart.params['in-reply-to'])
1885 op.records.add('obsmarkers', {'new': ret}, partid)
1885 op.records.add('obsmarkers', {'new': ret}, partid)
1886
1886
1887 @parthandler('hgtagsfnodes')
1887 @parthandler('hgtagsfnodes')
1888 def handlehgtagsfnodes(op, inpart):
1888 def handlehgtagsfnodes(op, inpart):
1889 """Applies .hgtags fnodes cache entries to the local repo.
1889 """Applies .hgtags fnodes cache entries to the local repo.
1890
1890
1891 Payload is pairs of 20 byte changeset nodes and filenodes.
1891 Payload is pairs of 20 byte changeset nodes and filenodes.
1892 """
1892 """
1893 # Grab the transaction so we ensure that we have the lock at this point.
1893 # Grab the transaction so we ensure that we have the lock at this point.
1894 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1894 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1895 op.gettransaction()
1895 op.gettransaction()
1896 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1896 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1897
1897
1898 count = 0
1898 count = 0
1899 while True:
1899 while True:
1900 node = inpart.read(20)
1900 node = inpart.read(20)
1901 fnode = inpart.read(20)
1901 fnode = inpart.read(20)
1902 if len(node) < 20 or len(fnode) < 20:
1902 if len(node) < 20 or len(fnode) < 20:
1903 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1903 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1904 break
1904 break
1905 cache.setfnode(node, fnode)
1905 cache.setfnode(node, fnode)
1906 count += 1
1906 count += 1
1907
1907
1908 cache.write()
1908 cache.write()
1909 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1909 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1910
1910
1911 @parthandler('pushvars')
1911 @parthandler('pushvars')
1912 def bundle2getvars(op, part):
1912 def bundle2getvars(op, part):
1913 '''unbundle a bundle2 containing shellvars on the server'''
1913 '''unbundle a bundle2 containing shellvars on the server'''
1914 # An option to disable unbundling on server-side for security reasons
1914 # An option to disable unbundling on server-side for security reasons
1915 if op.ui.configbool('push', 'pushvars.server'):
1915 if op.ui.configbool('push', 'pushvars.server'):
1916 hookargs = {}
1916 hookargs = {}
1917 for key, value in part.advisoryparams:
1917 for key, value in part.advisoryparams:
1918 key = key.upper()
1918 key = key.upper()
1919 # We want pushed variables to have USERVAR_ prepended so we know
1919 # We want pushed variables to have USERVAR_ prepended so we know
1920 # they came from the --pushvar flag.
1920 # they came from the --pushvar flag.
1921 key = "USERVAR_" + key
1921 key = "USERVAR_" + key
1922 hookargs[key] = value
1922 hookargs[key] = value
1923 op.addhookargs(hookargs)
1923 op.addhookargs(hookargs)
General Comments 0
You need to be logged in to leave comments. Login now