phase: introduce a new 'check:phases' part...
Boris Feld
r34821:a95067b1 default
@@ -1,1923 +1,1945 b''
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows

:params size: int32

    The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with a
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    Names MUST start with a letter. If this first letter is lower case, the
    parameter is advisory and can be safely ignored. However, when the first
    letter is capital, the parameter is mandatory and the bundling process
    MUST stop if it is not able to process it.

    Stream parameters use a simple textual format for two main reasons:

    - Stream level parameters should remain simple and we want to discourage
      any crazy usage.
    - Textual data allow easy human inspection of a bundle2 header in case of
      trouble.

    Any application level options MUST go into a bundle2 part instead.

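As an illustration of the layout above, here is a minimal sketch of how stream
level parameters could be serialized. This is not part of bundle2 itself; the
helper name and the sorting of parameters are illustrative assumptions only.

```python
import struct
import urllib.parse

def encodestreamparams(params):
    """Serialize stream level parameters as <int32 size><blob>.

    params maps names to values; a None value means a valueless parameter.
    """
    items = []
    for name, value in sorted(params.items()):
        # empty names are forbidden, and names must start with a letter
        assert name and name[0].isalpha()
        item = urllib.parse.quote(name)
        if value is not None:
            item += '=' + urllib.parse.quote(value)
        items.append(item)
    # space separated, urlquoted list, preceded by its total byte size
    blob = ' '.join(items).encode('ascii')
    return struct.pack('>i', len(blob)) + blob
```

A reader would unpack the leading int32, read that many bytes, split the blob
on spaces and unquote each side; a capitalized name then marks the parameter
as mandatory.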
Payload part
------------------------

The binary format is as follows

:header size: int32

    The total number of Bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    to interpret the part payload.

    The binary format of the header is as follows

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32bits integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        A part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

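To make the parameter structure above concrete, a simplified reader for the
`<mandatory-count><advisory-count><param-sizes><param-data>` layout could look
like this (a sketch only; the function name is illustrative and not part of
the module):

```python
import struct

def parsepartparams(data):
    """Decode a part parameter block into (mandatory, advisory) lists."""
    mancount, advcount = struct.unpack_from('>BB', data, 0)
    total = mancount + advcount
    # one (key size, value size) pair of bytes per parameter
    sizes = struct.unpack_from('>' + 'BB' * total, data, 2)
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return params[:mancount], params[mancount:]
```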
:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

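The chunk framing can be read with a small loop like the following sketch
(illustrative only; negative sizes are rejected here since no special case
processing exists yet):

```python
import struct

def readpayloadchunks(fp):
    """Yield chunk data until the zero-size chunk that ends the payload."""
    while True:
        chunksize = struct.unpack('>i', fp.read(4))[0]
        if chunksize == 0:
            # a zero size chunk concludes the payload
            return
        if chunksize < 0:
            raise ValueError('negative chunk size: special case processing '
                             'is not supported in this sketch')
        yield fp.read(chunksize)
```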
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
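
The case rule above can be sketched as follows (a simplified illustration,
not the module's actual dispatch code; the function name is hypothetical):

```python
def dispatchpart(parttype, handlers):
    """Look up a handler, honoring the mandatory/advisory case rule."""
    # matching is case insensitive
    handler = handlers.get(parttype.lower())
    if handler is None and parttype != parttype.lower():
        # any uppercase character makes the part mandatory
        raise LookupError('missing handler for mandatory part: %s' % parttype)
    # None means: advisory part with no known handler, silently ignored
    return handler
```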
"""

from __future__ import absolute_import, division

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.seek(0, 2)
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
        # and should not gracefully cleanup.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.seek(0, 2)

                for part in self.iterator:
                    # consume the bundle content
                    part.seek(0, 2)
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                 params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

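The `encodecaps`/`decodecaps` pair above defines a simple textual format: one percent-quoted capability per line, with an optional comma-separated value list after `=`. A minimal standalone sketch of that round trip, using hypothetical names (`encode_caps`/`decode_caps`) and the stdlib `urllib.parse` instead of Mercurial's `urlreq` wrapper:

```python
from urllib.parse import quote, unquote

def encode_caps(caps):
    """Encode a capabilities dict as one percent-quoted entry per line."""
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        entry = quote(name)
        if vals:
            # capabilities with values become "name=v1,v2,..."
            entry = "%s=%s" % (entry, ','.join(vals))
        chunks.append(entry)
    return '\n'.join(chunks)

def decode_caps(blob):
    """Decode the blob back into a dict mapping names to value lists."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            name, vals = line, []
        else:
            name, raw = line.split('=', 1)
            vals = [unquote(v) for v in raw.split(',')]
        caps[unquote(name)] = vals
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encode_caps(caps)
# 'HG20' has no values, so it is emitted bare; 'changegroup' carries a list.
assert decode_caps(blob) == caps
```

Note the asymmetry the real code also has: a bare capability decodes to an empty list, so values are always lists after decoding.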
567 bundletypes = {
568 bundletypes = {
568 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
569 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
569 # since the unification ssh accepts a header but there
570 # since the unification ssh accepts a header but there
570 # is no capability signaling it.
571 # is no capability signaling it.
571 "HG20": (), # special-cased below
572 "HG20": (), # special-cased below
572 "HG10UN": ("HG10UN", 'UN'),
573 "HG10UN": ("HG10UN", 'UN'),
573 "HG10BZ": ("HG10", 'BZ'),
574 "HG10BZ": ("HG10", 'BZ'),
574 "HG10GZ": ("HG10GZ", 'GZ'),
575 "HG10GZ": ("HG10GZ", 'GZ'),
575 }
576 }
576
577
577 # hgweb uses this list to communicate its preferred type
578 # hgweb uses this list to communicate its preferred type
578 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
579 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
579
580
class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add a stream level parameter and `newpart`
    to populate it. Then call `getchunks` to retrieve all the binary chunks
    of data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


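`_paramchunk` and the unbundler's `_processallparams` (further down) agree on a simple wire form for stream-level parameters: space-separated, percent-quoted `name` or `name=value` entries. A standalone sketch of that encoding, under hypothetical names and with stdlib `urllib.parse` standing in for `urlreq`:

```python
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Render (name, value) pairs as a space-separated parameter block."""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            # valueless parameters are emitted as a bare name
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)

def decode_stream_params(paramsblock):
    """Parse a parameter block back into (name, value) pairs."""
    params = []
    for p in paramsblock.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)  # bare name, no value
        params.append((parts[0], parts[1]))
    return params

block = encode_stream_params([('Compression', 'BZ'), ('flag', None)])
assert block == 'Compression=BZ flag'
assert decode_stream_params(block) == [('Compression', 'BZ'), ('flag', None)]
```

Percent-quoting both halves is what keeps spaces and `=` inside names or values from corrupting the framing.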
class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

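`getunbundler` dispatches on the first four bytes of the stream: two bytes of magic (`HG`) plus a two-byte version used to pick the unbundler class. A minimal standalone sketch of that header split (the function name `peek_bundle_header` is hypothetical; the real code also consults `formatmap` and raises `error.Abort`):

```python
import io
import struct

def peek_bundle_header(fp):
    """Read the 4-byte magic string and split it into magic and version."""
    magicstring = fp.read(4)
    if len(magicstring) < 4:
        raise ValueError('stream too short for a bundle header')
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != b'HG':
        raise ValueError('not a Mercurial bundle: %r' % magicstring)
    return version

# A bundle2 stream starts with 'HG20'; the next 4 bytes give the size of
# the stream-level parameter block (here 0: no parameters).
stream = io.BytesIO(b'HG20' + struct.pack('>i', 0))
assert peek_bundle_header(stream) == b'20'
```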
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters from a stream parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


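The forwarding loop above relies on bundle2's framing: every header and payload chunk is prefixed by a size in the same integer format, and the stream ends when two consecutive zero sizes are seen (end of the last part's payload, then the zero-length end-of-bundle header). A standalone sketch of that loop, assuming a hypothetical `forward_chunks` name and a big-endian signed int (`'>i'`) standing in for `_fpartheadersize`/`_fpayloadsize`, and ignoring negative flag sizes such as `flaginterrupt`:

```python
import io
import struct

_fmt = '>i'  # both part-header and payload sizes use the same int format

def forward_chunks(fp):
    """Copy size-prefixed chunks until two consecutive zero sizes are seen."""
    out = []
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack(_fmt, fp.read(4))[0]
        out.append(struct.pack(_fmt, size))
        if size:
            emptycount = 0
        else:
            emptycount += 1
            continue
        if size < 0:
            # negative sizes are reserved for flags such as interrupts
            continue
        out.append(fp.read(size))
    return b''.join(out)

# One part: a 5-byte header, a 3-byte payload chunk, a zero ending the part,
# then a zero-length header ending the bundle.
raw = (struct.pack(_fmt, 5) + b'hdr..'
       + struct.pack(_fmt, 3) + b'abc'
       + struct.pack(_fmt, 0) + struct.pack(_fmt, 0))
assert forward_chunks(io.BytesIO(raw)) == raw
```

Because the header-size and payload-size formats are identical, the loop never has to know whether it is looking at a header or a payload chunk, which is exactly what the `assert _fpartheadersize == _fpayloadsize` in the real code guarantees.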
    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the
            # next part can be read. But then seek back to the beginning so
            # the code consuming this generator has a part that starts at 0.
            part.seek(0, 2)
            part.seek(0)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

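`b2streamparamhandler` is a small decorator-based registry: parameter names map to handler functions, and `_processparam` looks them up case-insensitively via `name.lower()`. A minimal standalone sketch of the pattern, with hypothetical names (`handlers`, `streamparamhandler`, `process_compression`) and a plain dict standing in for the unbundler state:

```python
handlers = {}

def streamparamhandler(name):
    """Register the decorated function as the handler for parameter `name`."""
    def decorator(func):
        # guard against two handlers claiming the same parameter
        assert name not in handlers, 'duplicate handler: %s' % name
        handlers[name] = func
        return func
    return decorator

@streamparamhandler('compression')
def process_compression(state, param, value):
    state['decompressor'] = value

state = {}
# Parameter names are matched case-insensitively by lowering them first.
handlers['Compression'.lower()](state, 'Compression', 'BZ')
assert state == {'decompressor': 'BZ'}
```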
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


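The chunk framing produced by ``getchunks`` and ``_payloadchunks`` above can be sketched in isolation. This is a minimal illustration only: ``frame`` and ``unframe`` are hypothetical helpers, and the ``'>I'`` size prefix is an assumption for the example; the module's real format strings (``_fpartheadersize``, ``_fpayloadsize``, ...) are defined elsewhere in this file.

```python
import struct

# Illustrative sketch (NOT the module's real helpers): each chunk is
# preceded by its length, packed as an unsigned big-endian 32-bit integer
# ('>I' is assumed here purely for illustration).
def frame(chunk):
    # prefix the chunk with its size
    return struct.pack('>I', len(chunk)) + chunk

def unframe(data):
    # read the size prefix, then slice out exactly that many bytes
    (size,) = struct.unpack_from('>I', data, 0)
    return data[4:4 + size]

payload = b'HG20-part'
assert unframe(frame(payload)) == payload
```

A zero-length frame then naturally serves as an end-of-payload marker, which is how the real stream terminates a part.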
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.seek(0, 2)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

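The chunk index that ``_findchunk`` searches can be modeled on its own. This is a sketch with made-up index values: ``chunkindex`` maps each chunk's starting payload offset to a file offset, and ``findchunk`` (a hypothetical standalone copy of the method's logic) resolves a payload position to a chunk number plus an offset inside that chunk.

```python
# Illustrative model of unbundlepart's (payload position, file position)
# chunk index; the values below are invented for the example.
chunkindex = [(0, 100), (10, 114), (25, 133)]

def findchunk(pos):
    # walk the index until we pass 'pos', then back up one chunk
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

assert findchunk(12) == (1, 2)   # 2 bytes into the chunk starting at 10
```

This is what lets ``seek`` re-enter the payload stream at an arbitrary position without rereading every earlier chunk.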
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

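The pairing step inside ``_readheader`` is easy to miss: the header stores all parameter sizes as one flat sequence, and the strided slices rebuild (keysize, valuesize) couples before splitting them into mandatory and advisory groups. A small sketch with made-up sample sizes:

```python
# Flat sequence as unpacked from the header: key/value sizes interleaved.
# The values here are invented sample data for illustration.
paramsizes = (3, 5, 4, 0)

# Stride-two slices pair every even-indexed size (keys) with the
# following odd-indexed size (values).
couples = list(zip(paramsizes[::2], paramsizes[1::2]))
assert couples == [(3, 5), (4, 0)]

# Mandatory parameters come first; the counts decide the split.
mancount = 1
mansizes = couples[:mancount]
advsizes = couples[mancount:]
assert mansizes == [(3, 5)] and advsizes == [(4, 0)]
```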
    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'phases': ('heads',),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

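The return-code convention combined above follows `hg push`-style results: 0 means failure, 1 means success with the head count unchanged, 1+n means n heads added, and -1-n means n heads removed. A hypothetical standalone reimplementation (`combine` is an illustrative name, not part of the module) makes the arithmetic easy to exercise:

```python
# Hypothetical standalone model of the result-combining logic above.
# Return-code convention:
#   0      -> at least one changegroup failed
#   1      -> success, number of heads unchanged
#   1 + n  -> success, n heads added
#   -1 - n -> success, n heads removed
def combine(results):
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:       # any failure makes the whole operation a failure
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

print(combine([]))        # 1 (no changegroups applied, nothing changed)
print(combine([3, 2]))    # 4 (2 + 1 = 3 heads added)
print(combine([-2, -3]))  # -4 (1 + 2 = 3 heads removed)
```

Head additions and removals from separate changegroups accumulate before being re-encoded into a single code.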
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will need massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate that what was retrieved
      by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

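The 20-byte read loop used by 'check:heads' (and again below) implements a trivially simple wire format: the part payload is just a concatenation of 20-byte binary node ids. A standalone illustration (`readheads` is a hypothetical helper name, not part of the module):

```python
import io

# Illustration of the payload format consumed by the 'check:heads'
# loop above: a plain concatenation of 20-byte binary node ids
# (SHA-1 digests). Any trailing partial read means a malformed part.
def readheads(stream):
    heads = []
    h = stream.read(20)
    while len(h) == 20:
        heads.append(h)
        h = stream.read(20)
    assert not h, 'payload length must be a multiple of 20 bytes'
    return heads

payload = b'\x11' * 20 + b'\x22' * 20
print(len(readheads(io.BytesIO(payload))))  # 2
```

An empty payload is valid and yields an empty head list.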
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for races on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activity happens on unrelated heads, it is
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

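The race check the new 'check:phases' part performs can be modeled in isolation: the client sends, per phase, the nodes it believed were in that phase, and the server aborts if any of them has since moved. All names below (`checkphases`, the local `PushRaced` class) are hypothetical stand-ins for the module's real helpers:

```python
# Hypothetical standalone model of the 'check:phases' race detection
# above. Phase numbers index into phasenames, matching Mercurial's
# convention (0 = public, 1 = draft, 2 = secret).
phasenames = ['public', 'draft', 'secret']

class PushRaced(Exception):
    """stand-in for error.PushRaced"""

def checkphases(phasetonodes, actualphase):
    # phasetonodes: list indexed by expected phase -> nodes in that phase
    # actualphase: callable mapping a node to its current phase number
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actual = actualphase(n)
            if actual != expectedphase:
                raise PushRaced('repository changed while pushing - '
                                'please try again (%s is %s expected %s)'
                                % (n, phasenames[actual],
                                   phasenames[expectedphase]))

# nodes 'a' and 'b' expected draft and still draft on the server: no race
checkphases([[], ['a', 'b']], lambda n: 1)
```

If a node was published (moved to phase 0) between discovery and unbundle, the check raises and the client is asked to retry against the new phase boundaries.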
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

1805 @parthandler('listkeys', ('namespace',))
1827 @parthandler('listkeys', ('namespace',))
1806 def handlelistkeys(op, inpart):
1828 def handlelistkeys(op, inpart):
1807 """retrieve pushkey namespace content stored in a bundle2"""
1829 """retrieve pushkey namespace content stored in a bundle2"""
1808 namespace = inpart.params['namespace']
1830 namespace = inpart.params['namespace']
1809 r = pushkey.decodekeys(inpart.read())
1831 r = pushkey.decodekeys(inpart.read())
1810 op.records.add('listkeys', (namespace, r))
1832 op.records.add('listkeys', (namespace, r))
1811
1833
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

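Each handler in this file is registered through the `@parthandler` decorator, which maps a lowercased part-type name to its handler function so that bundle processing can dispatch incoming parts by name. A condensed sketch of that registration pattern (the `demo:echo` part and dict-based `inpart` are illustrative, not Mercurial's actual wire objects):

```python
# Sketch of a parthandler-style registry: a name -> function table filled
# in by a decorator. Mirrors the shape of bundle2's decorator; the demo
# part type and dict payload are hypothetical.
parthandlermapping = {}

def parthandler(parttype, params=()):
    """Register `func` as the handler for parts of type `parttype`."""
    def _decorator(func):
        lparttype = parttype.lower()  # part types are matched lowercased
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@parthandler('demo:echo', ('msg',))
def handleecho(op, inpart):
    # Simplified: real handlers read inpart.params / inpart.read()
    return inpart['msg']

# Dispatch: look up the handler by part type and invoke it.
handler = parthandlermapping['demo:echo']
print(handler(None, {'msg': 'hello'}))  # prints "hello"
```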
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

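The hgtagsfnodes payload has no length header: the handler simply reads 20-byte changeset-node / 20-byte filenode pairs until a short read marks the end of the part, silently discarding any incomplete trailing pair. That framing can be sketched in isolation (`readpairs` is a hypothetical helper, not part of bundle2's API):

```python
# Sketch of the fixed-width pair framing used by the hgtags-fnodes part:
# keep reading (node, fnode) pairs of `width` bytes each until a short
# read signals end of payload; an incomplete final pair is dropped.
from io import BytesIO

def readpairs(stream, width=20):
    pairs = []
    while True:
        node = stream.read(width)
        fnode = stream.read(width)
        if len(node) < width or len(fnode) < width:
            break  # incomplete trailing data is ignored, as in the handler
        pairs.append((node, fnode))
    return pairs

payload = b'a' * 20 + b'b' * 20 + b'c' * 20 + b'd' * 20
print(len(readpairs(BytesIO(payload))))  # prints 2
```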
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvars flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
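The pushvars transformation is small enough to isolate: each advisory `(key, value)` param is uppercased and given a `USERVAR_` prefix before entering the hook environment, so server-side hooks can distinguish client-supplied variables from regular hook arguments. A standalone sketch (`varstohookargs` is a hypothetical name, not bundle2 API):

```python
# Sketch of the pushvars -> hookargs mapping: advisory part params become
# hook arguments with a USERVAR_ prefix, marking them as coming from
# `hg push --pushvars`. Illustrative only.
def varstohookargs(advisoryparams):
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

print(varstohookargs([('debug', '1')]))  # prints {'USERVAR_DEBUG': '1'}
```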