##// END OF EJS Templates
bundle2: handle compression in _forwardchunks...
Joerg Sonnenberger -
r42319:29569f2d default
parent child Browse files
Show More
@@ -1,2326 +1,2335 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import collections
150 import collections
151 import errno
151 import errno
152 import os
152 import os
153 import re
153 import re
154 import string
154 import string
155 import struct
155 import struct
156 import sys
156 import sys
157
157
158 from .i18n import _
158 from .i18n import _
159 from . import (
159 from . import (
160 bookmarks,
160 bookmarks,
161 changegroup,
161 changegroup,
162 encoding,
162 encoding,
163 error,
163 error,
164 node as nodemod,
164 node as nodemod,
165 obsolete,
165 obsolete,
166 phases,
166 phases,
167 pushkey,
167 pushkey,
168 pycompat,
168 pycompat,
169 streamclone,
169 streamclone,
170 tags,
170 tags,
171 url,
171 url,
172 util,
172 util,
173 )
173 )
174 from .utils import (
174 from .utils import (
175 stringutil,
175 stringutil,
176 )
176 )
177
177
178 urlerr = util.urlerr
178 urlerr = util.urlerr
179 urlreq = util.urlreq
179 urlreq = util.urlreq
180
180
181 _pack = struct.pack
181 _pack = struct.pack
182 _unpack = struct.unpack
182 _unpack = struct.unpack
183
183
184 _fstreamparamsize = '>i'
184 _fstreamparamsize = '>i'
185 _fpartheadersize = '>i'
185 _fpartheadersize = '>i'
186 _fparttypesize = '>B'
186 _fparttypesize = '>B'
187 _fpartid = '>I'
187 _fpartid = '>I'
188 _fpayloadsize = '>i'
188 _fpayloadsize = '>i'
189 _fpartparamcount = '>BB'
189 _fpartparamcount = '>BB'
190
190
191 preferedchunksize = 32768
191 preferedchunksize = 32768
192
192
193 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
193 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
194
194
195 def outdebug(ui, message):
195 def outdebug(ui, message):
196 """debug regarding output stream (bundling)"""
196 """debug regarding output stream (bundling)"""
197 if ui.configbool('devel', 'bundle2.debug'):
197 if ui.configbool('devel', 'bundle2.debug'):
198 ui.debug('bundle2-output: %s\n' % message)
198 ui.debug('bundle2-output: %s\n' % message)
199
199
200 def indebug(ui, message):
200 def indebug(ui, message):
201 """debug on input stream (unbundling)"""
201 """debug on input stream (unbundling)"""
202 if ui.configbool('devel', 'bundle2.debug'):
202 if ui.configbool('devel', 'bundle2.debug'):
203 ui.debug('bundle2-input: %s\n' % message)
203 ui.debug('bundle2-input: %s\n' % message)
204
204
205 def validateparttype(parttype):
205 def validateparttype(parttype):
206 """raise ValueError if a parttype contains invalid character"""
206 """raise ValueError if a parttype contains invalid character"""
207 if _parttypeforbidden.search(parttype):
207 if _parttypeforbidden.search(parttype):
208 raise ValueError(parttype)
208 raise ValueError(parttype)
209
209
210 def _makefpartparamsizes(nbparams):
210 def _makefpartparamsizes(nbparams):
211 """return a struct format to read part parameter sizes
211 """return a struct format to read part parameter sizes
212
212
213 The number parameters is variable so we need to build that format
213 The number parameters is variable so we need to build that format
214 dynamically.
214 dynamically.
215 """
215 """
216 return '>'+('BB'*nbparams)
216 return '>'+('BB'*nbparams)
217
217
218 parthandlermapping = {}
218 parthandlermapping = {}
219
219
220 def parthandler(parttype, params=()):
220 def parthandler(parttype, params=()):
221 """decorator that register a function as a bundle2 part handler
221 """decorator that register a function as a bundle2 part handler
222
222
223 eg::
223 eg::
224
224
225 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
225 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
226 def myparttypehandler(...):
226 def myparttypehandler(...):
227 '''process a part of type "my part".'''
227 '''process a part of type "my part".'''
228 ...
228 ...
229 """
229 """
230 validateparttype(parttype)
230 validateparttype(parttype)
231 def _decorator(func):
231 def _decorator(func):
232 lparttype = parttype.lower() # enforce lower case matching.
232 lparttype = parttype.lower() # enforce lower case matching.
233 assert lparttype not in parthandlermapping
233 assert lparttype not in parthandlermapping
234 parthandlermapping[lparttype] = func
234 parthandlermapping[lparttype] = func
235 func.params = frozenset(params)
235 func.params = frozenset(params)
236 return func
236 return func
237 return _decorator
237 return _decorator
238
238
239 class unbundlerecords(object):
239 class unbundlerecords(object):
240 """keep record of what happens during and unbundle
240 """keep record of what happens during and unbundle
241
241
242 New records are added using `records.add('cat', obj)`. Where 'cat' is a
242 New records are added using `records.add('cat', obj)`. Where 'cat' is a
243 category of record and obj is an arbitrary object.
243 category of record and obj is an arbitrary object.
244
244
245 `records['cat']` will return all entries of this category 'cat'.
245 `records['cat']` will return all entries of this category 'cat'.
246
246
247 Iterating on the object itself will yield `('category', obj)` tuples
247 Iterating on the object itself will yield `('category', obj)` tuples
248 for all entries.
248 for all entries.
249
249
250 All iterations happens in chronological order.
250 All iterations happens in chronological order.
251 """
251 """
252
252
253 def __init__(self):
253 def __init__(self):
254 self._categories = {}
254 self._categories = {}
255 self._sequences = []
255 self._sequences = []
256 self._replies = {}
256 self._replies = {}
257
257
258 def add(self, category, entry, inreplyto=None):
258 def add(self, category, entry, inreplyto=None):
259 """add a new record of a given category.
259 """add a new record of a given category.
260
260
261 The entry can then be retrieved in the list returned by
261 The entry can then be retrieved in the list returned by
262 self['category']."""
262 self['category']."""
263 self._categories.setdefault(category, []).append(entry)
263 self._categories.setdefault(category, []).append(entry)
264 self._sequences.append((category, entry))
264 self._sequences.append((category, entry))
265 if inreplyto is not None:
265 if inreplyto is not None:
266 self.getreplies(inreplyto).add(category, entry)
266 self.getreplies(inreplyto).add(category, entry)
267
267
268 def getreplies(self, partid):
268 def getreplies(self, partid):
269 """get the records that are replies to a specific part"""
269 """get the records that are replies to a specific part"""
270 return self._replies.setdefault(partid, unbundlerecords())
270 return self._replies.setdefault(partid, unbundlerecords())
271
271
272 def __getitem__(self, cat):
272 def __getitem__(self, cat):
273 return tuple(self._categories.get(cat, ()))
273 return tuple(self._categories.get(cat, ()))
274
274
275 def __iter__(self):
275 def __iter__(self):
276 return iter(self._sequences)
276 return iter(self._sequences)
277
277
278 def __len__(self):
278 def __len__(self):
279 return len(self._sequences)
279 return len(self._sequences)
280
280
281 def __nonzero__(self):
281 def __nonzero__(self):
282 return bool(self._sequences)
282 return bool(self._sequences)
283
283
284 __bool__ = __nonzero__
284 __bool__ = __nonzero__
285
285
286 class bundleoperation(object):
286 class bundleoperation(object):
287 """an object that represents a single bundling process
287 """an object that represents a single bundling process
288
288
289 Its purpose is to carry unbundle-related objects and states.
289 Its purpose is to carry unbundle-related objects and states.
290
290
291 A new object should be created at the beginning of each bundle processing.
291 A new object should be created at the beginning of each bundle processing.
292 The object is to be returned by the processing function.
292 The object is to be returned by the processing function.
293
293
294 The object has very little content now it will ultimately contain:
294 The object has very little content now it will ultimately contain:
295 * an access to the repo the bundle is applied to,
295 * an access to the repo the bundle is applied to,
296 * a ui object,
296 * a ui object,
297 * a way to retrieve a transaction to add changes to the repo,
297 * a way to retrieve a transaction to add changes to the repo,
298 * a way to record the result of processing each part,
298 * a way to record the result of processing each part,
299 * a way to construct a bundle response when applicable.
299 * a way to construct a bundle response when applicable.
300 """
300 """
301
301
302 def __init__(self, repo, transactiongetter, captureoutput=True, source=''):
302 def __init__(self, repo, transactiongetter, captureoutput=True, source=''):
303 self.repo = repo
303 self.repo = repo
304 self.ui = repo.ui
304 self.ui = repo.ui
305 self.records = unbundlerecords()
305 self.records = unbundlerecords()
306 self.reply = None
306 self.reply = None
307 self.captureoutput = captureoutput
307 self.captureoutput = captureoutput
308 self.hookargs = {}
308 self.hookargs = {}
309 self._gettransaction = transactiongetter
309 self._gettransaction = transactiongetter
310 # carries value that can modify part behavior
310 # carries value that can modify part behavior
311 self.modes = {}
311 self.modes = {}
312 self.source = source
312 self.source = source
313
313
314 def gettransaction(self):
314 def gettransaction(self):
315 transaction = self._gettransaction()
315 transaction = self._gettransaction()
316
316
317 if self.hookargs:
317 if self.hookargs:
318 # the ones added to the transaction supercede those added
318 # the ones added to the transaction supercede those added
319 # to the operation.
319 # to the operation.
320 self.hookargs.update(transaction.hookargs)
320 self.hookargs.update(transaction.hookargs)
321 transaction.hookargs = self.hookargs
321 transaction.hookargs = self.hookargs
322
322
323 # mark the hookargs as flushed. further attempts to add to
323 # mark the hookargs as flushed. further attempts to add to
324 # hookargs will result in an abort.
324 # hookargs will result in an abort.
325 self.hookargs = None
325 self.hookargs = None
326
326
327 return transaction
327 return transaction
328
328
329 def addhookargs(self, hookargs):
329 def addhookargs(self, hookargs):
330 if self.hookargs is None:
330 if self.hookargs is None:
331 raise error.ProgrammingError('attempted to add hookargs to '
331 raise error.ProgrammingError('attempted to add hookargs to '
332 'operation after transaction started')
332 'operation after transaction started')
333 self.hookargs.update(hookargs)
333 self.hookargs.update(hookargs)
334
334
335 class TransactionUnavailable(RuntimeError):
335 class TransactionUnavailable(RuntimeError):
336 pass
336 pass
337
337
338 def _notransaction():
338 def _notransaction():
339 """default method to get a transaction while processing a bundle
339 """default method to get a transaction while processing a bundle
340
340
341 Raise an exception to highlight the fact that no transaction was expected
341 Raise an exception to highlight the fact that no transaction was expected
342 to be created"""
342 to be created"""
343 raise TransactionUnavailable()
343 raise TransactionUnavailable()
344
344
345 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
345 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
346 # transform me into unbundler.apply() as soon as the freeze is lifted
346 # transform me into unbundler.apply() as soon as the freeze is lifted
347 if isinstance(unbundler, unbundle20):
347 if isinstance(unbundler, unbundle20):
348 tr.hookargs['bundle2'] = '1'
348 tr.hookargs['bundle2'] = '1'
349 if source is not None and 'source' not in tr.hookargs:
349 if source is not None and 'source' not in tr.hookargs:
350 tr.hookargs['source'] = source
350 tr.hookargs['source'] = source
351 if url is not None and 'url' not in tr.hookargs:
351 if url is not None and 'url' not in tr.hookargs:
352 tr.hookargs['url'] = url
352 tr.hookargs['url'] = url
353 return processbundle(repo, unbundler, lambda: tr, source=source)
353 return processbundle(repo, unbundler, lambda: tr, source=source)
354 else:
354 else:
355 # the transactiongetter won't be used, but we might as well set it
355 # the transactiongetter won't be used, but we might as well set it
356 op = bundleoperation(repo, lambda: tr, source=source)
356 op = bundleoperation(repo, lambda: tr, source=source)
357 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
357 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
358 return op
358 return op
359
359
360 class partiterator(object):
360 class partiterator(object):
361 def __init__(self, repo, op, unbundler):
361 def __init__(self, repo, op, unbundler):
362 self.repo = repo
362 self.repo = repo
363 self.op = op
363 self.op = op
364 self.unbundler = unbundler
364 self.unbundler = unbundler
365 self.iterator = None
365 self.iterator = None
366 self.count = 0
366 self.count = 0
367 self.current = None
367 self.current = None
368
368
369 def __enter__(self):
369 def __enter__(self):
370 def func():
370 def func():
371 itr = enumerate(self.unbundler.iterparts())
371 itr = enumerate(self.unbundler.iterparts())
372 for count, p in itr:
372 for count, p in itr:
373 self.count = count
373 self.count = count
374 self.current = p
374 self.current = p
375 yield p
375 yield p
376 p.consume()
376 p.consume()
377 self.current = None
377 self.current = None
378 self.iterator = func()
378 self.iterator = func()
379 return self.iterator
379 return self.iterator
380
380
381 def __exit__(self, type, exc, tb):
381 def __exit__(self, type, exc, tb):
382 if not self.iterator:
382 if not self.iterator:
383 return
383 return
384
384
385 # Only gracefully abort in a normal exception situation. User aborts
385 # Only gracefully abort in a normal exception situation. User aborts
386 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
386 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
387 # and should not gracefully cleanup.
387 # and should not gracefully cleanup.
388 if isinstance(exc, Exception):
388 if isinstance(exc, Exception):
389 # Any exceptions seeking to the end of the bundle at this point are
389 # Any exceptions seeking to the end of the bundle at this point are
390 # almost certainly related to the underlying stream being bad.
390 # almost certainly related to the underlying stream being bad.
391 # And, chances are that the exception we're handling is related to
391 # And, chances are that the exception we're handling is related to
392 # getting in that bad state. So, we swallow the seeking error and
392 # getting in that bad state. So, we swallow the seeking error and
393 # re-raise the original error.
393 # re-raise the original error.
394 seekerror = False
394 seekerror = False
395 try:
395 try:
396 if self.current:
396 if self.current:
397 # consume the part content to not corrupt the stream.
397 # consume the part content to not corrupt the stream.
398 self.current.consume()
398 self.current.consume()
399
399
400 for part in self.iterator:
400 for part in self.iterator:
401 # consume the bundle content
401 # consume the bundle content
402 part.consume()
402 part.consume()
403 except Exception:
403 except Exception:
404 seekerror = True
404 seekerror = True
405
405
406 # Small hack to let caller code distinguish exceptions from bundle2
406 # Small hack to let caller code distinguish exceptions from bundle2
407 # processing from processing the old format. This is mostly needed
407 # processing from processing the old format. This is mostly needed
408 # to handle different return codes to unbundle according to the type
408 # to handle different return codes to unbundle according to the type
409 # of bundle. We should probably clean up or drop this return code
409 # of bundle. We should probably clean up or drop this return code
410 # craziness in a future version.
410 # craziness in a future version.
411 exc.duringunbundle2 = True
411 exc.duringunbundle2 = True
412 salvaged = []
412 salvaged = []
413 replycaps = None
413 replycaps = None
414 if self.op.reply is not None:
414 if self.op.reply is not None:
415 salvaged = self.op.reply.salvageoutput()
415 salvaged = self.op.reply.salvageoutput()
416 replycaps = self.op.reply.capabilities
416 replycaps = self.op.reply.capabilities
417 exc._replycaps = replycaps
417 exc._replycaps = replycaps
418 exc._bundle2salvagedoutput = salvaged
418 exc._bundle2salvagedoutput = salvaged
419
419
420 # Re-raising from a variable loses the original stack. So only use
420 # Re-raising from a variable loses the original stack. So only use
421 # that form if we need to.
421 # that form if we need to.
422 if seekerror:
422 if seekerror:
423 raise exc
423 raise exc
424
424
425 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
425 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
426 self.count)
426 self.count)
427
427
428 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=''):
428 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=''):
429 """This function process a bundle, apply effect to/from a repo
429 """This function process a bundle, apply effect to/from a repo
430
430
431 It iterates over each part then searches for and uses the proper handling
431 It iterates over each part then searches for and uses the proper handling
432 code to process the part. Parts are processed in order.
432 code to process the part. Parts are processed in order.
433
433
434 Unknown Mandatory part will abort the process.
434 Unknown Mandatory part will abort the process.
435
435
436 It is temporarily possible to provide a prebuilt bundleoperation to the
436 It is temporarily possible to provide a prebuilt bundleoperation to the
437 function. This is used to ensure output is properly propagated in case of
437 function. This is used to ensure output is properly propagated in case of
438 an error during the unbundling. This output capturing part will likely be
438 an error during the unbundling. This output capturing part will likely be
439 reworked and this ability will probably go away in the process.
439 reworked and this ability will probably go away in the process.
440 """
440 """
441 if op is None:
441 if op is None:
442 if transactiongetter is None:
442 if transactiongetter is None:
443 transactiongetter = _notransaction
443 transactiongetter = _notransaction
444 op = bundleoperation(repo, transactiongetter, source=source)
444 op = bundleoperation(repo, transactiongetter, source=source)
445 # todo:
445 # todo:
446 # - replace this is a init function soon.
446 # - replace this is a init function soon.
447 # - exception catching
447 # - exception catching
448 unbundler.params
448 unbundler.params
449 if repo.ui.debugflag:
449 if repo.ui.debugflag:
450 msg = ['bundle2-input-bundle:']
450 msg = ['bundle2-input-bundle:']
451 if unbundler.params:
451 if unbundler.params:
452 msg.append(' %i params' % len(unbundler.params))
452 msg.append(' %i params' % len(unbundler.params))
453 if op._gettransaction is None or op._gettransaction is _notransaction:
453 if op._gettransaction is None or op._gettransaction is _notransaction:
454 msg.append(' no-transaction')
454 msg.append(' no-transaction')
455 else:
455 else:
456 msg.append(' with-transaction')
456 msg.append(' with-transaction')
457 msg.append('\n')
457 msg.append('\n')
458 repo.ui.debug(''.join(msg))
458 repo.ui.debug(''.join(msg))
459
459
460 processparts(repo, op, unbundler)
460 processparts(repo, op, unbundler)
461
461
462 return op
462 return op
463
463
464 def processparts(repo, op, unbundler):
464 def processparts(repo, op, unbundler):
465 with partiterator(repo, op, unbundler) as parts:
465 with partiterator(repo, op, unbundler) as parts:
466 for part in parts:
466 for part in parts:
467 _processpart(op, part)
467 _processpart(op, part)
468
468
469 def _processchangegroup(op, cg, tr, source, url, **kwargs):
469 def _processchangegroup(op, cg, tr, source, url, **kwargs):
470 ret = cg.apply(op.repo, tr, source, url, **kwargs)
470 ret = cg.apply(op.repo, tr, source, url, **kwargs)
471 op.records.add('changegroup', {
471 op.records.add('changegroup', {
472 'return': ret,
472 'return': ret,
473 })
473 })
474 return ret
474 return ret
475
475
476 def _gethandler(op, part):
476 def _gethandler(op, part):
477 status = 'unknown' # used by debug output
477 status = 'unknown' # used by debug output
478 try:
478 try:
479 handler = parthandlermapping.get(part.type)
479 handler = parthandlermapping.get(part.type)
480 if handler is None:
480 if handler is None:
481 status = 'unsupported-type'
481 status = 'unsupported-type'
482 raise error.BundleUnknownFeatureError(parttype=part.type)
482 raise error.BundleUnknownFeatureError(parttype=part.type)
483 indebug(op.ui, 'found a handler for part %s' % part.type)
483 indebug(op.ui, 'found a handler for part %s' % part.type)
484 unknownparams = part.mandatorykeys - handler.params
484 unknownparams = part.mandatorykeys - handler.params
485 if unknownparams:
485 if unknownparams:
486 unknownparams = list(unknownparams)
486 unknownparams = list(unknownparams)
487 unknownparams.sort()
487 unknownparams.sort()
488 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
488 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
489 raise error.BundleUnknownFeatureError(parttype=part.type,
489 raise error.BundleUnknownFeatureError(parttype=part.type,
490 params=unknownparams)
490 params=unknownparams)
491 status = 'supported'
491 status = 'supported'
492 except error.BundleUnknownFeatureError as exc:
492 except error.BundleUnknownFeatureError as exc:
493 if part.mandatory: # mandatory parts
493 if part.mandatory: # mandatory parts
494 raise
494 raise
495 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
495 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
496 return # skip to part processing
496 return # skip to part processing
497 finally:
497 finally:
498 if op.ui.debugflag:
498 if op.ui.debugflag:
499 msg = ['bundle2-input-part: "%s"' % part.type]
499 msg = ['bundle2-input-part: "%s"' % part.type]
500 if not part.mandatory:
500 if not part.mandatory:
501 msg.append(' (advisory)')
501 msg.append(' (advisory)')
502 nbmp = len(part.mandatorykeys)
502 nbmp = len(part.mandatorykeys)
503 nbap = len(part.params) - nbmp
503 nbap = len(part.params) - nbmp
504 if nbmp or nbap:
504 if nbmp or nbap:
505 msg.append(' (params:')
505 msg.append(' (params:')
506 if nbmp:
506 if nbmp:
507 msg.append(' %i mandatory' % nbmp)
507 msg.append(' %i mandatory' % nbmp)
508 if nbap:
508 if nbap:
509 msg.append(' %i advisory' % nbmp)
509 msg.append(' %i advisory' % nbmp)
510 msg.append(')')
510 msg.append(')')
511 msg.append(' %s\n' % status)
511 msg.append(' %s\n' % status)
512 op.ui.debug(''.join(msg))
512 op.ui.debug(''.join(msg))
513
513
514 return handler
514 return handler
515
515
516 def _processpart(op, part):
516 def _processpart(op, part):
517 """process a single part from a bundle
517 """process a single part from a bundle
518
518
519 The part is guaranteed to have been fully consumed when the function exits
519 The part is guaranteed to have been fully consumed when the function exits
520 (even if an exception is raised)."""
520 (even if an exception is raised)."""
521 handler = _gethandler(op, part)
521 handler = _gethandler(op, part)
522 if handler is None:
522 if handler is None:
523 return
523 return
524
524
525 # handler is called outside the above try block so that we don't
525 # handler is called outside the above try block so that we don't
526 # risk catching KeyErrors from anything other than the
526 # risk catching KeyErrors from anything other than the
527 # parthandlermapping lookup (any KeyError raised by handler()
527 # parthandlermapping lookup (any KeyError raised by handler()
528 # itself represents a defect of a different variety).
528 # itself represents a defect of a different variety).
529 output = None
529 output = None
530 if op.captureoutput and op.reply is not None:
530 if op.captureoutput and op.reply is not None:
531 op.ui.pushbuffer(error=True, subproc=True)
531 op.ui.pushbuffer(error=True, subproc=True)
532 output = ''
532 output = ''
533 try:
533 try:
534 handler(op, part)
534 handler(op, part)
535 finally:
535 finally:
536 if output is not None:
536 if output is not None:
537 output = op.ui.popbuffer()
537 output = op.ui.popbuffer()
538 if output:
538 if output:
539 outpart = op.reply.newpart('output', data=output,
539 outpart = op.reply.newpart('output', data=output,
540 mandatory=False)
540 mandatory=False)
541 outpart.addparam(
541 outpart.addparam(
542 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
542 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
543
543
544 def decodecaps(blob):
544 def decodecaps(blob):
545 """decode a bundle2 caps bytes blob into a dictionary
545 """decode a bundle2 caps bytes blob into a dictionary
546
546
547 The blob is a list of capabilities (one per line)
547 The blob is a list of capabilities (one per line)
548 Capabilities may have values using a line of the form::
548 Capabilities may have values using a line of the form::
549
549
550 capability=value1,value2,value3
550 capability=value1,value2,value3
551
551
552 The values are always a list."""
552 The values are always a list."""
553 caps = {}
553 caps = {}
554 for line in blob.splitlines():
554 for line in blob.splitlines():
555 if not line:
555 if not line:
556 continue
556 continue
557 if '=' not in line:
557 if '=' not in line:
558 key, vals = line, ()
558 key, vals = line, ()
559 else:
559 else:
560 key, vals = line.split('=', 1)
560 key, vals = line.split('=', 1)
561 vals = vals.split(',')
561 vals = vals.split(',')
562 key = urlreq.unquote(key)
562 key = urlreq.unquote(key)
563 vals = [urlreq.unquote(v) for v in vals]
563 vals = [urlreq.unquote(v) for v in vals]
564 caps[key] = vals
564 caps[key] = vals
565 return caps
565 return caps
566
566
567 def encodecaps(caps):
567 def encodecaps(caps):
568 """encode a bundle2 caps dictionary into a bytes blob"""
568 """encode a bundle2 caps dictionary into a bytes blob"""
569 chunks = []
569 chunks = []
570 for ca in sorted(caps):
570 for ca in sorted(caps):
571 vals = caps[ca]
571 vals = caps[ca]
572 ca = urlreq.quote(ca)
572 ca = urlreq.quote(ca)
573 vals = [urlreq.quote(v) for v in vals]
573 vals = [urlreq.quote(v) for v in vals]
574 if vals:
574 if vals:
575 ca = "%s=%s" % (ca, ','.join(vals))
575 ca = "%s=%s" % (ca, ','.join(vals))
576 chunks.append(ca)
576 chunks.append(ca)
577 return '\n'.join(chunks)
577 return '\n'.join(chunks)
578
578
579 bundletypes = {
579 bundletypes = {
580 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
580 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
581 # since the unification ssh accepts a header but there
581 # since the unification ssh accepts a header but there
582 # is no capability signaling it.
582 # is no capability signaling it.
583 "HG20": (), # special-cased below
583 "HG20": (), # special-cased below
584 "HG10UN": ("HG10UN", 'UN'),
584 "HG10UN": ("HG10UN", 'UN'),
585 "HG10BZ": ("HG10", 'BZ'),
585 "HG10BZ": ("HG10", 'BZ'),
586 "HG10GZ": ("HG10GZ", 'GZ'),
586 "HG10GZ": ("HG10GZ", 'GZ'),
587 }
587 }
588
588
589 # hgweb uses this list to communicate its preferred type
589 # hgweb uses this list to communicate its preferred type
590 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
590 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
591
591
592 class bundle20(object):
592 class bundle20(object):
593 """represent an outgoing bundle2 container
593 """represent an outgoing bundle2 container
594
594
595 Use the `addparam` method to add stream level parameter. and `newpart` to
595 Use the `addparam` method to add stream level parameter. and `newpart` to
596 populate it. Then call `getchunks` to retrieve all the binary chunks of
596 populate it. Then call `getchunks` to retrieve all the binary chunks of
597 data that compose the bundle2 container."""
597 data that compose the bundle2 container."""
598
598
599 _magicstring = 'HG20'
599 _magicstring = 'HG20'
600
600
601 def __init__(self, ui, capabilities=()):
601 def __init__(self, ui, capabilities=()):
602 self.ui = ui
602 self.ui = ui
603 self._params = []
603 self._params = []
604 self._parts = []
604 self._parts = []
605 self.capabilities = dict(capabilities)
605 self.capabilities = dict(capabilities)
606 self._compengine = util.compengines.forbundletype('UN')
606 self._compengine = util.compengines.forbundletype('UN')
607 self._compopts = None
607 self._compopts = None
608 # If compression is being handled by a consumer of the raw
608 # If compression is being handled by a consumer of the raw
609 # data (e.g. the wire protocol), unsetting this flag tells
609 # data (e.g. the wire protocol), unsetting this flag tells
610 # consumers that the bundle is best left uncompressed.
610 # consumers that the bundle is best left uncompressed.
611 self.prefercompressed = True
611 self.prefercompressed = True
612
612
613 def setcompression(self, alg, compopts=None):
613 def setcompression(self, alg, compopts=None):
614 """setup core part compression to <alg>"""
614 """setup core part compression to <alg>"""
615 if alg in (None, 'UN'):
615 if alg in (None, 'UN'):
616 return
616 return
617 assert not any(n.lower() == 'compression' for n, v in self._params)
617 assert not any(n.lower() == 'compression' for n, v in self._params)
618 self.addparam('Compression', alg)
618 self.addparam('Compression', alg)
619 self._compengine = util.compengines.forbundletype(alg)
619 self._compengine = util.compengines.forbundletype(alg)
620 self._compopts = compopts
620 self._compopts = compopts
621
621
622 @property
622 @property
623 def nbparts(self):
623 def nbparts(self):
624 """total number of parts added to the bundler"""
624 """total number of parts added to the bundler"""
625 return len(self._parts)
625 return len(self._parts)
626
626
627 # methods used to defines the bundle2 content
627 # methods used to defines the bundle2 content
628 def addparam(self, name, value=None):
628 def addparam(self, name, value=None):
629 """add a stream level parameter"""
629 """add a stream level parameter"""
630 if not name:
630 if not name:
631 raise error.ProgrammingError(b'empty parameter name')
631 raise error.ProgrammingError(b'empty parameter name')
632 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
632 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
633 raise error.ProgrammingError(b'non letter first character: %s'
633 raise error.ProgrammingError(b'non letter first character: %s'
634 % name)
634 % name)
635 self._params.append((name, value))
635 self._params.append((name, value))
636
636
637 def addpart(self, part):
637 def addpart(self, part):
638 """add a new part to the bundle2 container
638 """add a new part to the bundle2 container
639
639
640 Parts contains the actual applicative payload."""
640 Parts contains the actual applicative payload."""
641 assert part.id is None
641 assert part.id is None
642 part.id = len(self._parts) # very cheap counter
642 part.id = len(self._parts) # very cheap counter
643 self._parts.append(part)
643 self._parts.append(part)
644
644
645 def newpart(self, typeid, *args, **kwargs):
645 def newpart(self, typeid, *args, **kwargs):
646 """create a new part and add it to the containers
646 """create a new part and add it to the containers
647
647
648 As the part is directly added to the containers. For now, this means
648 As the part is directly added to the containers. For now, this means
649 that any failure to properly initialize the part after calling
649 that any failure to properly initialize the part after calling
650 ``newpart`` should result in a failure of the whole bundling process.
650 ``newpart`` should result in a failure of the whole bundling process.
651
651
652 You can still fall back to manually create and add if you need better
652 You can still fall back to manually create and add if you need better
653 control."""
653 control."""
654 part = bundlepart(typeid, *args, **kwargs)
654 part = bundlepart(typeid, *args, **kwargs)
655 self.addpart(part)
655 self.addpart(part)
656 return part
656 return part
657
657
658 # methods used to generate the bundle2 stream
658 # methods used to generate the bundle2 stream
659 def getchunks(self):
659 def getchunks(self):
660 if self.ui.debugflag:
660 if self.ui.debugflag:
661 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
661 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
662 if self._params:
662 if self._params:
663 msg.append(' (%i params)' % len(self._params))
663 msg.append(' (%i params)' % len(self._params))
664 msg.append(' %i parts total\n' % len(self._parts))
664 msg.append(' %i parts total\n' % len(self._parts))
665 self.ui.debug(''.join(msg))
665 self.ui.debug(''.join(msg))
666 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
666 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
667 yield self._magicstring
667 yield self._magicstring
668 param = self._paramchunk()
668 param = self._paramchunk()
669 outdebug(self.ui, 'bundle parameter: %s' % param)
669 outdebug(self.ui, 'bundle parameter: %s' % param)
670 yield _pack(_fstreamparamsize, len(param))
670 yield _pack(_fstreamparamsize, len(param))
671 if param:
671 if param:
672 yield param
672 yield param
673 for chunk in self._compengine.compressstream(self._getcorechunk(),
673 for chunk in self._compengine.compressstream(self._getcorechunk(),
674 self._compopts):
674 self._compopts):
675 yield chunk
675 yield chunk
676
676
677 def _paramchunk(self):
677 def _paramchunk(self):
678 """return a encoded version of all stream parameters"""
678 """return a encoded version of all stream parameters"""
679 blocks = []
679 blocks = []
680 for par, value in self._params:
680 for par, value in self._params:
681 par = urlreq.quote(par)
681 par = urlreq.quote(par)
682 if value is not None:
682 if value is not None:
683 value = urlreq.quote(value)
683 value = urlreq.quote(value)
684 par = '%s=%s' % (par, value)
684 par = '%s=%s' % (par, value)
685 blocks.append(par)
685 blocks.append(par)
686 return ' '.join(blocks)
686 return ' '.join(blocks)
687
687
688 def _getcorechunk(self):
688 def _getcorechunk(self):
689 """yield chunk for the core part of the bundle
689 """yield chunk for the core part of the bundle
690
690
691 (all but headers and parameters)"""
691 (all but headers and parameters)"""
692 outdebug(self.ui, 'start of parts')
692 outdebug(self.ui, 'start of parts')
693 for part in self._parts:
693 for part in self._parts:
694 outdebug(self.ui, 'bundle part: "%s"' % part.type)
694 outdebug(self.ui, 'bundle part: "%s"' % part.type)
695 for chunk in part.getchunks(ui=self.ui):
695 for chunk in part.getchunks(ui=self.ui):
696 yield chunk
696 yield chunk
697 outdebug(self.ui, 'end of bundle')
697 outdebug(self.ui, 'end of bundle')
698 yield _pack(_fpartheadersize, 0)
698 yield _pack(_fpartheadersize, 0)
699
699
700
700
701 def salvageoutput(self):
701 def salvageoutput(self):
702 """return a list with a copy of all output parts in the bundle
702 """return a list with a copy of all output parts in the bundle
703
703
704 This is meant to be used during error handling to make sure we preserve
704 This is meant to be used during error handling to make sure we preserve
705 server output"""
705 server output"""
706 salvaged = []
706 salvaged = []
707 for part in self._parts:
707 for part in self._parts:
708 if part.type.startswith('output'):
708 if part.type.startswith('output'):
709 salvaged.append(part.copy())
709 salvaged.append(part.copy())
710 return salvaged
710 return salvaged
711
711
712
712
713 class unpackermixin(object):
713 class unpackermixin(object):
714 """A mixin to extract bytes and struct data from a stream"""
714 """A mixin to extract bytes and struct data from a stream"""
715
715
716 def __init__(self, fp):
716 def __init__(self, fp):
717 self._fp = fp
717 self._fp = fp
718
718
719 def _unpack(self, format):
719 def _unpack(self, format):
720 """unpack this struct format from the stream
720 """unpack this struct format from the stream
721
721
722 This method is meant for internal usage by the bundle2 protocol only.
722 This method is meant for internal usage by the bundle2 protocol only.
723 They directly manipulate the low level stream including bundle2 level
723 They directly manipulate the low level stream including bundle2 level
724 instruction.
724 instruction.
725
725
726 Do not use it to implement higher-level logic or methods."""
726 Do not use it to implement higher-level logic or methods."""
727 data = self._readexact(struct.calcsize(format))
727 data = self._readexact(struct.calcsize(format))
728 return _unpack(format, data)
728 return _unpack(format, data)
729
729
730 def _readexact(self, size):
730 def _readexact(self, size):
731 """read exactly <size> bytes from the stream
731 """read exactly <size> bytes from the stream
732
732
733 This method is meant for internal usage by the bundle2 protocol only.
733 This method is meant for internal usage by the bundle2 protocol only.
734 They directly manipulate the low level stream including bundle2 level
734 They directly manipulate the low level stream including bundle2 level
735 instruction.
735 instruction.
736
736
737 Do not use it to implement higher-level logic or methods."""
737 Do not use it to implement higher-level logic or methods."""
738 return changegroup.readexactly(self._fp, size)
738 return changegroup.readexactly(self._fp, size)
739
739
740 def getunbundler(ui, fp, magicstring=None):
740 def getunbundler(ui, fp, magicstring=None):
741 """return a valid unbundler object for a given magicstring"""
741 """return a valid unbundler object for a given magicstring"""
742 if magicstring is None:
742 if magicstring is None:
743 magicstring = changegroup.readexactly(fp, 4)
743 magicstring = changegroup.readexactly(fp, 4)
744 magic, version = magicstring[0:2], magicstring[2:4]
744 magic, version = magicstring[0:2], magicstring[2:4]
745 if magic != 'HG':
745 if magic != 'HG':
746 ui.debug(
746 ui.debug(
747 "error: invalid magic: %r (version %r), should be 'HG'\n"
747 "error: invalid magic: %r (version %r), should be 'HG'\n"
748 % (magic, version))
748 % (magic, version))
749 raise error.Abort(_('not a Mercurial bundle'))
749 raise error.Abort(_('not a Mercurial bundle'))
750 unbundlerclass = formatmap.get(version)
750 unbundlerclass = formatmap.get(version)
751 if unbundlerclass is None:
751 if unbundlerclass is None:
752 raise error.Abort(_('unknown bundle version %s') % version)
752 raise error.Abort(_('unknown bundle version %s') % version)
753 unbundler = unbundlerclass(ui, fp)
753 unbundler = unbundlerclass(ui, fp)
754 indebug(ui, 'start processing of %s stream' % magicstring)
754 indebug(ui, 'start processing of %s stream' % magicstring)
755 return unbundler
755 return unbundler
756
756
757 class unbundle20(unpackermixin):
757 class unbundle20(unpackermixin):
758 """interpret a bundle2 stream
758 """interpret a bundle2 stream
759
759
760 This class is fed with a binary stream and yields parts through its
760 This class is fed with a binary stream and yields parts through its
761 `iterparts` methods."""
761 `iterparts` methods."""
762
762
763 _magicstring = 'HG20'
763 _magicstring = 'HG20'
764
764
765 def __init__(self, ui, fp):
765 def __init__(self, ui, fp):
766 """If header is specified, we do not read it out of the stream."""
766 """If header is specified, we do not read it out of the stream."""
767 self.ui = ui
767 self.ui = ui
768 self._compengine = util.compengines.forbundletype('UN')
768 self._compengine = util.compengines.forbundletype('UN')
769 self._compressed = None
769 self._compressed = None
770 super(unbundle20, self).__init__(fp)
770 super(unbundle20, self).__init__(fp)
771
771
772 @util.propertycache
772 @util.propertycache
773 def params(self):
773 def params(self):
774 """dictionary of stream level parameters"""
774 """dictionary of stream level parameters"""
775 indebug(self.ui, 'reading bundle2 stream parameters')
775 indebug(self.ui, 'reading bundle2 stream parameters')
776 params = {}
776 params = {}
777 paramssize = self._unpack(_fstreamparamsize)[0]
777 paramssize = self._unpack(_fstreamparamsize)[0]
778 if paramssize < 0:
778 if paramssize < 0:
779 raise error.BundleValueError('negative bundle param size: %i'
779 raise error.BundleValueError('negative bundle param size: %i'
780 % paramssize)
780 % paramssize)
781 if paramssize:
781 if paramssize:
782 params = self._readexact(paramssize)
782 params = self._readexact(paramssize)
783 params = self._processallparams(params)
783 params = self._processallparams(params)
784 return params
784 return params
785
785
786 def _processallparams(self, paramsblock):
786 def _processallparams(self, paramsblock):
787 """"""
787 """"""
788 params = util.sortdict()
788 params = util.sortdict()
789 for p in paramsblock.split(' '):
789 for p in paramsblock.split(' '):
790 p = p.split('=', 1)
790 p = p.split('=', 1)
791 p = [urlreq.unquote(i) for i in p]
791 p = [urlreq.unquote(i) for i in p]
792 if len(p) < 2:
792 if len(p) < 2:
793 p.append(None)
793 p.append(None)
794 self._processparam(*p)
794 self._processparam(*p)
795 params[p[0]] = p[1]
795 params[p[0]] = p[1]
796 return params
796 return params
797
797
798
798
799 def _processparam(self, name, value):
799 def _processparam(self, name, value):
800 """process a parameter, applying its effect if needed
800 """process a parameter, applying its effect if needed
801
801
802 Parameter starting with a lower case letter are advisory and will be
802 Parameter starting with a lower case letter are advisory and will be
803 ignored when unknown. Those starting with an upper case letter are
803 ignored when unknown. Those starting with an upper case letter are
804 mandatory and will this function will raise a KeyError when unknown.
804 mandatory and will this function will raise a KeyError when unknown.
805
805
806 Note: no option are currently supported. Any input will be either
806 Note: no option are currently supported. Any input will be either
807 ignored or failing.
807 ignored or failing.
808 """
808 """
809 if not name:
809 if not name:
810 raise ValueError(r'empty parameter name')
810 raise ValueError(r'empty parameter name')
811 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
811 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
812 raise ValueError(r'non letter first character: %s' % name)
812 raise ValueError(r'non letter first character: %s' % name)
813 try:
813 try:
814 handler = b2streamparamsmap[name.lower()]
814 handler = b2streamparamsmap[name.lower()]
815 except KeyError:
815 except KeyError:
816 if name[0:1].islower():
816 if name[0:1].islower():
817 indebug(self.ui, "ignoring unknown parameter %s" % name)
817 indebug(self.ui, "ignoring unknown parameter %s" % name)
818 else:
818 else:
819 raise error.BundleUnknownFeatureError(params=(name,))
819 raise error.BundleUnknownFeatureError(params=(name,))
820 else:
820 else:
821 handler(self, name, value)
821 handler(self, name, value)
822
822
823 def _forwardchunks(self):
823 def _forwardchunks(self):
824 """utility to transfer a bundle2 as binary
824 """utility to transfer a bundle2 as binary
825
825
826 This is made necessary by the fact the 'getbundle' command over 'ssh'
826 This is made necessary by the fact the 'getbundle' command over 'ssh'
827 have no way to know then the reply end, relying on the bundle to be
827 have no way to know then the reply end, relying on the bundle to be
828 interpreted to know its end. This is terrible and we are sorry, but we
828 interpreted to know its end. This is terrible and we are sorry, but we
829 needed to move forward to get general delta enabled.
829 needed to move forward to get general delta enabled.
830 """
830 """
831 yield self._magicstring
831 yield self._magicstring
832 assert 'params' not in vars(self)
832 assert 'params' not in vars(self)
833 paramssize = self._unpack(_fstreamparamsize)[0]
833 paramssize = self._unpack(_fstreamparamsize)[0]
834 if paramssize < 0:
834 if paramssize < 0:
835 raise error.BundleValueError('negative bundle param size: %i'
835 raise error.BundleValueError('negative bundle param size: %i'
836 % paramssize)
836 % paramssize)
837 yield _pack(_fstreamparamsize, paramssize)
838 if paramssize:
837 if paramssize:
839 params = self._readexact(paramssize)
838 params = self._readexact(paramssize)
840 self._processallparams(params)
839 self._processallparams(params)
841 yield params
840 # The payload itself is decompressed below, so drop
842 assert self._compengine.bundletype()[1] == 'UN'
841 # the compression parameter passed down to compensate.
842 outparams = []
843 for p in params.split(' '):
844 k, v = p.split('=', 1)
845 if k.lower() != 'compression':
846 outparams.append(p)
847 outparams = ' '.join(outparams)
848 yield _pack(_fstreamparamsize, len(outparams))
849 yield outparams
850 else:
851 yield _pack(_fstreamparamsize, paramssize)
843 # From there, payload might need to be decompressed
852 # From there, payload might need to be decompressed
844 self._fp = self._compengine.decompressorreader(self._fp)
853 self._fp = self._compengine.decompressorreader(self._fp)
845 emptycount = 0
854 emptycount = 0
846 while emptycount < 2:
855 while emptycount < 2:
847 # so we can brainlessly loop
856 # so we can brainlessly loop
848 assert _fpartheadersize == _fpayloadsize
857 assert _fpartheadersize == _fpayloadsize
849 size = self._unpack(_fpartheadersize)[0]
858 size = self._unpack(_fpartheadersize)[0]
850 yield _pack(_fpartheadersize, size)
859 yield _pack(_fpartheadersize, size)
851 if size:
860 if size:
852 emptycount = 0
861 emptycount = 0
853 else:
862 else:
854 emptycount += 1
863 emptycount += 1
855 continue
864 continue
856 if size == flaginterrupt:
865 if size == flaginterrupt:
857 continue
866 continue
858 elif size < 0:
867 elif size < 0:
859 raise error.BundleValueError('negative chunk size: %i')
868 raise error.BundleValueError('negative chunk size: %i')
860 yield self._readexact(size)
869 yield self._readexact(size)
861
870
862
871
863 def iterparts(self, seekable=False):
872 def iterparts(self, seekable=False):
864 """yield all parts contained in the stream"""
873 """yield all parts contained in the stream"""
865 cls = seekableunbundlepart if seekable else unbundlepart
874 cls = seekableunbundlepart if seekable else unbundlepart
866 # make sure param have been loaded
875 # make sure param have been loaded
867 self.params
876 self.params
868 # From there, payload need to be decompressed
877 # From there, payload need to be decompressed
869 self._fp = self._compengine.decompressorreader(self._fp)
878 self._fp = self._compengine.decompressorreader(self._fp)
870 indebug(self.ui, 'start extraction of bundle2 parts')
879 indebug(self.ui, 'start extraction of bundle2 parts')
871 headerblock = self._readpartheader()
880 headerblock = self._readpartheader()
872 while headerblock is not None:
881 while headerblock is not None:
873 part = cls(self.ui, headerblock, self._fp)
882 part = cls(self.ui, headerblock, self._fp)
874 yield part
883 yield part
875 # Ensure part is fully consumed so we can start reading the next
884 # Ensure part is fully consumed so we can start reading the next
876 # part.
885 # part.
877 part.consume()
886 part.consume()
878
887
879 headerblock = self._readpartheader()
888 headerblock = self._readpartheader()
880 indebug(self.ui, 'end of bundle2 stream')
889 indebug(self.ui, 'end of bundle2 stream')
881
890
882 def _readpartheader(self):
891 def _readpartheader(self):
883 """reads a part header size and return the bytes blob
892 """reads a part header size and return the bytes blob
884
893
885 returns None if empty"""
894 returns None if empty"""
886 headersize = self._unpack(_fpartheadersize)[0]
895 headersize = self._unpack(_fpartheadersize)[0]
887 if headersize < 0:
896 if headersize < 0:
888 raise error.BundleValueError('negative part header size: %i'
897 raise error.BundleValueError('negative part header size: %i'
889 % headersize)
898 % headersize)
890 indebug(self.ui, 'part header size: %i' % headersize)
899 indebug(self.ui, 'part header size: %i' % headersize)
891 if headersize:
900 if headersize:
892 return self._readexact(headersize)
901 return self._readexact(headersize)
893 return None
902 return None
894
903
895 def compressed(self):
904 def compressed(self):
896 self.params # load params
905 self.params # load params
897 return self._compressed
906 return self._compressed
898
907
899 def close(self):
908 def close(self):
900 """close underlying file"""
909 """close underlying file"""
901 if util.safehasattr(self._fp, 'close'):
910 if util.safehasattr(self._fp, 'close'):
902 return self._fp.close()
911 return self._fp.close()
903
912
904 formatmap = {'20': unbundle20}
913 formatmap = {'20': unbundle20}
905
914
906 b2streamparamsmap = {}
915 b2streamparamsmap = {}
907
916
908 def b2streamparamhandler(name):
917 def b2streamparamhandler(name):
909 """register a handler for a stream level parameter"""
918 """register a handler for a stream level parameter"""
910 def decorator(func):
919 def decorator(func):
911 assert name not in formatmap
920 assert name not in formatmap
912 b2streamparamsmap[name] = func
921 b2streamparamsmap[name] = func
913 return func
922 return func
914 return decorator
923 return decorator
915
924
916 @b2streamparamhandler('compression')
925 @b2streamparamhandler('compression')
917 def processcompression(unbundler, param, value):
926 def processcompression(unbundler, param, value):
918 """read compression parameter and install payload decompression"""
927 """read compression parameter and install payload decompression"""
919 if value not in util.compengines.supportedbundletypes:
928 if value not in util.compengines.supportedbundletypes:
920 raise error.BundleUnknownFeatureError(params=(param,),
929 raise error.BundleUnknownFeatureError(params=(param,),
921 values=(value,))
930 values=(value,))
922 unbundler._compengine = util.compengines.forbundletype(value)
931 unbundler._compengine = util.compengines.forbundletype(value)
923 if value is not None:
932 if value is not None:
924 unbundler._compressed = True
933 unbundler._compressed = True
925
934
926 class bundlepart(object):
935 class bundlepart(object):
927 """A bundle2 part contains application level payload
936 """A bundle2 part contains application level payload
928
937
929 The part `type` is used to route the part to the application level
938 The part `type` is used to route the part to the application level
930 handler.
939 handler.
931
940
932 The part payload is contained in ``part.data``. It could be raw bytes or a
941 The part payload is contained in ``part.data``. It could be raw bytes or a
933 generator of byte chunks.
942 generator of byte chunks.
934
943
935 You can add parameters to the part using the ``addparam`` method.
944 You can add parameters to the part using the ``addparam`` method.
936 Parameters can be either mandatory (default) or advisory. Remote side
945 Parameters can be either mandatory (default) or advisory. Remote side
937 should be able to safely ignore the advisory ones.
946 should be able to safely ignore the advisory ones.
938
947
939 Both data and parameters cannot be modified after the generation has begun.
948 Both data and parameters cannot be modified after the generation has begun.
940 """
949 """
941
950
942 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
951 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
943 data='', mandatory=True):
952 data='', mandatory=True):
944 validateparttype(parttype)
953 validateparttype(parttype)
945 self.id = None
954 self.id = None
946 self.type = parttype
955 self.type = parttype
947 self._data = data
956 self._data = data
948 self._mandatoryparams = list(mandatoryparams)
957 self._mandatoryparams = list(mandatoryparams)
949 self._advisoryparams = list(advisoryparams)
958 self._advisoryparams = list(advisoryparams)
950 # checking for duplicated entries
959 # checking for duplicated entries
951 self._seenparams = set()
960 self._seenparams = set()
952 for pname, __ in self._mandatoryparams + self._advisoryparams:
961 for pname, __ in self._mandatoryparams + self._advisoryparams:
953 if pname in self._seenparams:
962 if pname in self._seenparams:
954 raise error.ProgrammingError('duplicated params: %s' % pname)
963 raise error.ProgrammingError('duplicated params: %s' % pname)
955 self._seenparams.add(pname)
964 self._seenparams.add(pname)
956 # status of the part's generation:
965 # status of the part's generation:
957 # - None: not started,
966 # - None: not started,
958 # - False: currently generated,
967 # - False: currently generated,
959 # - True: generation done.
968 # - True: generation done.
960 self._generated = None
969 self._generated = None
961 self.mandatory = mandatory
970 self.mandatory = mandatory
962
971
963 def __repr__(self):
972 def __repr__(self):
964 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
973 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
965 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
974 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
966 % (cls, id(self), self.id, self.type, self.mandatory))
975 % (cls, id(self), self.id, self.type, self.mandatory))
967
976
968 def copy(self):
977 def copy(self):
969 """return a copy of the part
978 """return a copy of the part
970
979
971 The new part have the very same content but no partid assigned yet.
980 The new part have the very same content but no partid assigned yet.
972 Parts with generated data cannot be copied."""
981 Parts with generated data cannot be copied."""
973 assert not util.safehasattr(self.data, 'next')
982 assert not util.safehasattr(self.data, 'next')
974 return self.__class__(self.type, self._mandatoryparams,
983 return self.__class__(self.type, self._mandatoryparams,
975 self._advisoryparams, self._data, self.mandatory)
984 self._advisoryparams, self._data, self.mandatory)
976
985
977 # methods used to defines the part content
986 # methods used to defines the part content
978 @property
987 @property
979 def data(self):
988 def data(self):
980 return self._data
989 return self._data
981
990
982 @data.setter
991 @data.setter
983 def data(self, data):
992 def data(self, data):
984 if self._generated is not None:
993 if self._generated is not None:
985 raise error.ReadOnlyPartError('part is being generated')
994 raise error.ReadOnlyPartError('part is being generated')
986 self._data = data
995 self._data = data
987
996
988 @property
997 @property
989 def mandatoryparams(self):
998 def mandatoryparams(self):
990 # make it an immutable tuple to force people through ``addparam``
999 # make it an immutable tuple to force people through ``addparam``
991 return tuple(self._mandatoryparams)
1000 return tuple(self._mandatoryparams)
992
1001
993 @property
1002 @property
994 def advisoryparams(self):
1003 def advisoryparams(self):
995 # make it an immutable tuple to force people through ``addparam``
1004 # make it an immutable tuple to force people through ``addparam``
996 return tuple(self._advisoryparams)
1005 return tuple(self._advisoryparams)
997
1006
998 def addparam(self, name, value='', mandatory=True):
1007 def addparam(self, name, value='', mandatory=True):
999 """add a parameter to the part
1008 """add a parameter to the part
1000
1009
1001 If 'mandatory' is set to True, the remote handler must claim support
1010 If 'mandatory' is set to True, the remote handler must claim support
1002 for this parameter or the unbundling will be aborted.
1011 for this parameter or the unbundling will be aborted.
1003
1012
1004 The 'name' and 'value' cannot exceed 255 bytes each.
1013 The 'name' and 'value' cannot exceed 255 bytes each.
1005 """
1014 """
1006 if self._generated is not None:
1015 if self._generated is not None:
1007 raise error.ReadOnlyPartError('part is being generated')
1016 raise error.ReadOnlyPartError('part is being generated')
1008 if name in self._seenparams:
1017 if name in self._seenparams:
1009 raise ValueError('duplicated params: %s' % name)
1018 raise ValueError('duplicated params: %s' % name)
1010 self._seenparams.add(name)
1019 self._seenparams.add(name)
1011 params = self._advisoryparams
1020 params = self._advisoryparams
1012 if mandatory:
1021 if mandatory:
1013 params = self._mandatoryparams
1022 params = self._mandatoryparams
1014 params.append((name, value))
1023 params.append((name, value))
1015
1024
1016 # methods used to generates the bundle2 stream
1025 # methods used to generates the bundle2 stream
1017 def getchunks(self, ui):
1026 def getchunks(self, ui):
1018 if self._generated is not None:
1027 if self._generated is not None:
1019 raise error.ProgrammingError('part can only be consumed once')
1028 raise error.ProgrammingError('part can only be consumed once')
1020 self._generated = False
1029 self._generated = False
1021
1030
1022 if ui.debugflag:
1031 if ui.debugflag:
1023 msg = ['bundle2-output-part: "%s"' % self.type]
1032 msg = ['bundle2-output-part: "%s"' % self.type]
1024 if not self.mandatory:
1033 if not self.mandatory:
1025 msg.append(' (advisory)')
1034 msg.append(' (advisory)')
1026 nbmp = len(self.mandatoryparams)
1035 nbmp = len(self.mandatoryparams)
1027 nbap = len(self.advisoryparams)
1036 nbap = len(self.advisoryparams)
1028 if nbmp or nbap:
1037 if nbmp or nbap:
1029 msg.append(' (params:')
1038 msg.append(' (params:')
1030 if nbmp:
1039 if nbmp:
1031 msg.append(' %i mandatory' % nbmp)
1040 msg.append(' %i mandatory' % nbmp)
1032 if nbap:
1041 if nbap:
1033 msg.append(' %i advisory' % nbmp)
1042 msg.append(' %i advisory' % nbmp)
1034 msg.append(')')
1043 msg.append(')')
1035 if not self.data:
1044 if not self.data:
1036 msg.append(' empty payload')
1045 msg.append(' empty payload')
1037 elif (util.safehasattr(self.data, 'next')
1046 elif (util.safehasattr(self.data, 'next')
1038 or util.safehasattr(self.data, '__next__')):
1047 or util.safehasattr(self.data, '__next__')):
1039 msg.append(' streamed payload')
1048 msg.append(' streamed payload')
1040 else:
1049 else:
1041 msg.append(' %i bytes payload' % len(self.data))
1050 msg.append(' %i bytes payload' % len(self.data))
1042 msg.append('\n')
1051 msg.append('\n')
1043 ui.debug(''.join(msg))
1052 ui.debug(''.join(msg))
1044
1053
1045 #### header
1054 #### header
1046 if self.mandatory:
1055 if self.mandatory:
1047 parttype = self.type.upper()
1056 parttype = self.type.upper()
1048 else:
1057 else:
1049 parttype = self.type.lower()
1058 parttype = self.type.lower()
1050 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1059 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1051 ## parttype
1060 ## parttype
1052 header = [_pack(_fparttypesize, len(parttype)),
1061 header = [_pack(_fparttypesize, len(parttype)),
1053 parttype, _pack(_fpartid, self.id),
1062 parttype, _pack(_fpartid, self.id),
1054 ]
1063 ]
1055 ## parameters
1064 ## parameters
1056 # count
1065 # count
1057 manpar = self.mandatoryparams
1066 manpar = self.mandatoryparams
1058 advpar = self.advisoryparams
1067 advpar = self.advisoryparams
1059 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1068 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1060 # size
1069 # size
1061 parsizes = []
1070 parsizes = []
1062 for key, value in manpar:
1071 for key, value in manpar:
1063 parsizes.append(len(key))
1072 parsizes.append(len(key))
1064 parsizes.append(len(value))
1073 parsizes.append(len(value))
1065 for key, value in advpar:
1074 for key, value in advpar:
1066 parsizes.append(len(key))
1075 parsizes.append(len(key))
1067 parsizes.append(len(value))
1076 parsizes.append(len(value))
1068 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1077 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1069 header.append(paramsizes)
1078 header.append(paramsizes)
1070 # key, value
1079 # key, value
1071 for key, value in manpar:
1080 for key, value in manpar:
1072 header.append(key)
1081 header.append(key)
1073 header.append(value)
1082 header.append(value)
1074 for key, value in advpar:
1083 for key, value in advpar:
1075 header.append(key)
1084 header.append(key)
1076 header.append(value)
1085 header.append(value)
1077 ## finalize header
1086 ## finalize header
1078 try:
1087 try:
1079 headerchunk = ''.join(header)
1088 headerchunk = ''.join(header)
1080 except TypeError:
1089 except TypeError:
1081 raise TypeError(r'Found a non-bytes trying to '
1090 raise TypeError(r'Found a non-bytes trying to '
1082 r'build bundle part header: %r' % header)
1091 r'build bundle part header: %r' % header)
1083 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1092 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1084 yield _pack(_fpartheadersize, len(headerchunk))
1093 yield _pack(_fpartheadersize, len(headerchunk))
1085 yield headerchunk
1094 yield headerchunk
1086 ## payload
1095 ## payload
1087 try:
1096 try:
1088 for chunk in self._payloadchunks():
1097 for chunk in self._payloadchunks():
1089 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1098 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1090 yield _pack(_fpayloadsize, len(chunk))
1099 yield _pack(_fpayloadsize, len(chunk))
1091 yield chunk
1100 yield chunk
1092 except GeneratorExit:
1101 except GeneratorExit:
1093 # GeneratorExit means that nobody is listening for our
1102 # GeneratorExit means that nobody is listening for our
1094 # results anyway, so just bail quickly rather than trying
1103 # results anyway, so just bail quickly rather than trying
1095 # to produce an error part.
1104 # to produce an error part.
1096 ui.debug('bundle2-generatorexit\n')
1105 ui.debug('bundle2-generatorexit\n')
1097 raise
1106 raise
1098 except BaseException as exc:
1107 except BaseException as exc:
1099 bexc = stringutil.forcebytestr(exc)
1108 bexc = stringutil.forcebytestr(exc)
1100 # backup exception data for later
1109 # backup exception data for later
1101 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1110 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1102 % bexc)
1111 % bexc)
1103 tb = sys.exc_info()[2]
1112 tb = sys.exc_info()[2]
1104 msg = 'unexpected error: %s' % bexc
1113 msg = 'unexpected error: %s' % bexc
1105 interpart = bundlepart('error:abort', [('message', msg)],
1114 interpart = bundlepart('error:abort', [('message', msg)],
1106 mandatory=False)
1115 mandatory=False)
1107 interpart.id = 0
1116 interpart.id = 0
1108 yield _pack(_fpayloadsize, -1)
1117 yield _pack(_fpayloadsize, -1)
1109 for chunk in interpart.getchunks(ui=ui):
1118 for chunk in interpart.getchunks(ui=ui):
1110 yield chunk
1119 yield chunk
1111 outdebug(ui, 'closing payload chunk')
1120 outdebug(ui, 'closing payload chunk')
1112 # abort current part payload
1121 # abort current part payload
1113 yield _pack(_fpayloadsize, 0)
1122 yield _pack(_fpayloadsize, 0)
1114 pycompat.raisewithtb(exc, tb)
1123 pycompat.raisewithtb(exc, tb)
1115 # end of payload
1124 # end of payload
1116 outdebug(ui, 'closing payload chunk')
1125 outdebug(ui, 'closing payload chunk')
1117 yield _pack(_fpayloadsize, 0)
1126 yield _pack(_fpayloadsize, 0)
1118 self._generated = True
1127 self._generated = True
1119
1128
1120 def _payloadchunks(self):
1129 def _payloadchunks(self):
1121 """yield chunks of a the part payload
1130 """yield chunks of a the part payload
1122
1131
1123 Exists to handle the different methods to provide data to a part."""
1132 Exists to handle the different methods to provide data to a part."""
1124 # we only support fixed size data now.
1133 # we only support fixed size data now.
1125 # This will be improved in the future.
1134 # This will be improved in the future.
1126 if (util.safehasattr(self.data, 'next')
1135 if (util.safehasattr(self.data, 'next')
1127 or util.safehasattr(self.data, '__next__')):
1136 or util.safehasattr(self.data, '__next__')):
1128 buff = util.chunkbuffer(self.data)
1137 buff = util.chunkbuffer(self.data)
1129 chunk = buff.read(preferedchunksize)
1138 chunk = buff.read(preferedchunksize)
1130 while chunk:
1139 while chunk:
1131 yield chunk
1140 yield chunk
1132 chunk = buff.read(preferedchunksize)
1141 chunk = buff.read(preferedchunksize)
1133 elif len(self.data):
1142 elif len(self.data):
1134 yield self.data
1143 yield self.data
1135
1144
1136
1145
1137 flaginterrupt = -1
1146 flaginterrupt = -1
1138
1147
1139 class interrupthandler(unpackermixin):
1148 class interrupthandler(unpackermixin):
1140 """read one part and process it with restricted capability
1149 """read one part and process it with restricted capability
1141
1150
1142 This allows to transmit exception raised on the producer size during part
1151 This allows to transmit exception raised on the producer size during part
1143 iteration while the consumer is reading a part.
1152 iteration while the consumer is reading a part.
1144
1153
1145 Part processed in this manner only have access to a ui object,"""
1154 Part processed in this manner only have access to a ui object,"""
1146
1155
1147 def __init__(self, ui, fp):
1156 def __init__(self, ui, fp):
1148 super(interrupthandler, self).__init__(fp)
1157 super(interrupthandler, self).__init__(fp)
1149 self.ui = ui
1158 self.ui = ui
1150
1159
1151 def _readpartheader(self):
1160 def _readpartheader(self):
1152 """reads a part header size and return the bytes blob
1161 """reads a part header size and return the bytes blob
1153
1162
1154 returns None if empty"""
1163 returns None if empty"""
1155 headersize = self._unpack(_fpartheadersize)[0]
1164 headersize = self._unpack(_fpartheadersize)[0]
1156 if headersize < 0:
1165 if headersize < 0:
1157 raise error.BundleValueError('negative part header size: %i'
1166 raise error.BundleValueError('negative part header size: %i'
1158 % headersize)
1167 % headersize)
1159 indebug(self.ui, 'part header size: %i\n' % headersize)
1168 indebug(self.ui, 'part header size: %i\n' % headersize)
1160 if headersize:
1169 if headersize:
1161 return self._readexact(headersize)
1170 return self._readexact(headersize)
1162 return None
1171 return None
1163
1172
1164 def __call__(self):
1173 def __call__(self):
1165
1174
1166 self.ui.debug('bundle2-input-stream-interrupt:'
1175 self.ui.debug('bundle2-input-stream-interrupt:'
1167 ' opening out of band context\n')
1176 ' opening out of band context\n')
1168 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1177 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1169 headerblock = self._readpartheader()
1178 headerblock = self._readpartheader()
1170 if headerblock is None:
1179 if headerblock is None:
1171 indebug(self.ui, 'no part found during interruption.')
1180 indebug(self.ui, 'no part found during interruption.')
1172 return
1181 return
1173 part = unbundlepart(self.ui, headerblock, self._fp)
1182 part = unbundlepart(self.ui, headerblock, self._fp)
1174 op = interruptoperation(self.ui)
1183 op = interruptoperation(self.ui)
1175 hardabort = False
1184 hardabort = False
1176 try:
1185 try:
1177 _processpart(op, part)
1186 _processpart(op, part)
1178 except (SystemExit, KeyboardInterrupt):
1187 except (SystemExit, KeyboardInterrupt):
1179 hardabort = True
1188 hardabort = True
1180 raise
1189 raise
1181 finally:
1190 finally:
1182 if not hardabort:
1191 if not hardabort:
1183 part.consume()
1192 part.consume()
1184 self.ui.debug('bundle2-input-stream-interrupt:'
1193 self.ui.debug('bundle2-input-stream-interrupt:'
1185 ' closing out of band context\n')
1194 ' closing out of band context\n')
1186
1195
1187 class interruptoperation(object):
1196 class interruptoperation(object):
1188 """A limited operation to be use by part handler during interruption
1197 """A limited operation to be use by part handler during interruption
1189
1198
1190 It only have access to an ui object.
1199 It only have access to an ui object.
1191 """
1200 """
1192
1201
1193 def __init__(self, ui):
1202 def __init__(self, ui):
1194 self.ui = ui
1203 self.ui = ui
1195 self.reply = None
1204 self.reply = None
1196 self.captureoutput = False
1205 self.captureoutput = False
1197
1206
1198 @property
1207 @property
1199 def repo(self):
1208 def repo(self):
1200 raise error.ProgrammingError('no repo access from stream interruption')
1209 raise error.ProgrammingError('no repo access from stream interruption')
1201
1210
1202 def gettransaction(self):
1211 def gettransaction(self):
1203 raise TransactionUnavailable('no repo access from stream interruption')
1212 raise TransactionUnavailable('no repo access from stream interruption')
1204
1213
1205 def decodepayloadchunks(ui, fh):
1214 def decodepayloadchunks(ui, fh):
1206 """Reads bundle2 part payload data into chunks.
1215 """Reads bundle2 part payload data into chunks.
1207
1216
1208 Part payload data consists of framed chunks. This function takes
1217 Part payload data consists of framed chunks. This function takes
1209 a file handle and emits those chunks.
1218 a file handle and emits those chunks.
1210 """
1219 """
1211 dolog = ui.configbool('devel', 'bundle2.debug')
1220 dolog = ui.configbool('devel', 'bundle2.debug')
1212 debug = ui.debug
1221 debug = ui.debug
1213
1222
1214 headerstruct = struct.Struct(_fpayloadsize)
1223 headerstruct = struct.Struct(_fpayloadsize)
1215 headersize = headerstruct.size
1224 headersize = headerstruct.size
1216 unpack = headerstruct.unpack
1225 unpack = headerstruct.unpack
1217
1226
1218 readexactly = changegroup.readexactly
1227 readexactly = changegroup.readexactly
1219 read = fh.read
1228 read = fh.read
1220
1229
1221 chunksize = unpack(readexactly(fh, headersize))[0]
1230 chunksize = unpack(readexactly(fh, headersize))[0]
1222 indebug(ui, 'payload chunk size: %i' % chunksize)
1231 indebug(ui, 'payload chunk size: %i' % chunksize)
1223
1232
1224 # changegroup.readexactly() is inlined below for performance.
1233 # changegroup.readexactly() is inlined below for performance.
1225 while chunksize:
1234 while chunksize:
1226 if chunksize >= 0:
1235 if chunksize >= 0:
1227 s = read(chunksize)
1236 s = read(chunksize)
1228 if len(s) < chunksize:
1237 if len(s) < chunksize:
1229 raise error.Abort(_('stream ended unexpectedly '
1238 raise error.Abort(_('stream ended unexpectedly '
1230 ' (got %d bytes, expected %d)') %
1239 ' (got %d bytes, expected %d)') %
1231 (len(s), chunksize))
1240 (len(s), chunksize))
1232
1241
1233 yield s
1242 yield s
1234 elif chunksize == flaginterrupt:
1243 elif chunksize == flaginterrupt:
1235 # Interrupt "signal" detected. The regular stream is interrupted
1244 # Interrupt "signal" detected. The regular stream is interrupted
1236 # and a bundle2 part follows. Consume it.
1245 # and a bundle2 part follows. Consume it.
1237 interrupthandler(ui, fh)()
1246 interrupthandler(ui, fh)()
1238 else:
1247 else:
1239 raise error.BundleValueError(
1248 raise error.BundleValueError(
1240 'negative payload chunk size: %s' % chunksize)
1249 'negative payload chunk size: %s' % chunksize)
1241
1250
1242 s = read(headersize)
1251 s = read(headersize)
1243 if len(s) < headersize:
1252 if len(s) < headersize:
1244 raise error.Abort(_('stream ended unexpectedly '
1253 raise error.Abort(_('stream ended unexpectedly '
1245 ' (got %d bytes, expected %d)') %
1254 ' (got %d bytes, expected %d)') %
1246 (len(s), chunksize))
1255 (len(s), chunksize))
1247
1256
1248 chunksize = unpack(s)[0]
1257 chunksize = unpack(s)[0]
1249
1258
1250 # indebug() inlined for performance.
1259 # indebug() inlined for performance.
1251 if dolog:
1260 if dolog:
1252 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1261 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1253
1262
1254 class unbundlepart(unpackermixin):
1263 class unbundlepart(unpackermixin):
1255 """a bundle part read from a bundle"""
1264 """a bundle part read from a bundle"""
1256
1265
1257 def __init__(self, ui, header, fp):
1266 def __init__(self, ui, header, fp):
1258 super(unbundlepart, self).__init__(fp)
1267 super(unbundlepart, self).__init__(fp)
1259 self._seekable = (util.safehasattr(fp, 'seek') and
1268 self._seekable = (util.safehasattr(fp, 'seek') and
1260 util.safehasattr(fp, 'tell'))
1269 util.safehasattr(fp, 'tell'))
1261 self.ui = ui
1270 self.ui = ui
1262 # unbundle state attr
1271 # unbundle state attr
1263 self._headerdata = header
1272 self._headerdata = header
1264 self._headeroffset = 0
1273 self._headeroffset = 0
1265 self._initialized = False
1274 self._initialized = False
1266 self.consumed = False
1275 self.consumed = False
1267 # part data
1276 # part data
1268 self.id = None
1277 self.id = None
1269 self.type = None
1278 self.type = None
1270 self.mandatoryparams = None
1279 self.mandatoryparams = None
1271 self.advisoryparams = None
1280 self.advisoryparams = None
1272 self.params = None
1281 self.params = None
1273 self.mandatorykeys = ()
1282 self.mandatorykeys = ()
1274 self._readheader()
1283 self._readheader()
1275 self._mandatory = None
1284 self._mandatory = None
1276 self._pos = 0
1285 self._pos = 0
1277
1286
1278 def _fromheader(self, size):
1287 def _fromheader(self, size):
1279 """return the next <size> byte from the header"""
1288 """return the next <size> byte from the header"""
1280 offset = self._headeroffset
1289 offset = self._headeroffset
1281 data = self._headerdata[offset:(offset + size)]
1290 data = self._headerdata[offset:(offset + size)]
1282 self._headeroffset = offset + size
1291 self._headeroffset = offset + size
1283 return data
1292 return data
1284
1293
1285 def _unpackheader(self, format):
1294 def _unpackheader(self, format):
1286 """read given format from header
1295 """read given format from header
1287
1296
1288 This automatically compute the size of the format to read."""
1297 This automatically compute the size of the format to read."""
1289 data = self._fromheader(struct.calcsize(format))
1298 data = self._fromheader(struct.calcsize(format))
1290 return _unpack(format, data)
1299 return _unpack(format, data)
1291
1300
1292 def _initparams(self, mandatoryparams, advisoryparams):
1301 def _initparams(self, mandatoryparams, advisoryparams):
1293 """internal function to setup all logic related parameters"""
1302 """internal function to setup all logic related parameters"""
1294 # make it read only to prevent people touching it by mistake.
1303 # make it read only to prevent people touching it by mistake.
1295 self.mandatoryparams = tuple(mandatoryparams)
1304 self.mandatoryparams = tuple(mandatoryparams)
1296 self.advisoryparams = tuple(advisoryparams)
1305 self.advisoryparams = tuple(advisoryparams)
1297 # user friendly UI
1306 # user friendly UI
1298 self.params = util.sortdict(self.mandatoryparams)
1307 self.params = util.sortdict(self.mandatoryparams)
1299 self.params.update(self.advisoryparams)
1308 self.params.update(self.advisoryparams)
1300 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1309 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1301
1310
1302 def _readheader(self):
1311 def _readheader(self):
1303 """read the header and setup the object"""
1312 """read the header and setup the object"""
1304 typesize = self._unpackheader(_fparttypesize)[0]
1313 typesize = self._unpackheader(_fparttypesize)[0]
1305 self.type = self._fromheader(typesize)
1314 self.type = self._fromheader(typesize)
1306 indebug(self.ui, 'part type: "%s"' % self.type)
1315 indebug(self.ui, 'part type: "%s"' % self.type)
1307 self.id = self._unpackheader(_fpartid)[0]
1316 self.id = self._unpackheader(_fpartid)[0]
1308 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1317 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1309 # extract mandatory bit from type
1318 # extract mandatory bit from type
1310 self.mandatory = (self.type != self.type.lower())
1319 self.mandatory = (self.type != self.type.lower())
1311 self.type = self.type.lower()
1320 self.type = self.type.lower()
1312 ## reading parameters
1321 ## reading parameters
1313 # param count
1322 # param count
1314 mancount, advcount = self._unpackheader(_fpartparamcount)
1323 mancount, advcount = self._unpackheader(_fpartparamcount)
1315 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1324 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1316 # param size
1325 # param size
1317 fparamsizes = _makefpartparamsizes(mancount + advcount)
1326 fparamsizes = _makefpartparamsizes(mancount + advcount)
1318 paramsizes = self._unpackheader(fparamsizes)
1327 paramsizes = self._unpackheader(fparamsizes)
1319 # make it a list of couple again
1328 # make it a list of couple again
1320 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1329 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1321 # split mandatory from advisory
1330 # split mandatory from advisory
1322 mansizes = paramsizes[:mancount]
1331 mansizes = paramsizes[:mancount]
1323 advsizes = paramsizes[mancount:]
1332 advsizes = paramsizes[mancount:]
1324 # retrieve param value
1333 # retrieve param value
1325 manparams = []
1334 manparams = []
1326 for key, value in mansizes:
1335 for key, value in mansizes:
1327 manparams.append((self._fromheader(key), self._fromheader(value)))
1336 manparams.append((self._fromheader(key), self._fromheader(value)))
1328 advparams = []
1337 advparams = []
1329 for key, value in advsizes:
1338 for key, value in advsizes:
1330 advparams.append((self._fromheader(key), self._fromheader(value)))
1339 advparams.append((self._fromheader(key), self._fromheader(value)))
1331 self._initparams(manparams, advparams)
1340 self._initparams(manparams, advparams)
1332 ## part payload
1341 ## part payload
1333 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1342 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1334 # we read the data, tell it
1343 # we read the data, tell it
1335 self._initialized = True
1344 self._initialized = True
1336
1345
1337 def _payloadchunks(self):
1346 def _payloadchunks(self):
1338 """Generator of decoded chunks in the payload."""
1347 """Generator of decoded chunks in the payload."""
1339 return decodepayloadchunks(self.ui, self._fp)
1348 return decodepayloadchunks(self.ui, self._fp)
1340
1349
1341 def consume(self):
1350 def consume(self):
1342 """Read the part payload until completion.
1351 """Read the part payload until completion.
1343
1352
1344 By consuming the part data, the underlying stream read offset will
1353 By consuming the part data, the underlying stream read offset will
1345 be advanced to the next part (or end of stream).
1354 be advanced to the next part (or end of stream).
1346 """
1355 """
1347 if self.consumed:
1356 if self.consumed:
1348 return
1357 return
1349
1358
1350 chunk = self.read(32768)
1359 chunk = self.read(32768)
1351 while chunk:
1360 while chunk:
1352 self._pos += len(chunk)
1361 self._pos += len(chunk)
1353 chunk = self.read(32768)
1362 chunk = self.read(32768)
1354
1363
1355 def read(self, size=None):
1364 def read(self, size=None):
1356 """read payload data"""
1365 """read payload data"""
1357 if not self._initialized:
1366 if not self._initialized:
1358 self._readheader()
1367 self._readheader()
1359 if size is None:
1368 if size is None:
1360 data = self._payloadstream.read()
1369 data = self._payloadstream.read()
1361 else:
1370 else:
1362 data = self._payloadstream.read(size)
1371 data = self._payloadstream.read(size)
1363 self._pos += len(data)
1372 self._pos += len(data)
1364 if size is None or len(data) < size:
1373 if size is None or len(data) < size:
1365 if not self.consumed and self._pos:
1374 if not self.consumed and self._pos:
1366 self.ui.debug('bundle2-input-part: total payload size %i\n'
1375 self.ui.debug('bundle2-input-part: total payload size %i\n'
1367 % self._pos)
1376 % self._pos)
1368 self.consumed = True
1377 self.consumed = True
1369 return data
1378 return data
1370
1379
1371 class seekableunbundlepart(unbundlepart):
1380 class seekableunbundlepart(unbundlepart):
1372 """A bundle2 part in a bundle that is seekable.
1381 """A bundle2 part in a bundle that is seekable.
1373
1382
1374 Regular ``unbundlepart`` instances can only be read once. This class
1383 Regular ``unbundlepart`` instances can only be read once. This class
1375 extends ``unbundlepart`` to enable bi-directional seeking within the
1384 extends ``unbundlepart`` to enable bi-directional seeking within the
1376 part.
1385 part.
1377
1386
1378 Bundle2 part data consists of framed chunks. Offsets when seeking
1387 Bundle2 part data consists of framed chunks. Offsets when seeking
1379 refer to the decoded data, not the offsets in the underlying bundle2
1388 refer to the decoded data, not the offsets in the underlying bundle2
1380 stream.
1389 stream.
1381
1390
1382 To facilitate quickly seeking within the decoded data, instances of this
1391 To facilitate quickly seeking within the decoded data, instances of this
1383 class maintain a mapping between offsets in the underlying stream and
1392 class maintain a mapping between offsets in the underlying stream and
1384 the decoded payload. This mapping will consume memory in proportion
1393 the decoded payload. This mapping will consume memory in proportion
1385 to the number of chunks within the payload (which almost certainly
1394 to the number of chunks within the payload (which almost certainly
1386 increases in proportion with the size of the part).
1395 increases in proportion with the size of the part).
1387 """
1396 """
1388 def __init__(self, ui, header, fp):
1397 def __init__(self, ui, header, fp):
1389 # (payload, file) offsets for chunk starts.
1398 # (payload, file) offsets for chunk starts.
1390 self._chunkindex = []
1399 self._chunkindex = []
1391
1400
1392 super(seekableunbundlepart, self).__init__(ui, header, fp)
1401 super(seekableunbundlepart, self).__init__(ui, header, fp)
1393
1402
1394 def _payloadchunks(self, chunknum=0):
1403 def _payloadchunks(self, chunknum=0):
1395 '''seek to specified chunk and start yielding data'''
1404 '''seek to specified chunk and start yielding data'''
1396 if len(self._chunkindex) == 0:
1405 if len(self._chunkindex) == 0:
1397 assert chunknum == 0, 'Must start with chunk 0'
1406 assert chunknum == 0, 'Must start with chunk 0'
1398 self._chunkindex.append((0, self._tellfp()))
1407 self._chunkindex.append((0, self._tellfp()))
1399 else:
1408 else:
1400 assert chunknum < len(self._chunkindex), (
1409 assert chunknum < len(self._chunkindex), (
1401 'Unknown chunk %d' % chunknum)
1410 'Unknown chunk %d' % chunknum)
1402 self._seekfp(self._chunkindex[chunknum][1])
1411 self._seekfp(self._chunkindex[chunknum][1])
1403
1412
1404 pos = self._chunkindex[chunknum][0]
1413 pos = self._chunkindex[chunknum][0]
1405
1414
1406 for chunk in decodepayloadchunks(self.ui, self._fp):
1415 for chunk in decodepayloadchunks(self.ui, self._fp):
1407 chunknum += 1
1416 chunknum += 1
1408 pos += len(chunk)
1417 pos += len(chunk)
1409 if chunknum == len(self._chunkindex):
1418 if chunknum == len(self._chunkindex):
1410 self._chunkindex.append((pos, self._tellfp()))
1419 self._chunkindex.append((pos, self._tellfp()))
1411
1420
1412 yield chunk
1421 yield chunk
1413
1422
1414 def _findchunk(self, pos):
1423 def _findchunk(self, pos):
1415 '''for a given payload position, return a chunk number and offset'''
1424 '''for a given payload position, return a chunk number and offset'''
1416 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1425 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1417 if ppos == pos:
1426 if ppos == pos:
1418 return chunk, 0
1427 return chunk, 0
1419 elif ppos > pos:
1428 elif ppos > pos:
1420 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1429 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1421 raise ValueError('Unknown chunk')
1430 raise ValueError('Unknown chunk')
1422
1431
1423 def tell(self):
1432 def tell(self):
1424 return self._pos
1433 return self._pos
1425
1434
1426 def seek(self, offset, whence=os.SEEK_SET):
1435 def seek(self, offset, whence=os.SEEK_SET):
1427 if whence == os.SEEK_SET:
1436 if whence == os.SEEK_SET:
1428 newpos = offset
1437 newpos = offset
1429 elif whence == os.SEEK_CUR:
1438 elif whence == os.SEEK_CUR:
1430 newpos = self._pos + offset
1439 newpos = self._pos + offset
1431 elif whence == os.SEEK_END:
1440 elif whence == os.SEEK_END:
1432 if not self.consumed:
1441 if not self.consumed:
1433 # Can't use self.consume() here because it advances self._pos.
1442 # Can't use self.consume() here because it advances self._pos.
1434 chunk = self.read(32768)
1443 chunk = self.read(32768)
1435 while chunk:
1444 while chunk:
1436 chunk = self.read(32768)
1445 chunk = self.read(32768)
1437 newpos = self._chunkindex[-1][0] - offset
1446 newpos = self._chunkindex[-1][0] - offset
1438 else:
1447 else:
1439 raise ValueError('Unknown whence value: %r' % (whence,))
1448 raise ValueError('Unknown whence value: %r' % (whence,))
1440
1449
1441 if newpos > self._chunkindex[-1][0] and not self.consumed:
1450 if newpos > self._chunkindex[-1][0] and not self.consumed:
1442 # Can't use self.consume() here because it advances self._pos.
1451 # Can't use self.consume() here because it advances self._pos.
1443 chunk = self.read(32768)
1452 chunk = self.read(32768)
1444 while chunk:
1453 while chunk:
1445 chunk = self.read(32668)
1454 chunk = self.read(32668)
1446
1455
1447 if not 0 <= newpos <= self._chunkindex[-1][0]:
1456 if not 0 <= newpos <= self._chunkindex[-1][0]:
1448 raise ValueError('Offset out of range')
1457 raise ValueError('Offset out of range')
1449
1458
1450 if self._pos != newpos:
1459 if self._pos != newpos:
1451 chunk, internaloffset = self._findchunk(newpos)
1460 chunk, internaloffset = self._findchunk(newpos)
1452 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1461 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1453 adjust = self.read(internaloffset)
1462 adjust = self.read(internaloffset)
1454 if len(adjust) != internaloffset:
1463 if len(adjust) != internaloffset:
1455 raise error.Abort(_('Seek failed\n'))
1464 raise error.Abort(_('Seek failed\n'))
1456 self._pos = newpos
1465 self._pos = newpos
1457
1466
1458 def _seekfp(self, offset, whence=0):
1467 def _seekfp(self, offset, whence=0):
1459 """move the underlying file pointer
1468 """move the underlying file pointer
1460
1469
1461 This method is meant for internal usage by the bundle2 protocol only.
1470 This method is meant for internal usage by the bundle2 protocol only.
1462 They directly manipulate the low level stream including bundle2 level
1471 They directly manipulate the low level stream including bundle2 level
1463 instruction.
1472 instruction.
1464
1473
1465 Do not use it to implement higher-level logic or methods."""
1474 Do not use it to implement higher-level logic or methods."""
1466 if self._seekable:
1475 if self._seekable:
1467 return self._fp.seek(offset, whence)
1476 return self._fp.seek(offset, whence)
1468 else:
1477 else:
1469 raise NotImplementedError(_('File pointer is not seekable'))
1478 raise NotImplementedError(_('File pointer is not seekable'))
1470
1479
1471 def _tellfp(self):
1480 def _tellfp(self):
1472 """return the file offset, or None if file is not seekable
1481 """return the file offset, or None if file is not seekable
1473
1482
1474 This method is meant for internal usage by the bundle2 protocol only.
1483 This method is meant for internal usage by the bundle2 protocol only.
1475 They directly manipulate the low level stream including bundle2 level
1484 They directly manipulate the low level stream including bundle2 level
1476 instruction.
1485 instruction.
1477
1486
1478 Do not use it to implement higher-level logic or methods."""
1487 Do not use it to implement higher-level logic or methods."""
1479 if self._seekable:
1488 if self._seekable:
1480 try:
1489 try:
1481 return self._fp.tell()
1490 return self._fp.tell()
1482 except IOError as e:
1491 except IOError as e:
1483 if e.errno == errno.ESPIPE:
1492 if e.errno == errno.ESPIPE:
1484 self._seekable = False
1493 self._seekable = False
1485 else:
1494 else:
1486 raise
1495 raise
1487 return None
1496 return None
1488
1497
1489 # These are only the static capabilities.
1498 # These are only the static capabilities.
1490 # Check the 'getrepocaps' function for the rest.
1499 # Check the 'getrepocaps' function for the rest.
1491 capabilities = {'HG20': (),
1500 capabilities = {'HG20': (),
1492 'bookmarks': (),
1501 'bookmarks': (),
1493 'error': ('abort', 'unsupportedcontent', 'pushraced',
1502 'error': ('abort', 'unsupportedcontent', 'pushraced',
1494 'pushkey'),
1503 'pushkey'),
1495 'listkeys': (),
1504 'listkeys': (),
1496 'pushkey': (),
1505 'pushkey': (),
1497 'digests': tuple(sorted(util.DIGESTS.keys())),
1506 'digests': tuple(sorted(util.DIGESTS.keys())),
1498 'remote-changegroup': ('http', 'https'),
1507 'remote-changegroup': ('http', 'https'),
1499 'hgtagsfnodes': (),
1508 'hgtagsfnodes': (),
1500 'rev-branch-cache': (),
1509 'rev-branch-cache': (),
1501 'phases': ('heads',),
1510 'phases': ('heads',),
1502 'stream': ('v2',),
1511 'stream': ('v2',),
1503 }
1512 }
1504
1513
1505 def getrepocaps(repo, allowpushback=False, role=None):
1514 def getrepocaps(repo, allowpushback=False, role=None):
1506 """return the bundle2 capabilities for a given repo
1515 """return the bundle2 capabilities for a given repo
1507
1516
1508 Exists to allow extensions (like evolution) to mutate the capabilities.
1517 Exists to allow extensions (like evolution) to mutate the capabilities.
1509
1518
1510 The returned value is used for servers advertising their capabilities as
1519 The returned value is used for servers advertising their capabilities as
1511 well as clients advertising their capabilities to servers as part of
1520 well as clients advertising their capabilities to servers as part of
1512 bundle2 requests. The ``role`` argument specifies which is which.
1521 bundle2 requests. The ``role`` argument specifies which is which.
1513 """
1522 """
1514 if role not in ('client', 'server'):
1523 if role not in ('client', 'server'):
1515 raise error.ProgrammingError('role argument must be client or server')
1524 raise error.ProgrammingError('role argument must be client or server')
1516
1525
1517 caps = capabilities.copy()
1526 caps = capabilities.copy()
1518 caps['changegroup'] = tuple(sorted(
1527 caps['changegroup'] = tuple(sorted(
1519 changegroup.supportedincomingversions(repo)))
1528 changegroup.supportedincomingversions(repo)))
1520 if obsolete.isenabled(repo, obsolete.exchangeopt):
1529 if obsolete.isenabled(repo, obsolete.exchangeopt):
1521 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1530 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1522 caps['obsmarkers'] = supportedformat
1531 caps['obsmarkers'] = supportedformat
1523 if allowpushback:
1532 if allowpushback:
1524 caps['pushback'] = ()
1533 caps['pushback'] = ()
1525 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1534 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1526 if cpmode == 'check-related':
1535 if cpmode == 'check-related':
1527 caps['checkheads'] = ('related',)
1536 caps['checkheads'] = ('related',)
1528 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1537 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1529 caps.pop('phases')
1538 caps.pop('phases')
1530
1539
1531 # Don't advertise stream clone support in server mode if not configured.
1540 # Don't advertise stream clone support in server mode if not configured.
1532 if role == 'server':
1541 if role == 'server':
1533 streamsupported = repo.ui.configbool('server', 'uncompressed',
1542 streamsupported = repo.ui.configbool('server', 'uncompressed',
1534 untrusted=True)
1543 untrusted=True)
1535 featuresupported = repo.ui.configbool('server', 'bundle2.stream')
1544 featuresupported = repo.ui.configbool('server', 'bundle2.stream')
1536
1545
1537 if not streamsupported or not featuresupported:
1546 if not streamsupported or not featuresupported:
1538 caps.pop('stream')
1547 caps.pop('stream')
1539 # Else always advertise support on client, because payload support
1548 # Else always advertise support on client, because payload support
1540 # should always be advertised.
1549 # should always be advertised.
1541
1550
1542 return caps
1551 return caps
1543
1552
1544 def bundle2caps(remote):
1553 def bundle2caps(remote):
1545 """return the bundle capabilities of a peer as dict"""
1554 """return the bundle capabilities of a peer as dict"""
1546 raw = remote.capable('bundle2')
1555 raw = remote.capable('bundle2')
1547 if not raw and raw != '':
1556 if not raw and raw != '':
1548 return {}
1557 return {}
1549 capsblob = urlreq.unquote(remote.capable('bundle2'))
1558 capsblob = urlreq.unquote(remote.capable('bundle2'))
1550 return decodecaps(capsblob)
1559 return decodecaps(capsblob)
1551
1560
1552 def obsmarkersversion(caps):
1561 def obsmarkersversion(caps):
1553 """extract the list of supported obsmarkers versions from a bundle2caps dict
1562 """extract the list of supported obsmarkers versions from a bundle2caps dict
1554 """
1563 """
1555 obscaps = caps.get('obsmarkers', ())
1564 obscaps = caps.get('obsmarkers', ())
1556 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1565 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1557
1566
1558 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1567 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1559 vfs=None, compression=None, compopts=None):
1568 vfs=None, compression=None, compopts=None):
1560 if bundletype.startswith('HG10'):
1569 if bundletype.startswith('HG10'):
1561 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1570 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1562 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1571 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1563 compression=compression, compopts=compopts)
1572 compression=compression, compopts=compopts)
1564 elif not bundletype.startswith('HG20'):
1573 elif not bundletype.startswith('HG20'):
1565 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1574 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1566
1575
1567 caps = {}
1576 caps = {}
1568 if 'obsolescence' in opts:
1577 if 'obsolescence' in opts:
1569 caps['obsmarkers'] = ('V1',)
1578 caps['obsmarkers'] = ('V1',)
1570 bundle = bundle20(ui, caps)
1579 bundle = bundle20(ui, caps)
1571 bundle.setcompression(compression, compopts)
1580 bundle.setcompression(compression, compopts)
1572 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1581 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1573 chunkiter = bundle.getchunks()
1582 chunkiter = bundle.getchunks()
1574
1583
1575 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1584 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1576
1585
1577 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1586 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1578 # We should eventually reconcile this logic with the one behind
1587 # We should eventually reconcile this logic with the one behind
1579 # 'exchange.getbundle2partsgenerator'.
1588 # 'exchange.getbundle2partsgenerator'.
1580 #
1589 #
1581 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1590 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1582 # different right now. So we keep them separated for now for the sake of
1591 # different right now. So we keep them separated for now for the sake of
1583 # simplicity.
1592 # simplicity.
1584
1593
1585 # we might not always want a changegroup in such bundle, for example in
1594 # we might not always want a changegroup in such bundle, for example in
1586 # stream bundles
1595 # stream bundles
1587 if opts.get('changegroup', True):
1596 if opts.get('changegroup', True):
1588 cgversion = opts.get('cg.version')
1597 cgversion = opts.get('cg.version')
1589 if cgversion is None:
1598 if cgversion is None:
1590 cgversion = changegroup.safeversion(repo)
1599 cgversion = changegroup.safeversion(repo)
1591 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1600 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1592 part = bundler.newpart('changegroup', data=cg.getchunks())
1601 part = bundler.newpart('changegroup', data=cg.getchunks())
1593 part.addparam('version', cg.version)
1602 part.addparam('version', cg.version)
1594 if 'clcount' in cg.extras:
1603 if 'clcount' in cg.extras:
1595 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1604 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1596 mandatory=False)
1605 mandatory=False)
1597 if opts.get('phases') and repo.revs('%ln and secret()',
1606 if opts.get('phases') and repo.revs('%ln and secret()',
1598 outgoing.missingheads):
1607 outgoing.missingheads):
1599 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1608 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1600
1609
1601 if opts.get('streamv2', False):
1610 if opts.get('streamv2', False):
1602 addpartbundlestream2(bundler, repo, stream=True)
1611 addpartbundlestream2(bundler, repo, stream=True)
1603
1612
1604 if opts.get('tagsfnodescache', True):
1613 if opts.get('tagsfnodescache', True):
1605 addparttagsfnodescache(repo, bundler, outgoing)
1614 addparttagsfnodescache(repo, bundler, outgoing)
1606
1615
1607 if opts.get('revbranchcache', True):
1616 if opts.get('revbranchcache', True):
1608 addpartrevbranchcache(repo, bundler, outgoing)
1617 addpartrevbranchcache(repo, bundler, outgoing)
1609
1618
1610 if opts.get('obsolescence', False):
1619 if opts.get('obsolescence', False):
1611 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1620 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1612 buildobsmarkerspart(bundler, obsmarkers)
1621 buildobsmarkerspart(bundler, obsmarkers)
1613
1622
1614 if opts.get('phases', False):
1623 if opts.get('phases', False):
1615 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1624 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1616 phasedata = phases.binaryencode(headsbyphase)
1625 phasedata = phases.binaryencode(headsbyphase)
1617 bundler.newpart('phase-heads', data=phasedata)
1626 bundler.newpart('phase-heads', data=phasedata)
1618
1627
1619 def addparttagsfnodescache(repo, bundler, outgoing):
1628 def addparttagsfnodescache(repo, bundler, outgoing):
1620 # we include the tags fnode cache for the bundle changeset
1629 # we include the tags fnode cache for the bundle changeset
1621 # (as an optional parts)
1630 # (as an optional parts)
1622 cache = tags.hgtagsfnodescache(repo.unfiltered())
1631 cache = tags.hgtagsfnodescache(repo.unfiltered())
1623 chunks = []
1632 chunks = []
1624
1633
1625 # .hgtags fnodes are only relevant for head changesets. While we could
1634 # .hgtags fnodes are only relevant for head changesets. While we could
1626 # transfer values for all known nodes, there will likely be little to
1635 # transfer values for all known nodes, there will likely be little to
1627 # no benefit.
1636 # no benefit.
1628 #
1637 #
1629 # We don't bother using a generator to produce output data because
1638 # We don't bother using a generator to produce output data because
1630 # a) we only have 40 bytes per head and even esoteric numbers of heads
1639 # a) we only have 40 bytes per head and even esoteric numbers of heads
1631 # consume little memory (1M heads is 40MB) b) we don't want to send the
1640 # consume little memory (1M heads is 40MB) b) we don't want to send the
1632 # part if we don't have entries and knowing if we have entries requires
1641 # part if we don't have entries and knowing if we have entries requires
1633 # cache lookups.
1642 # cache lookups.
1634 for node in outgoing.missingheads:
1643 for node in outgoing.missingheads:
1635 # Don't compute missing, as this may slow down serving.
1644 # Don't compute missing, as this may slow down serving.
1636 fnode = cache.getfnode(node, computemissing=False)
1645 fnode = cache.getfnode(node, computemissing=False)
1637 if fnode is not None:
1646 if fnode is not None:
1638 chunks.extend([node, fnode])
1647 chunks.extend([node, fnode])
1639
1648
1640 if chunks:
1649 if chunks:
1641 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1650 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1642
1651
1643 def addpartrevbranchcache(repo, bundler, outgoing):
1652 def addpartrevbranchcache(repo, bundler, outgoing):
1644 # we include the rev branch cache for the bundle changeset
1653 # we include the rev branch cache for the bundle changeset
1645 # (as an optional parts)
1654 # (as an optional parts)
1646 cache = repo.revbranchcache()
1655 cache = repo.revbranchcache()
1647 cl = repo.unfiltered().changelog
1656 cl = repo.unfiltered().changelog
1648 branchesdata = collections.defaultdict(lambda: (set(), set()))
1657 branchesdata = collections.defaultdict(lambda: (set(), set()))
1649 for node in outgoing.missing:
1658 for node in outgoing.missing:
1650 branch, close = cache.branchinfo(cl.rev(node))
1659 branch, close = cache.branchinfo(cl.rev(node))
1651 branchesdata[branch][close].add(node)
1660 branchesdata[branch][close].add(node)
1652
1661
1653 def generate():
1662 def generate():
1654 for branch, (nodes, closed) in sorted(branchesdata.items()):
1663 for branch, (nodes, closed) in sorted(branchesdata.items()):
1655 utf8branch = encoding.fromlocal(branch)
1664 utf8branch = encoding.fromlocal(branch)
1656 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1665 yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
1657 yield utf8branch
1666 yield utf8branch
1658 for n in sorted(nodes):
1667 for n in sorted(nodes):
1659 yield n
1668 yield n
1660 for n in sorted(closed):
1669 for n in sorted(closed):
1661 yield n
1670 yield n
1662
1671
1663 bundler.newpart('cache:rev-branch-cache', data=generate(),
1672 bundler.newpart('cache:rev-branch-cache', data=generate(),
1664 mandatory=False)
1673 mandatory=False)
1665
1674
1666 def _formatrequirementsspec(requirements):
1675 def _formatrequirementsspec(requirements):
1667 requirements = [req for req in requirements if req != "shared"]
1676 requirements = [req for req in requirements if req != "shared"]
1668 return urlreq.quote(','.join(sorted(requirements)))
1677 return urlreq.quote(','.join(sorted(requirements)))
1669
1678
1670 def _formatrequirementsparams(requirements):
1679 def _formatrequirementsparams(requirements):
1671 requirements = _formatrequirementsspec(requirements)
1680 requirements = _formatrequirementsspec(requirements)
1672 params = "%s%s" % (urlreq.quote("requirements="), requirements)
1681 params = "%s%s" % (urlreq.quote("requirements="), requirements)
1673 return params
1682 return params
1674
1683
1675 def addpartbundlestream2(bundler, repo, **kwargs):
1684 def addpartbundlestream2(bundler, repo, **kwargs):
1676 if not kwargs.get(r'stream', False):
1685 if not kwargs.get(r'stream', False):
1677 return
1686 return
1678
1687
1679 if not streamclone.allowservergeneration(repo):
1688 if not streamclone.allowservergeneration(repo):
1680 raise error.Abort(_('stream data requested but server does not allow '
1689 raise error.Abort(_('stream data requested but server does not allow '
1681 'this feature'),
1690 'this feature'),
1682 hint=_('well-behaved clients should not be '
1691 hint=_('well-behaved clients should not be '
1683 'requesting stream data from servers not '
1692 'requesting stream data from servers not '
1684 'advertising it; the client may be buggy'))
1693 'advertising it; the client may be buggy'))
1685
1694
1686 # Stream clones don't compress well. And compression undermines a
1695 # Stream clones don't compress well. And compression undermines a
1687 # goal of stream clones, which is to be fast. Communicate the desire
1696 # goal of stream clones, which is to be fast. Communicate the desire
1688 # to avoid compression to consumers of the bundle.
1697 # to avoid compression to consumers of the bundle.
1689 bundler.prefercompressed = False
1698 bundler.prefercompressed = False
1690
1699
1691 # get the includes and excludes
1700 # get the includes and excludes
1692 includepats = kwargs.get(r'includepats')
1701 includepats = kwargs.get(r'includepats')
1693 excludepats = kwargs.get(r'excludepats')
1702 excludepats = kwargs.get(r'excludepats')
1694
1703
1695 narrowstream = repo.ui.configbool('experimental',
1704 narrowstream = repo.ui.configbool('experimental',
1696 'server.stream-narrow-clones')
1705 'server.stream-narrow-clones')
1697
1706
1698 if (includepats or excludepats) and not narrowstream:
1707 if (includepats or excludepats) and not narrowstream:
1699 raise error.Abort(_('server does not support narrow stream clones'))
1708 raise error.Abort(_('server does not support narrow stream clones'))
1700
1709
1701 includeobsmarkers = False
1710 includeobsmarkers = False
1702 if repo.obsstore:
1711 if repo.obsstore:
1703 remoteversions = obsmarkersversion(bundler.capabilities)
1712 remoteversions = obsmarkersversion(bundler.capabilities)
1704 if not remoteversions:
1713 if not remoteversions:
1705 raise error.Abort(_('server has obsolescence markers, but client '
1714 raise error.Abort(_('server has obsolescence markers, but client '
1706 'cannot receive them via stream clone'))
1715 'cannot receive them via stream clone'))
1707 elif repo.obsstore._version in remoteversions:
1716 elif repo.obsstore._version in remoteversions:
1708 includeobsmarkers = True
1717 includeobsmarkers = True
1709
1718
1710 filecount, bytecount, it = streamclone.generatev2(repo, includepats,
1719 filecount, bytecount, it = streamclone.generatev2(repo, includepats,
1711 excludepats,
1720 excludepats,
1712 includeobsmarkers)
1721 includeobsmarkers)
1713 requirements = _formatrequirementsspec(repo.requirements)
1722 requirements = _formatrequirementsspec(repo.requirements)
1714 part = bundler.newpart('stream2', data=it)
1723 part = bundler.newpart('stream2', data=it)
1715 part.addparam('bytecount', '%d' % bytecount, mandatory=True)
1724 part.addparam('bytecount', '%d' % bytecount, mandatory=True)
1716 part.addparam('filecount', '%d' % filecount, mandatory=True)
1725 part.addparam('filecount', '%d' % filecount, mandatory=True)
1717 part.addparam('requirements', requirements, mandatory=True)
1726 part.addparam('requirements', requirements, mandatory=True)
1718
1727
1719 def buildobsmarkerspart(bundler, markers):
1728 def buildobsmarkerspart(bundler, markers):
1720 """add an obsmarker part to the bundler with <markers>
1729 """add an obsmarker part to the bundler with <markers>
1721
1730
1722 No part is created if markers is empty.
1731 No part is created if markers is empty.
1723 Raises ValueError if the bundler doesn't support any known obsmarker format.
1732 Raises ValueError if the bundler doesn't support any known obsmarker format.
1724 """
1733 """
1725 if not markers:
1734 if not markers:
1726 return None
1735 return None
1727
1736
1728 remoteversions = obsmarkersversion(bundler.capabilities)
1737 remoteversions = obsmarkersversion(bundler.capabilities)
1729 version = obsolete.commonversion(remoteversions)
1738 version = obsolete.commonversion(remoteversions)
1730 if version is None:
1739 if version is None:
1731 raise ValueError('bundler does not support common obsmarker format')
1740 raise ValueError('bundler does not support common obsmarker format')
1732 stream = obsolete.encodemarkers(markers, True, version=version)
1741 stream = obsolete.encodemarkers(markers, True, version=version)
1733 return bundler.newpart('obsmarkers', data=stream)
1742 return bundler.newpart('obsmarkers', data=stream)
1734
1743
1735 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1744 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1736 compopts=None):
1745 compopts=None):
1737 """Write a bundle file and return its filename.
1746 """Write a bundle file and return its filename.
1738
1747
1739 Existing files will not be overwritten.
1748 Existing files will not be overwritten.
1740 If no filename is specified, a temporary file is created.
1749 If no filename is specified, a temporary file is created.
1741 bz2 compression can be turned off.
1750 bz2 compression can be turned off.
1742 The bundle file will be deleted in case of errors.
1751 The bundle file will be deleted in case of errors.
1743 """
1752 """
1744
1753
1745 if bundletype == "HG20":
1754 if bundletype == "HG20":
1746 bundle = bundle20(ui)
1755 bundle = bundle20(ui)
1747 bundle.setcompression(compression, compopts)
1756 bundle.setcompression(compression, compopts)
1748 part = bundle.newpart('changegroup', data=cg.getchunks())
1757 part = bundle.newpart('changegroup', data=cg.getchunks())
1749 part.addparam('version', cg.version)
1758 part.addparam('version', cg.version)
1750 if 'clcount' in cg.extras:
1759 if 'clcount' in cg.extras:
1751 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1760 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1752 mandatory=False)
1761 mandatory=False)
1753 chunkiter = bundle.getchunks()
1762 chunkiter = bundle.getchunks()
1754 else:
1763 else:
1755 # compression argument is only for the bundle2 case
1764 # compression argument is only for the bundle2 case
1756 assert compression is None
1765 assert compression is None
1757 if cg.version != '01':
1766 if cg.version != '01':
1758 raise error.Abort(_('old bundle types only supports v1 '
1767 raise error.Abort(_('old bundle types only supports v1 '
1759 'changegroups'))
1768 'changegroups'))
1760 header, comp = bundletypes[bundletype]
1769 header, comp = bundletypes[bundletype]
1761 if comp not in util.compengines.supportedbundletypes:
1770 if comp not in util.compengines.supportedbundletypes:
1762 raise error.Abort(_('unknown stream compression type: %s')
1771 raise error.Abort(_('unknown stream compression type: %s')
1763 % comp)
1772 % comp)
1764 compengine = util.compengines.forbundletype(comp)
1773 compengine = util.compengines.forbundletype(comp)
1765 def chunkiter():
1774 def chunkiter():
1766 yield header
1775 yield header
1767 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1776 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1768 yield chunk
1777 yield chunk
1769 chunkiter = chunkiter()
1778 chunkiter = chunkiter()
1770
1779
1771 # parse the changegroup data, otherwise we will block
1780 # parse the changegroup data, otherwise we will block
1772 # in case of sshrepo because we don't know the end of the stream
1781 # in case of sshrepo because we don't know the end of the stream
1773 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1782 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1774
1783
1775 def combinechangegroupresults(op):
1784 def combinechangegroupresults(op):
1776 """logic to combine 0 or more addchangegroup results into one"""
1785 """logic to combine 0 or more addchangegroup results into one"""
1777 results = [r.get('return', 0)
1786 results = [r.get('return', 0)
1778 for r in op.records['changegroup']]
1787 for r in op.records['changegroup']]
1779 changedheads = 0
1788 changedheads = 0
1780 result = 1
1789 result = 1
1781 for ret in results:
1790 for ret in results:
1782 # If any changegroup result is 0, return 0
1791 # If any changegroup result is 0, return 0
1783 if ret == 0:
1792 if ret == 0:
1784 result = 0
1793 result = 0
1785 break
1794 break
1786 if ret < -1:
1795 if ret < -1:
1787 changedheads += ret + 1
1796 changedheads += ret + 1
1788 elif ret > 1:
1797 elif ret > 1:
1789 changedheads += ret - 1
1798 changedheads += ret - 1
1790 if changedheads > 0:
1799 if changedheads > 0:
1791 result = 1 + changedheads
1800 result = 1 + changedheads
1792 elif changedheads < 0:
1801 elif changedheads < 0:
1793 result = -1 + changedheads
1802 result = -1 + changedheads
1794 return result
1803 return result
1795
1804
1796 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1805 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1797 'targetphase'))
1806 'targetphase'))
1798 def handlechangegroup(op, inpart):
1807 def handlechangegroup(op, inpart):
1799 """apply a changegroup part on the repo
1808 """apply a changegroup part on the repo
1800
1809
1801 This is a very early implementation that will massive rework before being
1810 This is a very early implementation that will massive rework before being
1802 inflicted to any end-user.
1811 inflicted to any end-user.
1803 """
1812 """
1804 from . import localrepo
1813 from . import localrepo
1805
1814
1806 tr = op.gettransaction()
1815 tr = op.gettransaction()
1807 unpackerversion = inpart.params.get('version', '01')
1816 unpackerversion = inpart.params.get('version', '01')
1808 # We should raise an appropriate exception here
1817 # We should raise an appropriate exception here
1809 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1818 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1810 # the source and url passed here are overwritten by the one contained in
1819 # the source and url passed here are overwritten by the one contained in
1811 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1820 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1812 nbchangesets = None
1821 nbchangesets = None
1813 if 'nbchanges' in inpart.params:
1822 if 'nbchanges' in inpart.params:
1814 nbchangesets = int(inpart.params.get('nbchanges'))
1823 nbchangesets = int(inpart.params.get('nbchanges'))
1815 if ('treemanifest' in inpart.params and
1824 if ('treemanifest' in inpart.params and
1816 'treemanifest' not in op.repo.requirements):
1825 'treemanifest' not in op.repo.requirements):
1817 if len(op.repo.changelog) != 0:
1826 if len(op.repo.changelog) != 0:
1818 raise error.Abort(_(
1827 raise error.Abort(_(
1819 "bundle contains tree manifests, but local repo is "
1828 "bundle contains tree manifests, but local repo is "
1820 "non-empty and does not use tree manifests"))
1829 "non-empty and does not use tree manifests"))
1821 op.repo.requirements.add('treemanifest')
1830 op.repo.requirements.add('treemanifest')
1822 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1831 op.repo.svfs.options = localrepo.resolvestorevfsoptions(
1823 op.repo.ui, op.repo.requirements, op.repo.features)
1832 op.repo.ui, op.repo.requirements, op.repo.features)
1824 op.repo._writerequirements()
1833 op.repo._writerequirements()
1825 extrakwargs = {}
1834 extrakwargs = {}
1826 targetphase = inpart.params.get('targetphase')
1835 targetphase = inpart.params.get('targetphase')
1827 if targetphase is not None:
1836 if targetphase is not None:
1828 extrakwargs[r'targetphase'] = int(targetphase)
1837 extrakwargs[r'targetphase'] = int(targetphase)
1829 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1838 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1830 expectedtotal=nbchangesets, **extrakwargs)
1839 expectedtotal=nbchangesets, **extrakwargs)
1831 if op.reply is not None:
1840 if op.reply is not None:
1832 # This is definitely not the final form of this
1841 # This is definitely not the final form of this
1833 # return. But one need to start somewhere.
1842 # return. But one need to start somewhere.
1834 part = op.reply.newpart('reply:changegroup', mandatory=False)
1843 part = op.reply.newpart('reply:changegroup', mandatory=False)
1835 part.addparam(
1844 part.addparam(
1836 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1845 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1837 part.addparam('return', '%i' % ret, mandatory=False)
1846 part.addparam('return', '%i' % ret, mandatory=False)
1838 assert not inpart.read()
1847 assert not inpart.read()
1839
1848
1840 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1849 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1841 ['digest:%s' % k for k in util.DIGESTS.keys()])
1850 ['digest:%s' % k for k in util.DIGESTS.keys()])
1842 @parthandler('remote-changegroup', _remotechangegroupparams)
1851 @parthandler('remote-changegroup', _remotechangegroupparams)
1843 def handleremotechangegroup(op, inpart):
1852 def handleremotechangegroup(op, inpart):
1844 """apply a bundle10 on the repo, given an url and validation information
1853 """apply a bundle10 on the repo, given an url and validation information
1845
1854
1846 All the information about the remote bundle to import are given as
1855 All the information about the remote bundle to import are given as
1847 parameters. The parameters include:
1856 parameters. The parameters include:
1848 - url: the url to the bundle10.
1857 - url: the url to the bundle10.
1849 - size: the bundle10 file size. It is used to validate what was
1858 - size: the bundle10 file size. It is used to validate what was
1850 retrieved by the client matches the server knowledge about the bundle.
1859 retrieved by the client matches the server knowledge about the bundle.
1851 - digests: a space separated list of the digest types provided as
1860 - digests: a space separated list of the digest types provided as
1852 parameters.
1861 parameters.
1853 - digest:<digest-type>: the hexadecimal representation of the digest with
1862 - digest:<digest-type>: the hexadecimal representation of the digest with
1854 that name. Like the size, it is used to validate what was retrieved by
1863 that name. Like the size, it is used to validate what was retrieved by
1855 the client matches what the server knows about the bundle.
1864 the client matches what the server knows about the bundle.
1856
1865
1857 When multiple digest types are given, all of them are checked.
1866 When multiple digest types are given, all of them are checked.
1858 """
1867 """
1859 try:
1868 try:
1860 raw_url = inpart.params['url']
1869 raw_url = inpart.params['url']
1861 except KeyError:
1870 except KeyError:
1862 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1871 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1863 parsed_url = util.url(raw_url)
1872 parsed_url = util.url(raw_url)
1864 if parsed_url.scheme not in capabilities['remote-changegroup']:
1873 if parsed_url.scheme not in capabilities['remote-changegroup']:
1865 raise error.Abort(_('remote-changegroup does not support %s urls') %
1874 raise error.Abort(_('remote-changegroup does not support %s urls') %
1866 parsed_url.scheme)
1875 parsed_url.scheme)
1867
1876
1868 try:
1877 try:
1869 size = int(inpart.params['size'])
1878 size = int(inpart.params['size'])
1870 except ValueError:
1879 except ValueError:
1871 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1880 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1872 % 'size')
1881 % 'size')
1873 except KeyError:
1882 except KeyError:
1874 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1883 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1875
1884
1876 digests = {}
1885 digests = {}
1877 for typ in inpart.params.get('digests', '').split():
1886 for typ in inpart.params.get('digests', '').split():
1878 param = 'digest:%s' % typ
1887 param = 'digest:%s' % typ
1879 try:
1888 try:
1880 value = inpart.params[param]
1889 value = inpart.params[param]
1881 except KeyError:
1890 except KeyError:
1882 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1891 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1883 param)
1892 param)
1884 digests[typ] = value
1893 digests[typ] = value
1885
1894
1886 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1895 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1887
1896
1888 tr = op.gettransaction()
1897 tr = op.gettransaction()
1889 from . import exchange
1898 from . import exchange
1890 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1899 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1891 if not isinstance(cg, changegroup.cg1unpacker):
1900 if not isinstance(cg, changegroup.cg1unpacker):
1892 raise error.Abort(_('%s: not a bundle version 1.0') %
1901 raise error.Abort(_('%s: not a bundle version 1.0') %
1893 util.hidepassword(raw_url))
1902 util.hidepassword(raw_url))
1894 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1903 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1895 if op.reply is not None:
1904 if op.reply is not None:
1896 # This is definitely not the final form of this
1905 # This is definitely not the final form of this
1897 # return. But one need to start somewhere.
1906 # return. But one need to start somewhere.
1898 part = op.reply.newpart('reply:changegroup')
1907 part = op.reply.newpart('reply:changegroup')
1899 part.addparam(
1908 part.addparam(
1900 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1909 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1901 part.addparam('return', '%i' % ret, mandatory=False)
1910 part.addparam('return', '%i' % ret, mandatory=False)
1902 try:
1911 try:
1903 real_part.validate()
1912 real_part.validate()
1904 except error.Abort as e:
1913 except error.Abort as e:
1905 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1914 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1906 (util.hidepassword(raw_url), bytes(e)))
1915 (util.hidepassword(raw_url), bytes(e)))
1907 assert not inpart.read()
1916 assert not inpart.read()
1908
1917
1909 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1918 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1910 def handlereplychangegroup(op, inpart):
1919 def handlereplychangegroup(op, inpart):
1911 ret = int(inpart.params['return'])
1920 ret = int(inpart.params['return'])
1912 replyto = int(inpart.params['in-reply-to'])
1921 replyto = int(inpart.params['in-reply-to'])
1913 op.records.add('changegroup', {'return': ret}, replyto)
1922 op.records.add('changegroup', {'return': ret}, replyto)
1914
1923
1915 @parthandler('check:bookmarks')
1924 @parthandler('check:bookmarks')
1916 def handlecheckbookmarks(op, inpart):
1925 def handlecheckbookmarks(op, inpart):
1917 """check location of bookmarks
1926 """check location of bookmarks
1918
1927
1919 This part is to be used to detect push race regarding bookmark, it
1928 This part is to be used to detect push race regarding bookmark, it
1920 contains binary encoded (bookmark, node) tuple. If the local state does
1929 contains binary encoded (bookmark, node) tuple. If the local state does
1921 not marks the one in the part, a PushRaced exception is raised
1930 not marks the one in the part, a PushRaced exception is raised
1922 """
1931 """
1923 bookdata = bookmarks.binarydecode(inpart)
1932 bookdata = bookmarks.binarydecode(inpart)
1924
1933
1925 msgstandard = ('remote repository changed while pushing - please try again '
1934 msgstandard = ('remote repository changed while pushing - please try again '
1926 '(bookmark "%s" move from %s to %s)')
1935 '(bookmark "%s" move from %s to %s)')
1927 msgmissing = ('remote repository changed while pushing - please try again '
1936 msgmissing = ('remote repository changed while pushing - please try again '
1928 '(bookmark "%s" is missing, expected %s)')
1937 '(bookmark "%s" is missing, expected %s)')
1929 msgexist = ('remote repository changed while pushing - please try again '
1938 msgexist = ('remote repository changed while pushing - please try again '
1930 '(bookmark "%s" set on %s, expected missing)')
1939 '(bookmark "%s" set on %s, expected missing)')
1931 for book, node in bookdata:
1940 for book, node in bookdata:
1932 currentnode = op.repo._bookmarks.get(book)
1941 currentnode = op.repo._bookmarks.get(book)
1933 if currentnode != node:
1942 if currentnode != node:
1934 if node is None:
1943 if node is None:
1935 finalmsg = msgexist % (book, nodemod.short(currentnode))
1944 finalmsg = msgexist % (book, nodemod.short(currentnode))
1936 elif currentnode is None:
1945 elif currentnode is None:
1937 finalmsg = msgmissing % (book, nodemod.short(node))
1946 finalmsg = msgmissing % (book, nodemod.short(node))
1938 else:
1947 else:
1939 finalmsg = msgstandard % (book, nodemod.short(node),
1948 finalmsg = msgstandard % (book, nodemod.short(node),
1940 nodemod.short(currentnode))
1949 nodemod.short(currentnode))
1941 raise error.PushRaced(finalmsg)
1950 raise error.PushRaced(finalmsg)
1942
1951
1943 @parthandler('check:heads')
1952 @parthandler('check:heads')
1944 def handlecheckheads(op, inpart):
1953 def handlecheckheads(op, inpart):
1945 """check that head of the repo did not change
1954 """check that head of the repo did not change
1946
1955
1947 This is used to detect a push race when using unbundle.
1956 This is used to detect a push race when using unbundle.
1948 This replaces the "heads" argument of unbundle."""
1957 This replaces the "heads" argument of unbundle."""
1949 h = inpart.read(20)
1958 h = inpart.read(20)
1950 heads = []
1959 heads = []
1951 while len(h) == 20:
1960 while len(h) == 20:
1952 heads.append(h)
1961 heads.append(h)
1953 h = inpart.read(20)
1962 h = inpart.read(20)
1954 assert not h
1963 assert not h
1955 # Trigger a transaction so that we are guaranteed to have the lock now.
1964 # Trigger a transaction so that we are guaranteed to have the lock now.
1956 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1965 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1957 op.gettransaction()
1966 op.gettransaction()
1958 if sorted(heads) != sorted(op.repo.heads()):
1967 if sorted(heads) != sorted(op.repo.heads()):
1959 raise error.PushRaced('remote repository changed while pushing - '
1968 raise error.PushRaced('remote repository changed while pushing - '
1960 'please try again')
1969 'please try again')
1961
1970
1962 @parthandler('check:updated-heads')
1971 @parthandler('check:updated-heads')
1963 def handlecheckupdatedheads(op, inpart):
1972 def handlecheckupdatedheads(op, inpart):
1964 """check for race on the heads touched by a push
1973 """check for race on the heads touched by a push
1965
1974
1966 This is similar to 'check:heads' but focus on the heads actually updated
1975 This is similar to 'check:heads' but focus on the heads actually updated
1967 during the push. If other activities happen on unrelated heads, it is
1976 during the push. If other activities happen on unrelated heads, it is
1968 ignored.
1977 ignored.
1969
1978
1970 This allow server with high traffic to avoid push contention as long as
1979 This allow server with high traffic to avoid push contention as long as
1971 unrelated parts of the graph are involved."""
1980 unrelated parts of the graph are involved."""
1972 h = inpart.read(20)
1981 h = inpart.read(20)
1973 heads = []
1982 heads = []
1974 while len(h) == 20:
1983 while len(h) == 20:
1975 heads.append(h)
1984 heads.append(h)
1976 h = inpart.read(20)
1985 h = inpart.read(20)
1977 assert not h
1986 assert not h
1978 # trigger a transaction so that we are guaranteed to have the lock now.
1987 # trigger a transaction so that we are guaranteed to have the lock now.
1979 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1988 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1980 op.gettransaction()
1989 op.gettransaction()
1981
1990
1982 currentheads = set()
1991 currentheads = set()
1983 for ls in op.repo.branchmap().iterheads():
1992 for ls in op.repo.branchmap().iterheads():
1984 currentheads.update(ls)
1993 currentheads.update(ls)
1985
1994
1986 for h in heads:
1995 for h in heads:
1987 if h not in currentheads:
1996 if h not in currentheads:
1988 raise error.PushRaced('remote repository changed while pushing - '
1997 raise error.PushRaced('remote repository changed while pushing - '
1989 'please try again')
1998 'please try again')
1990
1999
1991 @parthandler('check:phases')
2000 @parthandler('check:phases')
1992 def handlecheckphases(op, inpart):
2001 def handlecheckphases(op, inpart):
1993 """check that phase boundaries of the repository did not change
2002 """check that phase boundaries of the repository did not change
1994
2003
1995 This is used to detect a push race.
2004 This is used to detect a push race.
1996 """
2005 """
1997 phasetonodes = phases.binarydecode(inpart)
2006 phasetonodes = phases.binarydecode(inpart)
1998 unfi = op.repo.unfiltered()
2007 unfi = op.repo.unfiltered()
1999 cl = unfi.changelog
2008 cl = unfi.changelog
2000 phasecache = unfi._phasecache
2009 phasecache = unfi._phasecache
2001 msg = ('remote repository changed while pushing - please try again '
2010 msg = ('remote repository changed while pushing - please try again '
2002 '(%s is %s expected %s)')
2011 '(%s is %s expected %s)')
2003 for expectedphase, nodes in enumerate(phasetonodes):
2012 for expectedphase, nodes in enumerate(phasetonodes):
2004 for n in nodes:
2013 for n in nodes:
2005 actualphase = phasecache.phase(unfi, cl.rev(n))
2014 actualphase = phasecache.phase(unfi, cl.rev(n))
2006 if actualphase != expectedphase:
2015 if actualphase != expectedphase:
2007 finalmsg = msg % (nodemod.short(n),
2016 finalmsg = msg % (nodemod.short(n),
2008 phases.phasenames[actualphase],
2017 phases.phasenames[actualphase],
2009 phases.phasenames[expectedphase])
2018 phases.phasenames[expectedphase])
2010 raise error.PushRaced(finalmsg)
2019 raise error.PushRaced(finalmsg)
2011
2020
2012 @parthandler('output')
2021 @parthandler('output')
2013 def handleoutput(op, inpart):
2022 def handleoutput(op, inpart):
2014 """forward output captured on the server to the client"""
2023 """forward output captured on the server to the client"""
2015 for line in inpart.read().splitlines():
2024 for line in inpart.read().splitlines():
2016 op.ui.status(_('remote: %s\n') % line)
2025 op.ui.status(_('remote: %s\n') % line)
2017
2026
2018 @parthandler('replycaps')
2027 @parthandler('replycaps')
2019 def handlereplycaps(op, inpart):
2028 def handlereplycaps(op, inpart):
2020 """Notify that a reply bundle should be created
2029 """Notify that a reply bundle should be created
2021
2030
2022 The payload contains the capabilities information for the reply"""
2031 The payload contains the capabilities information for the reply"""
2023 caps = decodecaps(inpart.read())
2032 caps = decodecaps(inpart.read())
2024 if op.reply is None:
2033 if op.reply is None:
2025 op.reply = bundle20(op.ui, caps)
2034 op.reply = bundle20(op.ui, caps)
2026
2035
2027 class AbortFromPart(error.Abort):
2036 class AbortFromPart(error.Abort):
2028 """Sub-class of Abort that denotes an error from a bundle2 part."""
2037 """Sub-class of Abort that denotes an error from a bundle2 part."""
2029
2038
2030 @parthandler('error:abort', ('message', 'hint'))
2039 @parthandler('error:abort', ('message', 'hint'))
2031 def handleerrorabort(op, inpart):
2040 def handleerrorabort(op, inpart):
2032 """Used to transmit abort error over the wire"""
2041 """Used to transmit abort error over the wire"""
2033 raise AbortFromPart(inpart.params['message'],
2042 raise AbortFromPart(inpart.params['message'],
2034 hint=inpart.params.get('hint'))
2043 hint=inpart.params.get('hint'))
2035
2044
2036 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
2045 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
2037 'in-reply-to'))
2046 'in-reply-to'))
2038 def handleerrorpushkey(op, inpart):
2047 def handleerrorpushkey(op, inpart):
2039 """Used to transmit failure of a mandatory pushkey over the wire"""
2048 """Used to transmit failure of a mandatory pushkey over the wire"""
2040 kwargs = {}
2049 kwargs = {}
2041 for name in ('namespace', 'key', 'new', 'old', 'ret'):
2050 for name in ('namespace', 'key', 'new', 'old', 'ret'):
2042 value = inpart.params.get(name)
2051 value = inpart.params.get(name)
2043 if value is not None:
2052 if value is not None:
2044 kwargs[name] = value
2053 kwargs[name] = value
2045 raise error.PushkeyFailed(inpart.params['in-reply-to'],
2054 raise error.PushkeyFailed(inpart.params['in-reply-to'],
2046 **pycompat.strkwargs(kwargs))
2055 **pycompat.strkwargs(kwargs))
2047
2056
2048 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
2057 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
2049 def handleerrorunsupportedcontent(op, inpart):
2058 def handleerrorunsupportedcontent(op, inpart):
2050 """Used to transmit unknown content error over the wire"""
2059 """Used to transmit unknown content error over the wire"""
2051 kwargs = {}
2060 kwargs = {}
2052 parttype = inpart.params.get('parttype')
2061 parttype = inpart.params.get('parttype')
2053 if parttype is not None:
2062 if parttype is not None:
2054 kwargs['parttype'] = parttype
2063 kwargs['parttype'] = parttype
2055 params = inpart.params.get('params')
2064 params = inpart.params.get('params')
2056 if params is not None:
2065 if params is not None:
2057 kwargs['params'] = params.split('\0')
2066 kwargs['params'] = params.split('\0')
2058
2067
2059 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2068 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
2060
2069
2061 @parthandler('error:pushraced', ('message',))
2070 @parthandler('error:pushraced', ('message',))
2062 def handleerrorpushraced(op, inpart):
2071 def handleerrorpushraced(op, inpart):
2063 """Used to transmit push race error over the wire"""
2072 """Used to transmit push race error over the wire"""
2064 raise error.ResponseError(_('push failed:'), inpart.params['message'])
2073 raise error.ResponseError(_('push failed:'), inpart.params['message'])
2065
2074
2066 @parthandler('listkeys', ('namespace',))
2075 @parthandler('listkeys', ('namespace',))
2067 def handlelistkeys(op, inpart):
2076 def handlelistkeys(op, inpart):
2068 """retrieve pushkey namespace content stored in a bundle2"""
2077 """retrieve pushkey namespace content stored in a bundle2"""
2069 namespace = inpart.params['namespace']
2078 namespace = inpart.params['namespace']
2070 r = pushkey.decodekeys(inpart.read())
2079 r = pushkey.decodekeys(inpart.read())
2071 op.records.add('listkeys', (namespace, r))
2080 op.records.add('listkeys', (namespace, r))
2072
2081
2073 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
2082 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
2074 def handlepushkey(op, inpart):
2083 def handlepushkey(op, inpart):
2075 """process a pushkey request"""
2084 """process a pushkey request"""
2076 dec = pushkey.decode
2085 dec = pushkey.decode
2077 namespace = dec(inpart.params['namespace'])
2086 namespace = dec(inpart.params['namespace'])
2078 key = dec(inpart.params['key'])
2087 key = dec(inpart.params['key'])
2079 old = dec(inpart.params['old'])
2088 old = dec(inpart.params['old'])
2080 new = dec(inpart.params['new'])
2089 new = dec(inpart.params['new'])
2081 # Grab the transaction to ensure that we have the lock before performing the
2090 # Grab the transaction to ensure that we have the lock before performing the
2082 # pushkey.
2091 # pushkey.
2083 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2092 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2084 op.gettransaction()
2093 op.gettransaction()
2085 ret = op.repo.pushkey(namespace, key, old, new)
2094 ret = op.repo.pushkey(namespace, key, old, new)
2086 record = {'namespace': namespace,
2095 record = {'namespace': namespace,
2087 'key': key,
2096 'key': key,
2088 'old': old,
2097 'old': old,
2089 'new': new}
2098 'new': new}
2090 op.records.add('pushkey', record)
2099 op.records.add('pushkey', record)
2091 if op.reply is not None:
2100 if op.reply is not None:
2092 rpart = op.reply.newpart('reply:pushkey')
2101 rpart = op.reply.newpart('reply:pushkey')
2093 rpart.addparam(
2102 rpart.addparam(
2094 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2103 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2095 rpart.addparam('return', '%i' % ret, mandatory=False)
2104 rpart.addparam('return', '%i' % ret, mandatory=False)
2096 if inpart.mandatory and not ret:
2105 if inpart.mandatory and not ret:
2097 kwargs = {}
2106 kwargs = {}
2098 for key in ('namespace', 'key', 'new', 'old', 'ret'):
2107 for key in ('namespace', 'key', 'new', 'old', 'ret'):
2099 if key in inpart.params:
2108 if key in inpart.params:
2100 kwargs[key] = inpart.params[key]
2109 kwargs[key] = inpart.params[key]
2101 raise error.PushkeyFailed(partid='%d' % inpart.id,
2110 raise error.PushkeyFailed(partid='%d' % inpart.id,
2102 **pycompat.strkwargs(kwargs))
2111 **pycompat.strkwargs(kwargs))
2103
2112
2104 @parthandler('bookmarks')
2113 @parthandler('bookmarks')
2105 def handlebookmark(op, inpart):
2114 def handlebookmark(op, inpart):
2106 """transmit bookmark information
2115 """transmit bookmark information
2107
2116
2108 The part contains binary encoded bookmark information.
2117 The part contains binary encoded bookmark information.
2109
2118
2110 The exact behavior of this part can be controlled by the 'bookmarks' mode
2119 The exact behavior of this part can be controlled by the 'bookmarks' mode
2111 on the bundle operation.
2120 on the bundle operation.
2112
2121
2113 When mode is 'apply' (the default) the bookmark information is applied as
2122 When mode is 'apply' (the default) the bookmark information is applied as
2114 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2123 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2115 issued earlier to check for push races in such update. This behavior is
2124 issued earlier to check for push races in such update. This behavior is
2116 suitable for pushing.
2125 suitable for pushing.
2117
2126
2118 When mode is 'records', the information is recorded into the 'bookmarks'
2127 When mode is 'records', the information is recorded into the 'bookmarks'
2119 records of the bundle operation. This behavior is suitable for pulling.
2128 records of the bundle operation. This behavior is suitable for pulling.
2120 """
2129 """
2121 changes = bookmarks.binarydecode(inpart)
2130 changes = bookmarks.binarydecode(inpart)
2122
2131
2123 pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2132 pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2124 bookmarksmode = op.modes.get('bookmarks', 'apply')
2133 bookmarksmode = op.modes.get('bookmarks', 'apply')
2125
2134
2126 if bookmarksmode == 'apply':
2135 if bookmarksmode == 'apply':
2127 tr = op.gettransaction()
2136 tr = op.gettransaction()
2128 bookstore = op.repo._bookmarks
2137 bookstore = op.repo._bookmarks
2129 if pushkeycompat:
2138 if pushkeycompat:
2130 allhooks = []
2139 allhooks = []
2131 for book, node in changes:
2140 for book, node in changes:
2132 hookargs = tr.hookargs.copy()
2141 hookargs = tr.hookargs.copy()
2133 hookargs['pushkeycompat'] = '1'
2142 hookargs['pushkeycompat'] = '1'
2134 hookargs['namespace'] = 'bookmarks'
2143 hookargs['namespace'] = 'bookmarks'
2135 hookargs['key'] = book
2144 hookargs['key'] = book
2136 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2145 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2137 hookargs['new'] = nodemod.hex(node if node is not None else '')
2146 hookargs['new'] = nodemod.hex(node if node is not None else '')
2138 allhooks.append(hookargs)
2147 allhooks.append(hookargs)
2139
2148
2140 for hookargs in allhooks:
2149 for hookargs in allhooks:
2141 op.repo.hook('prepushkey', throw=True,
2150 op.repo.hook('prepushkey', throw=True,
2142 **pycompat.strkwargs(hookargs))
2151 **pycompat.strkwargs(hookargs))
2143
2152
2144 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2153 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2145
2154
2146 if pushkeycompat:
2155 if pushkeycompat:
2147 def runhook():
2156 def runhook():
2148 for hookargs in allhooks:
2157 for hookargs in allhooks:
2149 op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
2158 op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
2150 op.repo._afterlock(runhook)
2159 op.repo._afterlock(runhook)
2151
2160
2152 elif bookmarksmode == 'records':
2161 elif bookmarksmode == 'records':
2153 for book, node in changes:
2162 for book, node in changes:
2154 record = {'bookmark': book, 'node': node}
2163 record = {'bookmark': book, 'node': node}
2155 op.records.add('bookmarks', record)
2164 op.records.add('bookmarks', record)
2156 else:
2165 else:
2157 raise error.ProgrammingError('unkown bookmark mode: %s' % bookmarksmode)
2166 raise error.ProgrammingError('unkown bookmark mode: %s' % bookmarksmode)
2158
2167
2159 @parthandler('phase-heads')
2168 @parthandler('phase-heads')
2160 def handlephases(op, inpart):
2169 def handlephases(op, inpart):
2161 """apply phases from bundle part to repo"""
2170 """apply phases from bundle part to repo"""
2162 headsbyphase = phases.binarydecode(inpart)
2171 headsbyphase = phases.binarydecode(inpart)
2163 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2172 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2164
2173
2165 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2174 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2166 def handlepushkeyreply(op, inpart):
2175 def handlepushkeyreply(op, inpart):
2167 """retrieve the result of a pushkey request"""
2176 """retrieve the result of a pushkey request"""
2168 ret = int(inpart.params['return'])
2177 ret = int(inpart.params['return'])
2169 partid = int(inpart.params['in-reply-to'])
2178 partid = int(inpart.params['in-reply-to'])
2170 op.records.add('pushkey', {'return': ret}, partid)
2179 op.records.add('pushkey', {'return': ret}, partid)
2171
2180
2172 @parthandler('obsmarkers')
2181 @parthandler('obsmarkers')
2173 def handleobsmarker(op, inpart):
2182 def handleobsmarker(op, inpart):
2174 """add a stream of obsmarkers to the repo"""
2183 """add a stream of obsmarkers to the repo"""
2175 tr = op.gettransaction()
2184 tr = op.gettransaction()
2176 markerdata = inpart.read()
2185 markerdata = inpart.read()
2177 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2186 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2178 op.ui.write(('obsmarker-exchange: %i bytes received\n')
2187 op.ui.write(('obsmarker-exchange: %i bytes received\n')
2179 % len(markerdata))
2188 % len(markerdata))
2180 # The mergemarkers call will crash if marker creation is not enabled.
2189 # The mergemarkers call will crash if marker creation is not enabled.
2181 # we want to avoid this if the part is advisory.
2190 # we want to avoid this if the part is advisory.
2182 if not inpart.mandatory and op.repo.obsstore.readonly:
2191 if not inpart.mandatory and op.repo.obsstore.readonly:
2183 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2192 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2184 return
2193 return
2185 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2194 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2186 op.repo.invalidatevolatilesets()
2195 op.repo.invalidatevolatilesets()
2187 if new:
2196 if new:
2188 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2197 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2189 op.records.add('obsmarkers', {'new': new})
2198 op.records.add('obsmarkers', {'new': new})
2190 if op.reply is not None:
2199 if op.reply is not None:
2191 rpart = op.reply.newpart('reply:obsmarkers')
2200 rpart = op.reply.newpart('reply:obsmarkers')
2192 rpart.addparam(
2201 rpart.addparam(
2193 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2202 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2194 rpart.addparam('new', '%i' % new, mandatory=False)
2203 rpart.addparam('new', '%i' % new, mandatory=False)
2195
2204
2196
2205
2197 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2206 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2198 def handleobsmarkerreply(op, inpart):
2207 def handleobsmarkerreply(op, inpart):
2199 """retrieve the result of a pushkey request"""
2208 """retrieve the result of a pushkey request"""
2200 ret = int(inpart.params['new'])
2209 ret = int(inpart.params['new'])
2201 partid = int(inpart.params['in-reply-to'])
2210 partid = int(inpart.params['in-reply-to'])
2202 op.records.add('obsmarkers', {'new': ret}, partid)
2211 op.records.add('obsmarkers', {'new': ret}, partid)
2203
2212
2204 @parthandler('hgtagsfnodes')
2213 @parthandler('hgtagsfnodes')
2205 def handlehgtagsfnodes(op, inpart):
2214 def handlehgtagsfnodes(op, inpart):
2206 """Applies .hgtags fnodes cache entries to the local repo.
2215 """Applies .hgtags fnodes cache entries to the local repo.
2207
2216
2208 Payload is pairs of 20 byte changeset nodes and filenodes.
2217 Payload is pairs of 20 byte changeset nodes and filenodes.
2209 """
2218 """
2210 # Grab the transaction so we ensure that we have the lock at this point.
2219 # Grab the transaction so we ensure that we have the lock at this point.
2211 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2220 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2212 op.gettransaction()
2221 op.gettransaction()
2213 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2222 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2214
2223
2215 count = 0
2224 count = 0
2216 while True:
2225 while True:
2217 node = inpart.read(20)
2226 node = inpart.read(20)
2218 fnode = inpart.read(20)
2227 fnode = inpart.read(20)
2219 if len(node) < 20 or len(fnode) < 20:
2228 if len(node) < 20 or len(fnode) < 20:
2220 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2229 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2221 break
2230 break
2222 cache.setfnode(node, fnode)
2231 cache.setfnode(node, fnode)
2223 count += 1
2232 count += 1
2224
2233
2225 cache.write()
2234 cache.write()
2226 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2235 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2227
2236
2228 rbcstruct = struct.Struct('>III')
2237 rbcstruct = struct.Struct('>III')
2229
2238
2230 @parthandler('cache:rev-branch-cache')
2239 @parthandler('cache:rev-branch-cache')
2231 def handlerbc(op, inpart):
2240 def handlerbc(op, inpart):
2232 """receive a rev-branch-cache payload and update the local cache
2241 """receive a rev-branch-cache payload and update the local cache
2233
2242
2234 The payload is a series of data related to each branch
2243 The payload is a series of data related to each branch
2235
2244
2236 1) branch name length
2245 1) branch name length
2237 2) number of open heads
2246 2) number of open heads
2238 3) number of closed heads
2247 3) number of closed heads
2239 4) open heads nodes
2248 4) open heads nodes
2240 5) closed heads nodes
2249 5) closed heads nodes
2241 """
2250 """
2242 total = 0
2251 total = 0
2243 rawheader = inpart.read(rbcstruct.size)
2252 rawheader = inpart.read(rbcstruct.size)
2244 cache = op.repo.revbranchcache()
2253 cache = op.repo.revbranchcache()
2245 cl = op.repo.unfiltered().changelog
2254 cl = op.repo.unfiltered().changelog
2246 while rawheader:
2255 while rawheader:
2247 header = rbcstruct.unpack(rawheader)
2256 header = rbcstruct.unpack(rawheader)
2248 total += header[1] + header[2]
2257 total += header[1] + header[2]
2249 utf8branch = inpart.read(header[0])
2258 utf8branch = inpart.read(header[0])
2250 branch = encoding.tolocal(utf8branch)
2259 branch = encoding.tolocal(utf8branch)
2251 for x in pycompat.xrange(header[1]):
2260 for x in pycompat.xrange(header[1]):
2252 node = inpart.read(20)
2261 node = inpart.read(20)
2253 rev = cl.rev(node)
2262 rev = cl.rev(node)
2254 cache.setdata(branch, rev, node, False)
2263 cache.setdata(branch, rev, node, False)
2255 for x in pycompat.xrange(header[2]):
2264 for x in pycompat.xrange(header[2]):
2256 node = inpart.read(20)
2265 node = inpart.read(20)
2257 rev = cl.rev(node)
2266 rev = cl.rev(node)
2258 cache.setdata(branch, rev, node, True)
2267 cache.setdata(branch, rev, node, True)
2259 rawheader = inpart.read(rbcstruct.size)
2268 rawheader = inpart.read(rbcstruct.size)
2260 cache.write()
2269 cache.write()
2261
2270
2262 @parthandler('pushvars')
2271 @parthandler('pushvars')
2263 def bundle2getvars(op, part):
2272 def bundle2getvars(op, part):
2264 '''unbundle a bundle2 containing shellvars on the server'''
2273 '''unbundle a bundle2 containing shellvars on the server'''
2265 # An option to disable unbundling on server-side for security reasons
2274 # An option to disable unbundling on server-side for security reasons
2266 if op.ui.configbool('push', 'pushvars.server'):
2275 if op.ui.configbool('push', 'pushvars.server'):
2267 hookargs = {}
2276 hookargs = {}
2268 for key, value in part.advisoryparams:
2277 for key, value in part.advisoryparams:
2269 key = key.upper()
2278 key = key.upper()
2270 # We want pushed variables to have USERVAR_ prepended so we know
2279 # We want pushed variables to have USERVAR_ prepended so we know
2271 # they came from the --pushvar flag.
2280 # they came from the --pushvar flag.
2272 key = "USERVAR_" + key
2281 key = "USERVAR_" + key
2273 hookargs[key] = value
2282 hookargs[key] = value
2274 op.addhookargs(hookargs)
2283 op.addhookargs(hookargs)
2275
2284
2276 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
2285 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
2277 def handlestreamv2bundle(op, part):
2286 def handlestreamv2bundle(op, part):
2278
2287
2279 requirements = urlreq.unquote(part.params['requirements']).split(',')
2288 requirements = urlreq.unquote(part.params['requirements']).split(',')
2280 filecount = int(part.params['filecount'])
2289 filecount = int(part.params['filecount'])
2281 bytecount = int(part.params['bytecount'])
2290 bytecount = int(part.params['bytecount'])
2282
2291
2283 repo = op.repo
2292 repo = op.repo
2284 if len(repo):
2293 if len(repo):
2285 msg = _('cannot apply stream clone to non empty repository')
2294 msg = _('cannot apply stream clone to non empty repository')
2286 raise error.Abort(msg)
2295 raise error.Abort(msg)
2287
2296
2288 repo.ui.debug('applying stream bundle\n')
2297 repo.ui.debug('applying stream bundle\n')
2289 streamclone.applybundlev2(repo, part, filecount, bytecount,
2298 streamclone.applybundlev2(repo, part, filecount, bytecount,
2290 requirements)
2299 requirements)
2291
2300
2292 def widen_bundle(repo, oldmatcher, newmatcher, common, known, cgversion,
2301 def widen_bundle(repo, oldmatcher, newmatcher, common, known, cgversion,
2293 ellipses):
2302 ellipses):
2294 """generates bundle2 for widening a narrow clone
2303 """generates bundle2 for widening a narrow clone
2295
2304
2296 repo is the localrepository instance
2305 repo is the localrepository instance
2297 oldmatcher matches what the client already has
2306 oldmatcher matches what the client already has
2298 newmatcher matches what the client needs (including what it already has)
2307 newmatcher matches what the client needs (including what it already has)
2299 common is set of common heads between server and client
2308 common is set of common heads between server and client
2300 known is a set of revs known on the client side (used in ellipses)
2309 known is a set of revs known on the client side (used in ellipses)
2301 cgversion is the changegroup version to send
2310 cgversion is the changegroup version to send
2302 ellipses is boolean value telling whether to send ellipses data or not
2311 ellipses is boolean value telling whether to send ellipses data or not
2303
2312
2304 returns bundle2 of the data required for extending
2313 returns bundle2 of the data required for extending
2305 """
2314 """
2306 bundler = bundle20(repo.ui)
2315 bundler = bundle20(repo.ui)
2307 commonnodes = set()
2316 commonnodes = set()
2308 cl = repo.changelog
2317 cl = repo.changelog
2309 for r in repo.revs("::%ln", common):
2318 for r in repo.revs("::%ln", common):
2310 commonnodes.add(cl.node(r))
2319 commonnodes.add(cl.node(r))
2311 if commonnodes:
2320 if commonnodes:
2312 # XXX: we should only send the filelogs (and treemanifest). user
2321 # XXX: we should only send the filelogs (and treemanifest). user
2313 # already has the changelog and manifest
2322 # already has the changelog and manifest
2314 packer = changegroup.getbundler(cgversion, repo,
2323 packer = changegroup.getbundler(cgversion, repo,
2315 oldmatcher=oldmatcher,
2324 oldmatcher=oldmatcher,
2316 matcher=newmatcher,
2325 matcher=newmatcher,
2317 fullnodes=commonnodes)
2326 fullnodes=commonnodes)
2318 cgdata = packer.generate({nodemod.nullid}, list(commonnodes),
2327 cgdata = packer.generate({nodemod.nullid}, list(commonnodes),
2319 False, 'narrow_widen', changelog=False)
2328 False, 'narrow_widen', changelog=False)
2320
2329
2321 part = bundler.newpart('changegroup', data=cgdata)
2330 part = bundler.newpart('changegroup', data=cgdata)
2322 part.addparam('version', cgversion)
2331 part.addparam('version', cgversion)
2323 if 'treemanifest' in repo.requirements:
2332 if 'treemanifest' in repo.requirements:
2324 part.addparam('treemanifest', '1')
2333 part.addparam('treemanifest', '1')
2325
2334
2326 return bundler
2335 return bundler
@@ -1,157 +1,189 b''
1 #require no-chg
1 #require no-chg
2
2
3 $ hg init repo
3 $ hg init repo
4 $ cd repo
4 $ cd repo
5 $ echo foo > foo
5 $ echo foo > foo
6 $ hg ci -qAm 'add foo'
6 $ hg ci -qAm 'add foo'
7 $ echo >> foo
7 $ echo >> foo
8 $ hg ci -m 'change foo'
8 $ hg ci -m 'change foo'
9 $ hg up -qC 0
9 $ hg up -qC 0
10 $ echo bar > bar
10 $ echo bar > bar
11 $ hg ci -qAm 'add bar'
11 $ hg ci -qAm 'add bar'
12
12
13 $ hg log
13 $ hg log
14 changeset: 2:effea6de0384
14 changeset: 2:effea6de0384
15 tag: tip
15 tag: tip
16 parent: 0:bbd179dfa0a7
16 parent: 0:bbd179dfa0a7
17 user: test
17 user: test
18 date: Thu Jan 01 00:00:00 1970 +0000
18 date: Thu Jan 01 00:00:00 1970 +0000
19 summary: add bar
19 summary: add bar
20
20
21 changeset: 1:ed1b79f46b9a
21 changeset: 1:ed1b79f46b9a
22 user: test
22 user: test
23 date: Thu Jan 01 00:00:00 1970 +0000
23 date: Thu Jan 01 00:00:00 1970 +0000
24 summary: change foo
24 summary: change foo
25
25
26 changeset: 0:bbd179dfa0a7
26 changeset: 0:bbd179dfa0a7
27 user: test
27 user: test
28 date: Thu Jan 01 00:00:00 1970 +0000
28 date: Thu Jan 01 00:00:00 1970 +0000
29 summary: add foo
29 summary: add foo
30
30
31 $ cd ..
31 $ cd ..
32
32
33 Test pullbundle functionality
33 Test pullbundle functionality
34
34
35 $ cd repo
35 $ cd repo
36 $ cat <<EOF > .hg/hgrc
36 $ cat <<EOF > .hg/hgrc
37 > [server]
37 > [server]
38 > pullbundle = True
38 > pullbundle = True
39 > [extensions]
39 > [extensions]
40 > blackbox =
40 > blackbox =
41 > EOF
41 > EOF
42 $ hg bundle --base null -r 0 .hg/0.hg
42 $ hg bundle --base null -r 0 .hg/0.hg
43 1 changesets found
43 1 changesets found
44 $ hg bundle --base 0 -r 1 .hg/1.hg
44 $ hg bundle --base 0 -r 1 .hg/1.hg
45 1 changesets found
45 1 changesets found
46 $ hg bundle --base 1 -r 2 .hg/2.hg
46 $ hg bundle --base 1 -r 2 .hg/2.hg
47 1 changesets found
47 1 changesets found
48 $ cat <<EOF > .hg/pullbundles.manifest
48 $ cat <<EOF > .hg/pullbundles.manifest
49 > 2.hg BUNDLESPEC=none-v2 heads=effea6de0384e684f44435651cb7bd70b8735bd4 bases=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
49 > 2.hg BUNDLESPEC=none-v2 heads=effea6de0384e684f44435651cb7bd70b8735bd4 bases=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
50 > 1.hg BUNDLESPEC=bzip2-v2 heads=ed1b79f46b9a29f5a6efa59cf12fcfca43bead5a bases=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
50 > 1.hg BUNDLESPEC=bzip2-v2 heads=ed1b79f46b9a29f5a6efa59cf12fcfca43bead5a bases=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
51 > 0.hg BUNDLESPEC=gzip-v2 heads=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
51 > 0.hg BUNDLESPEC=gzip-v2 heads=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
52 > EOF
52 > EOF
53 $ hg --config blackbox.track=debug --debug serve -p $HGPORT2 -d --pid-file=../repo.pid
53 $ hg --config blackbox.track=debug --debug serve -p $HGPORT2 -d --pid-file=../repo.pid
54 listening at http://*:$HGPORT2/ (bound to $LOCALIP:$HGPORT2) (glob) (?)
54 listening at http://*:$HGPORT2/ (bound to $LOCALIP:$HGPORT2) (glob) (?)
55 $ cat ../repo.pid >> $DAEMON_PIDS
55 $ cat ../repo.pid >> $DAEMON_PIDS
56 $ cd ..
56 $ cd ..
57 $ hg clone -r 0 http://localhost:$HGPORT2/ repo.pullbundle
57 $ hg clone -r 0 http://localhost:$HGPORT2/ repo.pullbundle
58 adding changesets
58 adding changesets
59 adding manifests
59 adding manifests
60 adding file changes
60 adding file changes
61 added 1 changesets with 1 changes to 1 files
61 added 1 changesets with 1 changes to 1 files
62 new changesets bbd179dfa0a7 (1 drafts)
62 new changesets bbd179dfa0a7 (1 drafts)
63 updating to branch default
63 updating to branch default
64 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
64 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
65 $ cd repo.pullbundle
65 $ cd repo.pullbundle
66 $ hg pull -r 1
66 $ hg pull -r 1
67 pulling from http://localhost:$HGPORT2/
67 pulling from http://localhost:$HGPORT2/
68 searching for changes
68 searching for changes
69 adding changesets
69 adding changesets
70 adding manifests
70 adding manifests
71 adding file changes
71 adding file changes
72 added 1 changesets with 1 changes to 1 files
72 added 1 changesets with 1 changes to 1 files
73 new changesets ed1b79f46b9a (1 drafts)
73 new changesets ed1b79f46b9a (1 drafts)
74 (run 'hg update' to get a working copy)
74 (run 'hg update' to get a working copy)
75 $ hg pull -r 2
75 $ hg pull -r 2
76 pulling from http://localhost:$HGPORT2/
76 pulling from http://localhost:$HGPORT2/
77 searching for changes
77 searching for changes
78 adding changesets
78 adding changesets
79 adding manifests
79 adding manifests
80 adding file changes
80 adding file changes
81 added 1 changesets with 1 changes to 1 files (+1 heads)
81 added 1 changesets with 1 changes to 1 files (+1 heads)
82 new changesets effea6de0384 (1 drafts)
82 new changesets effea6de0384 (1 drafts)
83 (run 'hg heads' to see heads, 'hg merge' to merge)
83 (run 'hg heads' to see heads, 'hg merge' to merge)
84 $ cd ..
84 $ cd ..
85 $ killdaemons.py
85 $ killdaemons.py
86 $ grep 'sending pullbundle ' repo/.hg/blackbox.log
86 $ grep 'sending pullbundle ' repo/.hg/blackbox.log
87 * sending pullbundle "0.hg" (glob)
87 * sending pullbundle "0.hg" (glob)
88 * sending pullbundle "1.hg" (glob)
88 * sending pullbundle "1.hg" (glob)
89 * sending pullbundle "2.hg" (glob)
89 * sending pullbundle "2.hg" (glob)
90 $ rm repo/.hg/blackbox.log
90 $ rm repo/.hg/blackbox.log
91
91
92 Test pullbundle functionality for incremental pulls
92 Test pullbundle functionality for incremental pulls
93
93
94 $ cd repo
94 $ cd repo
95 $ hg --config blackbox.track=debug --debug serve -p $HGPORT2 -d --pid-file=../repo.pid
95 $ hg --config blackbox.track=debug --debug serve -p $HGPORT2 -d --pid-file=../repo.pid
96 listening at http://*:$HGPORT2/ (bound to $LOCALIP:$HGPORT2) (glob) (?)
96 listening at http://*:$HGPORT2/ (bound to $LOCALIP:$HGPORT2) (glob) (?)
97 $ cat ../repo.pid >> $DAEMON_PIDS
97 $ cat ../repo.pid >> $DAEMON_PIDS
98 $ cd ..
98 $ cd ..
99 $ hg clone http://localhost:$HGPORT2/ repo.pullbundle2
99 $ hg clone http://localhost:$HGPORT2/ repo.pullbundle2
100 requesting all changes
100 requesting all changes
101 adding changesets
101 adding changesets
102 adding manifests
102 adding manifests
103 adding file changes
103 adding file changes
104 added 1 changesets with 1 changes to 1 files
104 added 1 changesets with 1 changes to 1 files
105 adding changesets
105 adding changesets
106 adding manifests
106 adding manifests
107 adding file changes
107 adding file changes
108 added 1 changesets with 1 changes to 1 files
108 added 1 changesets with 1 changes to 1 files
109 adding changesets
109 adding changesets
110 adding manifests
110 adding manifests
111 adding file changes
111 adding file changes
112 added 1 changesets with 1 changes to 1 files (+1 heads)
112 added 1 changesets with 1 changes to 1 files (+1 heads)
113 new changesets bbd179dfa0a7:ed1b79f46b9a (3 drafts)
113 new changesets bbd179dfa0a7:ed1b79f46b9a (3 drafts)
114 updating to branch default
114 updating to branch default
115 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
115 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
116 $ killdaemons.py
116 $ killdaemons.py
117 $ grep 'sending pullbundle ' repo/.hg/blackbox.log
117 $ grep 'sending pullbundle ' repo/.hg/blackbox.log
118 * sending pullbundle "0.hg" (glob)
118 * sending pullbundle "0.hg" (glob)
119 * sending pullbundle "2.hg" (glob)
119 * sending pullbundle "2.hg" (glob)
120 * sending pullbundle "1.hg" (glob)
120 * sending pullbundle "1.hg" (glob)
121 $ rm repo/.hg/blackbox.log
121 $ rm repo/.hg/blackbox.log
122
122
123 Test pullbundle functionality for incoming
124
125 $ cd repo
126 $ hg --config blackbox.track=debug --debug serve -p $HGPORT2 -d --pid-file=../repo.pid
127 listening at http://*:$HGPORT2/ (bound to $LOCALIP:$HGPORT2) (glob) (?)
128 $ cat ../repo.pid >> $DAEMON_PIDS
129 $ cd ..
130 $ hg clone http://localhost:$HGPORT2/ repo.pullbundle2a -r 0
131 adding changesets
132 adding manifests
133 adding file changes
134 added 1 changesets with 1 changes to 1 files
135 new changesets bbd179dfa0a7 (1 drafts)
136 updating to branch default
137 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
138 $ cd repo.pullbundle2a
139 $ hg incoming -r ed1b79f46b9a
140 comparing with http://localhost:$HGPORT2/
141 searching for changes
142 changeset: 1:ed1b79f46b9a
143 tag: tip
144 user: test
145 date: Thu Jan 01 00:00:00 1970 +0000
146 summary: change foo
147
148 $ cd ..
149 $ killdaemons.py
150 $ grep 'sending pullbundle ' repo/.hg/blackbox.log
151 * sending pullbundle "0.hg" (glob)
152 * sending pullbundle "1.hg" (glob)
153 $ rm repo/.hg/blackbox.log
154
123 Test recovery from misconfigured server sending no new data
155 Test recovery from misconfigured server sending no new data
124
156
125 $ cd repo
157 $ cd repo
126 $ cat <<EOF > .hg/pullbundles.manifest
158 $ cat <<EOF > .hg/pullbundles.manifest
127 > 0.hg heads=ed1b79f46b9a29f5a6efa59cf12fcfca43bead5a bases=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
159 > 0.hg heads=ed1b79f46b9a29f5a6efa59cf12fcfca43bead5a bases=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
128 > 0.hg heads=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
160 > 0.hg heads=bbd179dfa0a71671c253b3ae0aa1513b60d199fa
129 > EOF
161 > EOF
130 $ hg --config blackbox.track=debug --debug serve -p $HGPORT2 -d --pid-file=../repo.pid
162 $ hg --config blackbox.track=debug --debug serve -p $HGPORT2 -d --pid-file=../repo.pid
131 listening at http://*:$HGPORT2/ (bound to $LOCALIP:$HGPORT2) (glob) (?)
163 listening at http://*:$HGPORT2/ (bound to $LOCALIP:$HGPORT2) (glob) (?)
132 $ cat ../repo.pid >> $DAEMON_PIDS
164 $ cat ../repo.pid >> $DAEMON_PIDS
133 $ cd ..
165 $ cd ..
134 $ hg clone -r 0 http://localhost:$HGPORT2/ repo.pullbundle3
166 $ hg clone -r 0 http://localhost:$HGPORT2/ repo.pullbundle3
135 adding changesets
167 adding changesets
136 adding manifests
168 adding manifests
137 adding file changes
169 adding file changes
138 added 1 changesets with 1 changes to 1 files
170 added 1 changesets with 1 changes to 1 files
139 new changesets bbd179dfa0a7 (1 drafts)
171 new changesets bbd179dfa0a7 (1 drafts)
140 updating to branch default
172 updating to branch default
141 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
173 1 files updated, 0 files merged, 0 files removed, 0 files unresolved
142 $ cd repo.pullbundle3
174 $ cd repo.pullbundle3
143 $ hg pull -r 1
175 $ hg pull -r 1
144 pulling from http://localhost:$HGPORT2/
176 pulling from http://localhost:$HGPORT2/
145 searching for changes
177 searching for changes
146 adding changesets
178 adding changesets
147 adding manifests
179 adding manifests
148 adding file changes
180 adding file changes
149 added 0 changesets with 0 changes to 1 files
181 added 0 changesets with 0 changes to 1 files
150 abort: 00changelog.i@ed1b79f46b9a: no node!
182 abort: 00changelog.i@ed1b79f46b9a: no node!
151 [255]
183 [255]
152 $ cd ..
184 $ cd ..
153 $ killdaemons.py
185 $ killdaemons.py
154 $ grep 'sending pullbundle ' repo/.hg/blackbox.log
186 $ grep 'sending pullbundle ' repo/.hg/blackbox.log
155 * sending pullbundle "0.hg" (glob)
187 * sending pullbundle "0.hg" (glob)
156 * sending pullbundle "0.hg" (glob)
188 * sending pullbundle "0.hg" (glob)
157 $ rm repo/.hg/blackbox.log
189 $ rm repo/.hg/blackbox.log
General Comments 0
You need to be logged in to leave comments. Login now