bundle2: use os.SEEK_* constants...
Gregory Szorc
r35037:241d9cac default
@@ -1,1945 +1,1946
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If the first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allows easy human inspection of a bundle2 header in case of
  trouble.

Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route to an application level handler that can
  interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object
  interpret the part payload.

  The binary format of the header is as follows:

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32 bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

    Part's parameters may have arbitrary content, the binary structure is::

        <mandatory-count><advisory-count><param-sizes><param-data>

    :mandatory-count: 1 byte, number of mandatory parameters

    :advisory-count:  1 byte, number of advisory parameters

    :param-sizes:

        N pairs of bytes, where N is the total number of parameters. Each
        pair contains (<size-of-key>, <size-of-value>) for one parameter.

    :param-data:

        A blob of bytes from which each parameter key and value can be
        retrieved using the list of size pairs stored in the previous
        field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.
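The `<mandatory-count><advisory-count><param-sizes><param-data>` structure can be round-tripped with a short sketch. The helpers `pack_part_params` and `unpack_part_params` are hypothetical names for illustration only.

```python
import struct

def pack_part_params(mandatory, advisory):
    """Serialize part parameters per the structure above (illustrative).

    `mandatory` and `advisory` are lists of (key, value) byte-string pairs.
    """
    allparams = list(mandatory) + list(advisory)
    header = struct.pack('>BB', len(mandatory), len(advisory))
    sizes = b''.join(struct.pack('>BB', len(k), len(v)) for k, v in allparams)
    data = b''.join(k + v for k, v in allparams)
    return header + sizes + data

def unpack_part_params(blob):
    """Inverse of pack_part_params: returns (mandatory, advisory) lists."""
    nbm, nba = struct.unpack('>BB', blob[:2])
    total = nbm + nba
    sizes = struct.unpack('>' + 'BB' * total, blob[2:2 + 2 * total])
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return params[:nbm], params[nbm:]
```
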

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.
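The chunked payload framing can be sketched as a simple consumer loop; `read_payload` is a hypothetical helper, not part of this module.

```python
import io
import struct

def read_payload(fp):
    """Read a `<chunksize><chunkdata>` sequence up to the zero-size chunk.

    A sketch of the consumer side; negative sizes (reserved for special
    case processing) are rejected here since no such processing exists yet.
    """
    chunks = []
    while True:
        chunksize = struct.unpack('>i', fp.read(4))[0]
        if chunksize == 0:
            break  # the zero-size chunk concludes the payload
        if chunksize < 0:
            raise ValueError('special chunk processing not supported')
        chunks.append(fp.read(chunksize))
    return b''.join(chunks)

# one 5-byte chunk, then the terminating zero-size chunk
stream = io.BytesIO(struct.pack('>i', 5) + b'hello' + struct.pack('>i', 0))
payload = read_payload(stream)
```
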

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import, division

import errno
+import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
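Since each parameter contributes one (key size, value size) pair of bytes, the struct format string grows with the parameter count. A quick check of the behavior, using a local mirror of the function above:

```python
import struct

# Mirror of _makefpartparamsizes above: one big-endian 'BB' pair
# per parameter.
def makefpartparamsizes(nbparams):
    return '>' + ('BB' * nbparams)

# Two parameters -> four unsigned bytes: key/value sizes for each.
fmt = makefpartparamsizes(2)
sizes = struct.unpack(fmt, b'\x05\x03\x06\x00')
```
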

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
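The registration mechanism can be exercised in isolation; this self-contained sketch repeats the decorator with its own local mapping so the behavior is observable outside the module.

```python
# Standalone sketch of the @parthandler registration: handlers are stored
# under the lower-cased part type, and the accepted mandatory parameter
# names are attached to the function itself.
parthandlermapping = {}

def parthandler(parttype, params=()):
    def _decorator(func):
        lparttype = parttype.lower()  # matching is case insensitive
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@parthandler('myparttype', ('mandatory', 'param', 'handled'))
def myparttypehandler(op, part):
    '''process a part of type "myparttype".'''

# Lookup lower-cases the incoming part type, so any casing matches;
# the casing itself only encodes mandatory vs advisory.
handler = parthandlermapping.get('MyPartType'.lower())
```
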

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
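The add/index/iterate semantics described in the docstring can be shown with a minimal stand-in class (a trimmed copy of the behavior above, without the reply tracking):

```python
class records(object):
    """Minimal stand-in mirroring unbundlerecords' add/index/iter behavior."""

    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        # index by category, and keep a flat chronological sequence
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

r = records()
r.add('changegroup', {'return': 1})
r.add('pushkey', {'ok': True})
```
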

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
-                p.seek(0, 2)
+                p.seek(0, os.SEEK_END)
                self.current = None
        self.iterator = func()
        return self.iterator

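The substitution made by this revision is purely cosmetic: `os.SEEK_END` is a named alias for the bare literal `2` that the old `seek(0, 2)` calls used, so behavior is unchanged while the intent (seek relative to end-of-stream, i.e. consume the part) becomes readable. A quick demonstration:

```python
import io
import os

# os.SEEK_SET / os.SEEK_CUR / os.SEEK_END are just named values 0 / 1 / 2.
constants = (os.SEEK_SET, os.SEEK_CUR, os.SEEK_END)

buf = io.BytesIO(b'0123456789')
buf.seek(0, os.SEEK_END)  # same effect as the former buf.seek(0, 2)
end = buf.tell()
```
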
370 def __exit__(self, type, exc, tb):
371 def __exit__(self, type, exc, tb):
371 if not self.iterator:
372 if not self.iterator:
372 return
373 return
373
374
374 # Only gracefully abort in a normal exception situation. User aborts
375 # Only gracefully abort in a normal exception situation. User aborts
375 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
376 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
376 # and should not gracefully cleanup.
377 # and should not gracefully cleanup.
377 if isinstance(exc, Exception):
378 if isinstance(exc, Exception):
378 # Any exceptions seeking to the end of the bundle at this point are
379 # Any exceptions seeking to the end of the bundle at this point are
379 # almost certainly related to the underlying stream being bad.
380 # almost certainly related to the underlying stream being bad.
380 # And, chances are that the exception we're handling is related to
381 # And, chances are that the exception we're handling is related to
381 # getting in that bad state. So, we swallow the seeking error and
382 # getting in that bad state. So, we swallow the seeking error and
382 # re-raise the original error.
383 # re-raise the original error.
383 seekerror = False
384 seekerror = False
384 try:
385 try:
385 if self.current:
386 if self.current:
386 # consume the part content to not corrupt the stream.
387 # consume the part content to not corrupt the stream.
387 self.current.seek(0, 2)
388 self.current.seek(0, os.SEEK_END)
388
389
389 for part in self.iterator:
390 for part in self.iterator:
390 # consume the bundle content
391 # consume the bundle content
391 part.seek(0, 2)
392 part.seek(0, os.SEEK_END)
392 except Exception:
393 except Exception:
393 seekerror = True
394 seekerror = True
394
395
395 # Small hack to let caller code distinguish exceptions from bundle2
396 # Small hack to let caller code distinguish exceptions from bundle2
396 # processing from processing the old format. This is mostly needed
397 # processing from processing the old format. This is mostly needed
397 # to handle different return codes to unbundle according to the type
398 # to handle different return codes to unbundle according to the type
398 # of bundle. We should probably clean up or drop this return code
399 # of bundle. We should probably clean up or drop this return code
399 # craziness in a future version.
400 # craziness in a future version.
400 exc.duringunbundle2 = True
401 exc.duringunbundle2 = True
401 salvaged = []
402 salvaged = []
402 replycaps = None
403 replycaps = None
403 if self.op.reply is not None:
404 if self.op.reply is not None:
404 salvaged = self.op.reply.salvageoutput()
405 salvaged = self.op.reply.salvageoutput()
405 replycaps = self.op.reply.capabilities
406 replycaps = self.op.reply.capabilities
406 exc._replycaps = replycaps
407 exc._replycaps = replycaps
407 exc._bundle2salvagedoutput = salvaged
408 exc._bundle2salvagedoutput = salvaged
408
409
409 # Re-raising from a variable loses the original stack. So only use
410 # Re-raising from a variable loses the original stack. So only use
410 # that form if we need to.
411 # that form if we need to.
411 if seekerror:
412 if seekerror:
412 raise exc
413 raise exc
413
414
414 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
415 self.count)
416 self.count)
416
417
417 def processbundle(repo, unbundler, transactiongetter=None, op=None):
418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
418 """This function process a bundle, apply effect to/from a repo
419 """This function process a bundle, apply effect to/from a repo
419
420
420 It iterates over each part then searches for and uses the proper handling
421 It iterates over each part then searches for and uses the proper handling
421 code to process the part. Parts are processed in order.
422 code to process the part. Parts are processed in order.
422
423
423 Unknown Mandatory part will abort the process.
424 Unknown Mandatory part will abort the process.
424
425
425 It is temporarily possible to provide a prebuilt bundleoperation to the
426 It is temporarily possible to provide a prebuilt bundleoperation to the
426 function. This is used to ensure output is properly propagated in case of
427 function. This is used to ensure output is properly propagated in case of
427 an error during the unbundling. This output capturing part will likely be
428 an error during the unbundling. This output capturing part will likely be
428 reworked and this ability will probably go away in the process.
429 reworked and this ability will probably go away in the process.
429 """
430 """
430 if op is None:
431 if op is None:
431 if transactiongetter is None:
432 if transactiongetter is None:
432 transactiongetter = _notransaction
433 transactiongetter = _notransaction
433 op = bundleoperation(repo, transactiongetter)
434 op = bundleoperation(repo, transactiongetter)
434 # todo:
435 # todo:
435 # - replace this is a init function soon.
436 # - replace this is a init function soon.
436 # - exception catching
437 # - exception catching
437 unbundler.params
438 unbundler.params
438 if repo.ui.debugflag:
439 if repo.ui.debugflag:
439 msg = ['bundle2-input-bundle:']
440 msg = ['bundle2-input-bundle:']
440 if unbundler.params:
441 if unbundler.params:
441 msg.append(' %i params' % len(unbundler.params))
442 msg.append(' %i params' % len(unbundler.params))
442 if op._gettransaction is None or op._gettransaction is _notransaction:
443 if op._gettransaction is None or op._gettransaction is _notransaction:
443 msg.append(' no-transaction')
444 msg.append(' no-transaction')
444 else:
445 else:
445 msg.append(' with-transaction')
446 msg.append(' with-transaction')
446 msg.append('\n')
447 msg.append('\n')
447 repo.ui.debug(''.join(msg))
448 repo.ui.debug(''.join(msg))
448
449
449 processparts(repo, op, unbundler)
450 processparts(repo, op, unbundler)
450
451
451 return op
452 return op
452
453
453 def processparts(repo, op, unbundler):
454 def processparts(repo, op, unbundler):
454 with partiterator(repo, op, unbundler) as parts:
455 with partiterator(repo, op, unbundler) as parts:
455 for part in parts:
456 for part in parts:
456 _processpart(op, part)
457 _processpart(op, part)
457
458
458 def _processchangegroup(op, cg, tr, source, url, **kwargs):
459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
459 ret = cg.apply(op.repo, tr, source, url, **kwargs)
460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
460 op.records.add('changegroup', {
461 op.records.add('changegroup', {
461 'return': ret,
462 'return': ret,
462 })
463 })
463 return ret
464 return ret
464
465
465 def _gethandler(op, part):
466 def _gethandler(op, part):
466 status = 'unknown' # used by debug output
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %s' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % ', '.join(unknownparams)
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    handler = _gethandler(op, part)
    if handler is None:
        return

    # handler is called outside the above try block so that we don't
    # risk catching KeyErrors from anything other than the
    # parthandlermapping lookup (any KeyError raised by handler()
    # itself represents a defect of a different variety).
    output = None
    if op.captureoutput and op.reply is not None:
        op.ui.pushbuffer(error=True, subproc=True)
        output = ''
    try:
        handler(op, part)
    finally:
        if output is not None:
            output = op.ui.popbuffer()
        if output:
            outpart = op.reply.newpart('output', data=output,
                                       mandatory=False)
            outpart.addparam(
                'in-reply-to', pycompat.bytestr(part.id), mandatory=False)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

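The two helpers above are inverses of each other. A standalone sketch using the stdlib `urllib.parse` quoting (which the module's `urlreq` alias wraps) illustrates the blob format; the capability names used here are purely illustrative:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """encode a caps dict into a newline-separated, percent-quoted blob"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode the blob back into a dict mapping names to value lists"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

blob = encodecaps({'error': ['abort', 'unsupportedcontent'], 'listkeys': []})
roundtrip = decodecaps(blob)
```

A capability with no values is emitted as a bare name, which decodes back to an empty list; commas and other reserved characters inside values survive the round trip because they are percent-quoted.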
bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

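On the wire, the encoded parameter block emitted by `getchunks` is preceded by its length packed with `_fstreamparamsize`. A minimal sketch, assuming the stdlib `urllib.parse.quote` in place of `urlreq.quote` and a big-endian signed 32-bit size field (which is what the negative-size checks elsewhere in this file suggest):

```python
import struct
from urllib.parse import quote

def paramchunk(params):
    """encode (name, value) stream parameters; value may be None"""
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)

# a parameter with a value and one without (bare name, no '=')
chunk = paramchunk([('Compression', 'BZ')])
# prefix the block with its length, as getchunks does with _pack
wire = struct.pack('>i', len(chunk)) + chunk.encode('ascii')
```

A valueless parameter is emitted as a bare quoted name, so the decoder can distinguish "no value" (`None`) from an empty value.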
    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and collect all parameters from a raw parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

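Decoding is the mirror image of `_paramchunk` on the sending side: split the block on spaces, split each entry on the first `=`, and unquote both halves; an entry without `=` maps to `None`. A sketch with the stdlib `urllib.parse.unquote` (the `weird%20name` parameter is purely illustrative, not a real bundle2 parameter):

```python
from urllib.parse import unquote

def processallparams(paramsblock):
    """decode a space-separated parameter block into a dict"""
    params = {}
    for p in paramsblock.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)   # valueless parameter
        params[parts[0]] = parts[1]
    return params

decoded = processallparams('Compression=BZ weird%20name=x%20y')
```

The case of the first letter then decides how `_processparam` treats an unknown name: lowercase parameters are advisory and ignored, uppercase ones are mandatory and abort the unbundling.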
    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will be either
        ignored or rejected.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            # Seek to the end of the part to force its consumption so the
            # next part can be read. But then seek back to the beginning so
            # the code consuming this generator has a part that starts at 0.
            part.seek(0, os.SEEK_END)
            part.seek(0, os.SEEK_SET)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

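The two `seek` calls above are where this changeset swaps the bare whence values for the `os.SEEK_*` constants. The constants are just readable names for the same POSIX whence values, as can be checked on any seekable file object:

```python
import io
import os

# the numeric whence values the named constants replace
assert os.SEEK_SET == 0 and os.SEEK_CUR == 1 and os.SEEK_END == 2

buf = io.BytesIO(b'part payload')
buf.seek(0, os.SEEK_END)   # drain to the end; equivalent to seek(0, 2)
size = buf.tell()          # offset at end == total length
buf.seek(0, os.SEEK_SET)   # rewind; equivalent to seek(0)
```

Using the named constants keeps the drain/rewind intent visible without changing behavior.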
    def _readpartheader(self):
        """read the part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

910 class bundlepart(object):
911 class bundlepart(object):
911 """A bundle2 part contains application level payload
912 """A bundle2 part contains application level payload
912
913
913 The part `type` is used to route the part to the application level
914 The part `type` is used to route the part to the application level
914 handler.
915 handler.
915
916
916 The part payload is contained in ``part.data``. It could be raw bytes or a
917 The part payload is contained in ``part.data``. It could be raw bytes or a
917 generator of byte chunks.
918 generator of byte chunks.
918
919
919 You can add parameters to the part using the ``addparam`` method.
920 You can add parameters to the part using the ``addparam`` method.
920 Parameters can be either mandatory (default) or advisory. Remote side
921 Parameters can be either mandatory (default) or advisory. Remote side
921 should be able to safely ignore the advisory ones.
922 should be able to safely ignore the advisory ones.
922
923
923 Both data and parameters cannot be modified after the generation has begun.
924 Both data and parameters cannot be modified after the generation has begun.
924 """
925 """
925
926
926 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
927 data='', mandatory=True):
928 data='', mandatory=True):
928 validateparttype(parttype)
929 validateparttype(parttype)
929 self.id = None
930 self.id = None
930 self.type = parttype
931 self.type = parttype
931 self._data = data
932 self._data = data
932 self._mandatoryparams = list(mandatoryparams)
933 self._mandatoryparams = list(mandatoryparams)
933 self._advisoryparams = list(advisoryparams)
934 self._advisoryparams = list(advisoryparams)
934 # checking for duplicated entries
935 # checking for duplicated entries
935 self._seenparams = set()
936 self._seenparams = set()
936 for pname, __ in self._mandatoryparams + self._advisoryparams:
937 for pname, __ in self._mandatoryparams + self._advisoryparams:
937 if pname in self._seenparams:
938 if pname in self._seenparams:
938 raise error.ProgrammingError('duplicated params: %s' % pname)
939 raise error.ProgrammingError('duplicated params: %s' % pname)
939 self._seenparams.add(pname)
940 self._seenparams.add(pname)
940 # status of the part's generation:
941 # status of the part's generation:
941 # - None: not started,
942 # - None: not started,
942 # - False: currently generated,
943 # - False: currently generated,
943 # - True: generation done.
944 # - True: generation done.
944 self._generated = None
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

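The parameter bookkeeping above can be illustrated with a minimal standalone sketch (``PartParamsSketch`` is a hypothetical stand-in, not the real ``bundlepart`` class): mandatory and advisory parameters are kept in separate lists, and a duplicate name is rejected no matter which list it would land in.

```python
# Minimal sketch of the ``addparam`` routing logic; names and values are
# illustrative only.
class PartParamsSketch(object):
    def __init__(self):
        self._mandatoryparams = []
        self._advisoryparams = []
        self._seenparams = set()

    def addparam(self, name, value='', mandatory=True):
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        # pick the destination list, then append the (name, value) couple
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

part = PartParamsSketch()
part.addparam('version', '02')                    # mandatory by default
part.addparam('nbchanges', '3', mandatory=False)  # advisory
```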
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

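The payload framing produced by ``getchunks`` can be sketched in isolation: each chunk is preceded by its size packed big-endian, and a zero-size record terminates the payload. Treat the ``'>i'`` format as an assumption standing in for bundle2's ``_fpayloadsize`` here; ``framechunks`` is a hypothetical helper, not part of the real module.

```python
import struct

def framechunks(chunks):
    """Frame payload chunks the way bundle2 streams them (sketch)."""
    out = []
    for chunk in chunks:
        out.append(struct.pack('>i', len(chunk)))  # size prefix per chunk
        out.append(chunk)
    out.append(struct.pack('>i', 0))               # end-of-payload marker
    return b''.join(out)

framed = framechunks([b'abc', b'defgh'])
```

A negative size in this position is how the real stream signals out-of-band data: `-1` (``flaginterrupt`` below) announces an interrupting part, and other negative values are rejected.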

flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.seek(0, os.SEEK_END)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

1190 """a bundle part read from a bundle"""
1191 """a bundle part read from a bundle"""
1191
1192
1192 def __init__(self, ui, header, fp):
1193 def __init__(self, ui, header, fp):
1193 super(unbundlepart, self).__init__(fp)
1194 super(unbundlepart, self).__init__(fp)
1194 self._seekable = (util.safehasattr(fp, 'seek') and
1195 self._seekable = (util.safehasattr(fp, 'seek') and
1195 util.safehasattr(fp, 'tell'))
1196 util.safehasattr(fp, 'tell'))
1196 self.ui = ui
1197 self.ui = ui
1197 # unbundle state attr
1198 # unbundle state attr
1198 self._headerdata = header
1199 self._headerdata = header
1199 self._headeroffset = 0
1200 self._headeroffset = 0
1200 self._initialized = False
1201 self._initialized = False
1201 self.consumed = False
1202 self.consumed = False
1202 # part data
1203 # part data
1203 self.id = None
1204 self.id = None
1204 self.type = None
1205 self.type = None
1205 self.mandatoryparams = None
1206 self.mandatoryparams = None
1206 self.advisoryparams = None
1207 self.advisoryparams = None
1207 self.params = None
1208 self.params = None
1208 self.mandatorykeys = ()
1209 self.mandatorykeys = ()
1209 self._payloadstream = None
1210 self._payloadstream = None
1210 self._readheader()
1211 self._readheader()
1211 self._mandatory = None
1212 self._mandatory = None
1212 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1213 self._pos = 0
1214 self._pos = 0
1214
1215
1215 def _fromheader(self, size):
1216 def _fromheader(self, size):
1216 """return the next <size> byte from the header"""
1217 """return the next <size> byte from the header"""
1217 offset = self._headeroffset
1218 offset = self._headeroffset
1218 data = self._headerdata[offset:(offset + size)]
1219 data = self._headerdata[offset:(offset + size)]
1219 self._headeroffset = offset + size
1220 self._headeroffset = offset + size
1220 return data
1221 return data
1221
1222
1222 def _unpackheader(self, format):
1223 def _unpackheader(self, format):
1223 """read given format from header
1224 """read given format from header
1224
1225
1225 This automatically compute the size of the format to read."""
1226 This automatically compute the size of the format to read."""
1226 data = self._fromheader(struct.calcsize(format))
1227 data = self._fromheader(struct.calcsize(format))
1227 return _unpack(format, data)
1228 return _unpack(format, data)
1228
1229
1229 def _initparams(self, mandatoryparams, advisoryparams):
1230 def _initparams(self, mandatoryparams, advisoryparams):
1230 """internal function to setup all logic related parameters"""
1231 """internal function to setup all logic related parameters"""
1231 # make it read only to prevent people touching it by mistake.
1232 # make it read only to prevent people touching it by mistake.
1232 self.mandatoryparams = tuple(mandatoryparams)
1233 self.mandatoryparams = tuple(mandatoryparams)
1233 self.advisoryparams = tuple(advisoryparams)
1234 self.advisoryparams = tuple(advisoryparams)
1234 # user friendly UI
1235 # user friendly UI
1235 self.params = util.sortdict(self.mandatoryparams)
1236 self.params = util.sortdict(self.mandatoryparams)
1236 self.params.update(self.advisoryparams)
1237 self.params.update(self.advisoryparams)
1237 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1238
1239
1239 def _payloadchunks(self, chunknum=0):
1240 def _payloadchunks(self, chunknum=0):
1240 '''seek to specified chunk and start yielding data'''
1241 '''seek to specified chunk and start yielding data'''
1241 if len(self._chunkindex) == 0:
1242 if len(self._chunkindex) == 0:
1242 assert chunknum == 0, 'Must start with chunk 0'
1243 assert chunknum == 0, 'Must start with chunk 0'
1243 self._chunkindex.append((0, self._tellfp()))
1244 self._chunkindex.append((0, self._tellfp()))
1244 else:
1245 else:
1245 assert chunknum < len(self._chunkindex), \
1246 assert chunknum < len(self._chunkindex), \
1246 'Unknown chunk %d' % chunknum
1247 'Unknown chunk %d' % chunknum
1247 self._seekfp(self._chunkindex[chunknum][1])
1248 self._seekfp(self._chunkindex[chunknum][1])
1248
1249
1249 pos = self._chunkindex[chunknum][0]
1250 pos = self._chunkindex[chunknum][0]
1250 payloadsize = self._unpack(_fpayloadsize)[0]
1251 payloadsize = self._unpack(_fpayloadsize)[0]
1251 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1252 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1252 while payloadsize:
1253 while payloadsize:
1253 if payloadsize == flaginterrupt:
1254 if payloadsize == flaginterrupt:
1254 # interruption detection, the handler will now read a
1255 # interruption detection, the handler will now read a
1255 # single part and process it.
1256 # single part and process it.
1256 interrupthandler(self.ui, self._fp)()
1257 interrupthandler(self.ui, self._fp)()
1257 elif payloadsize < 0:
1258 elif payloadsize < 0:
1258 msg = 'negative payload chunk size: %i' % payloadsize
1259 msg = 'negative payload chunk size: %i' % payloadsize
1259 raise error.BundleValueError(msg)
1260 raise error.BundleValueError(msg)
1260 else:
1261 else:
1261 result = self._readexact(payloadsize)
1262 result = self._readexact(payloadsize)
1262 chunknum += 1
1263 chunknum += 1
1263 pos += payloadsize
1264 pos += payloadsize
1264 if chunknum == len(self._chunkindex):
1265 if chunknum == len(self._chunkindex):
1265 self._chunkindex.append((pos, self._tellfp()))
1266 self._chunkindex.append((pos, self._tellfp()))
1266 yield result
1267 yield result
1267 payloadsize = self._unpack(_fpayloadsize)[0]
1268 payloadsize = self._unpack(_fpayloadsize)[0]
1268 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1269 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1269
1270
1270 def _findchunk(self, pos):
1271 def _findchunk(self, pos):
1271 '''for a given payload position, return a chunk number and offset'''
1272 '''for a given payload position, return a chunk number and offset'''
1272 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1273 if ppos == pos:
1274 if ppos == pos:
1274 return chunk, 0
1275 return chunk, 0
1275 elif ppos > pos:
1276 elif ppos > pos:
1276 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1277 raise ValueError('Unknown chunk')
1278 raise ValueError('Unknown chunk')
1278
1279
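The chunk-index lookup in ``_findchunk`` can be exercised standalone. The sketch below (a hypothetical ``findchunk`` helper with a made-up index) replicates the logic: given the list of (payload offset, file offset) couples built by ``_payloadchunks``, it maps an absolute payload position to a chunk number plus an offset inside that chunk.

```python
def findchunk(chunkindex, pos):
    """Map a payload position to (chunk number, offset in chunk) (sketch)."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # exact chunk boundary
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# three chunks starting at payload offsets 0, 4096 and 8192
# (file offsets 100, 4200 and 8300 are illustrative)
index = [(0, 100), (4096, 4200), (8192, 8300)]
```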
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=os.SEEK_SET):
        if whence == os.SEEK_SET:
            newpos = offset
        elif whence == os.SEEK_CUR:
            newpos = self._pos + offset
        elif whence == os.SEEK_END:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

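The ``os.SEEK_*`` constants that this commit swaps in are just the standard whence values 0, 1 and 2; the sketch below shows the same semantics on a plain in-memory stream.

```python
import io
import os

buf = io.BytesIO(b'0123456789')
buf.seek(4, os.SEEK_SET)        # absolute position: 4
buf.seek(2, os.SEEK_CUR)        # relative to current position: 6
end = buf.seek(0, os.SEEK_END)  # relative to end of stream: 10
```

One quirk worth noting: for ``os.SEEK_END``, ``unbundlepart.seek`` above computes ``end - offset`` (subtracting the offset), whereas regular file objects add a usually negative offset to the end position.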
    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'phases': ('heads',),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

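``obsmarkersversion`` simply strips the ``V`` prefix from each capability string and returns the integer versions; a quick standalone illustration (``obsmarkersversion_sketch`` is a copy for demonstration, not the module function):

```python
def obsmarkersversion_sketch(caps):
    """Extract integer obsmarkers versions from a capabilities dict (sketch)."""
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion_sketch({'obsmarkers': ('V0', 'V1')})  # [0, 1]
```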
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now, so we keep them separated for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the ones contained in
    # the transaction.hookargs argument, so 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. Any activity on unrelated heads is ignored.

    This allows a server with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

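The mandatory/advisory distinction in the handler above (a failed pushkey aborts processing only when the part is mandatory; an advisory part just reports the result in the reply) can be sketched in isolation. `checkresult` and this `PushkeyFailed` stub are hypothetical illustrations, not part of bundle2:

```python
class PushkeyFailed(Exception):
    """Stand-in for error.PushkeyFailed."""

def checkresult(mandatory, ret):
    # A failed pushkey (falsy ret) aborts processing only for mandatory
    # parts; advisory parts tolerate failure and simply return the result.
    if mandatory and not ret:
        raise PushkeyFailed("pushkey call failed")
    return ret

print(checkresult(False, 0))  # → 0 (advisory failure is tolerated)
```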
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

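The fixed-width framing read by the handler above (pairs of 20-byte changeset node and filenode, ending on a short read) can be sketched standalone. `readpairs` is a hypothetical helper for illustration, not part of bundle2:

```python
import io

def readpairs(fh, width=20):
    # Read (node, fnode) pairs of `width` bytes each, mirroring the
    # truncation check in handlehgtagsfnodes: a short read on either
    # field ends the stream and the partial record is discarded.
    pairs = []
    while True:
        node = fh.read(width)
        fnode = fh.read(width)
        if len(node) < width or len(fnode) < width:
            break
        pairs.append((node, fnode))
    return pairs

payload = b'\x01' * 20 + b'\x02' * 20 + b'\x03' * 20  # second pair truncated
print(readpairs(io.BytesIO(payload)))  # one complete pair; trailing partial ignored
```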
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
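The key transformation in the loop above can be sketched in isolation; `tohookargs` is a hypothetical stand-in for the loop body, assuming advisory params arrive as (key, value) pairs:

```python
def tohookargs(advisoryparams):
    # Uppercase each key and prefix USERVAR_ so hooks can tell these
    # variables came from `hg push --pushvar` rather than the server.
    hookargs = {}
    for key, value in advisoryparams:
        hookargs["USERVAR_" + key.upper()] = value
    return hookargs

print(tohookargs([("debug", "1"), ("reason", "hotfix")]))
# → {'USERVAR_DEBUG': '1', 'USERVAR_REASON': 'hotfix'}
```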