bundle2: support a 'records' mode for the 'bookmarks' part...
Boris Feld
r35267:496154e4 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

the Binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with value
  are stored in the form `<name>=<value>`. Both name and value are urlquoted.

  Empty names are obviously forbidden.

  Name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    troubles.

  Any Applicative level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows:

:header size: int32

    The total number of Bytes used by the part header. When the header is empty
    (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object to
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        Part parameters may have arbitrary content, the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters. Each
            couple contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

    :payload:

        payload is a series of `<chunksize><chunkdata>`.

        `chunksize` is an int32, `chunkdata` are plain bytes (as much as
        `chunksize` says). The payload part is concluded by a zero size chunk.

        The current implementation always produces either zero or one chunk.
        This is an implementation limitation that will ultimately be lifted.

        `chunksize` can be negative to trigger special case processing. No such
        processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are registered
for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the Part type
contains any uppercase char it is considered mandatory. When no handler is
known for a Mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care to
preserve.
"""
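The stream-level parameter rules above (a size-prefixed blob of space-separated, urlquoted `<name>=<value>` pairs, with the case of the first letter marking advisory vs. mandatory) can be sketched as a standalone round-trip. The helper names here are illustrative only, not part of bundle2's API; the size prefix uses the same big-endian int32 as `_fstreamparamsize` below.

```python
import struct
import urllib.parse

def encodeparams(params):
    """Serialize (name, value) pairs into the size-prefixed blob described above."""
    chunks = []
    for name, value in params:
        assert name and name[0].isalpha(), 'names MUST start with a letter'
        chunk = urllib.parse.quote(name)
        if value:
            chunk += '=' + urllib.parse.quote(value)
        chunks.append(chunk)
    blob = ' '.join(chunks).encode('ascii')
    # the blob is preceded by its total size as a big-endian int32
    return struct.pack('>i', len(blob)) + blob

def decodeparams(data):
    """Parse the blob back into (name, value, mandatory) triples."""
    (size,) = struct.unpack('>i', data[:4])
    blob = data[4:4 + size].decode('ascii')
    out = []
    for chunk in (blob.split(' ') if blob else []):
        name, _sep, value = chunk.partition('=')
        name = urllib.parse.unquote(name)
        # a capital first letter marks the parameter as mandatory
        out.append((name, urllib.parse.unquote(value), name[0].isupper()))
    return out
```

For instance, decoding `encodeparams([('Compression', 'GZ'), ('nbchanges', '14')])` flags only `Compression` as mandatory, per the capitalization rule.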

from __future__ import absolute_import, division

import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    bookmarks,
    changegroup,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
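The struct formats above frame every variable-length field in the stream. As a minimal sketch of the payload framing from the docstring (`<chunksize><chunkdata>` repeated, terminated by a zero-size chunk), assuming hypothetical helper names and reusing the `_fpayloadsize` format:

```python
import struct

_fpayloadsize = '>i'  # big-endian int32, as in the format constants above

def framepayload(chunks):
    """Frame byte chunks as <chunksize><chunkdata>..., ending with a zero chunk."""
    out = b''
    for chunk in chunks:
        out += struct.pack(_fpayloadsize, len(chunk)) + chunk
    return out + struct.pack(_fpayloadsize, 0)

def iterchunks(stream):
    """Yield chunk payloads until the zero-size end-of-payload marker."""
    offset = 0
    while True:
        (size,) = struct.unpack_from(_fpayloadsize, stream, offset)
        offset += 4
        if size == 0:
            return
        # negative sizes are reserved for special case processing,
        # which the docstring notes is not in place yet
        assert size > 0
        yield stream[offset:offset + size]
        offset += size
```

Round-tripping `framepayload([b'abc', b'defg'])` through `iterchunks` recovers the original chunk list.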

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
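Registration can be seen end to end with a compact, standalone restatement of the registry logic above (inlining `validateparttype`); the part type and handler below are hypothetical, not real bundle2 parts:

```python
import re

# compact restatement of the registry above, so the example runs standalone
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
parthandlermapping = {}

def parthandler(parttype, params=()):
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)
    def _decorator(func):
        # handlers always register under the lowercase form; the case of
        # the part name on the wire decides mandatory vs. advisory
        lparttype = parttype.lower()
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@parthandler('myparttype', ('key',))
def handlemypart(op, part):
    return part
```

After decoration, `parthandlermapping['myparttype']` is `handlemypart` and `handlemypart.params` is `frozenset({'key'})`; a part type containing a forbidden character (e.g. a space) raises `ValueError` at registration time.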

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
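The record semantics documented above (per-category lookup plus chronological iteration) can be demonstrated with a trimmed, standalone restatement of the class, covering only the pieces the example exercises; the categories used are illustrative:

```python
class unbundlerecords(object):
    # trimmed to the pieces exercised below; see the full class above
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('bookmarks', {'bookmark': 'foo'})
records.add('changegroup', {'return': 0})
```

Per-category lookup keeps insertion order within the category (`records['changegroup']` yields both changegroup entries, in order), while iterating the object itself yields every `('category', obj)` tuple chronologically across categories.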

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries value that can modify part behavior
        self.modes = {}

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
        # and should not gracefully cleanup.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from processing the old format. This is mostly needed
            # to handle different return codes to unbundle according to the type
            # of bundle. We should probably clean up or drop this return code
            # craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    Unknown Mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                 params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler
508
508
509 def _processpart(op, part):
509 def _processpart(op, part):
510 """process a single part from a bundle
510 """process a single part from a bundle
511
511
512 The part is guaranteed to have been fully consumed when the function exits
512 The part is guaranteed to have been fully consumed when the function exits
513 (even if an exception is raised)."""
513 (even if an exception is raised)."""
514 handler = _gethandler(op, part)
514 handler = _gethandler(op, part)
515 if handler is None:
515 if handler is None:
516 return
516 return
517
517
518 # handler is called outside the above try block so that we don't
518 # handler is called outside the above try block so that we don't
519 # risk catching KeyErrors from anything other than the
519 # risk catching KeyErrors from anything other than the
520 # parthandlermapping lookup (any KeyError raised by handler()
520 # parthandlermapping lookup (any KeyError raised by handler()
521 # itself represents a defect of a different variety).
521 # itself represents a defect of a different variety).
522 output = None
522 output = None
523 if op.captureoutput and op.reply is not None:
523 if op.captureoutput and op.reply is not None:
524 op.ui.pushbuffer(error=True, subproc=True)
524 op.ui.pushbuffer(error=True, subproc=True)
525 output = ''
525 output = ''
526 try:
526 try:
527 handler(op, part)
527 handler(op, part)
528 finally:
528 finally:
529 if output is not None:
529 if output is not None:
530 output = op.ui.popbuffer()
530 output = op.ui.popbuffer()
531 if output:
531 if output:
532 outpart = op.reply.newpart('output', data=output,
532 outpart = op.reply.newpart('output', data=output,
533 mandatory=False)
533 mandatory=False)
534 outpart.addparam(
534 outpart.addparam(
535 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
535 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
536
536
537 def decodecaps(blob):
537 def decodecaps(blob):
538 """decode a bundle2 caps bytes blob into a dictionary
538 """decode a bundle2 caps bytes blob into a dictionary
539
539
540 The blob is a list of capabilities (one per line)
540 The blob is a list of capabilities (one per line)
541 Capabilities may have values using a line of the form::
541 Capabilities may have values using a line of the form::
542
542
543 capability=value1,value2,value3
543 capability=value1,value2,value3
544
544
545 The values are always a list."""
545 The values are always a list."""
546 caps = {}
546 caps = {}
547 for line in blob.splitlines():
547 for line in blob.splitlines():
548 if not line:
548 if not line:
549 continue
549 continue
550 if '=' not in line:
550 if '=' not in line:
551 key, vals = line, ()
551 key, vals = line, ()
552 else:
552 else:
553 key, vals = line.split('=', 1)
553 key, vals = line.split('=', 1)
554 vals = vals.split(',')
554 vals = vals.split(',')
555 key = urlreq.unquote(key)
555 key = urlreq.unquote(key)
556 vals = [urlreq.unquote(v) for v in vals]
556 vals = [urlreq.unquote(v) for v in vals]
557 caps[key] = vals
557 caps[key] = vals
558 return caps
558 return caps
559
559
560 def encodecaps(caps):
560 def encodecaps(caps):
561 """encode a bundle2 caps dictionary into a bytes blob"""
561 """encode a bundle2 caps dictionary into a bytes blob"""
562 chunks = []
562 chunks = []
563 for ca in sorted(caps):
563 for ca in sorted(caps):
564 vals = caps[ca]
564 vals = caps[ca]
565 ca = urlreq.quote(ca)
565 ca = urlreq.quote(ca)
566 vals = [urlreq.quote(v) for v in vals]
566 vals = [urlreq.quote(v) for v in vals]
567 if vals:
567 if vals:
568 ca = "%s=%s" % (ca, ','.join(vals))
568 ca = "%s=%s" % (ca, ','.join(vals))
569 chunks.append(ca)
569 chunks.append(ca)
570 return '\n'.join(chunks)
570 return '\n'.join(chunks)
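For reference, the capabilities blob format handled by `decodecaps`/`encodecaps` can be exercised in isolation. The sketch below restates the round-trip using Python 3 `str` and `urllib.parse` (the module itself operates on bytes through `urlreq`), so it illustrates the wire format rather than reusing the code above:

```python
# Standalone sketch of the bundle2 caps blob: one capability per line,
# optional comma-separated values after '=', keys and values URL-quoted.
# Python 3 str is used here for illustration; bundle2 itself works on bytes.
from urllib.parse import quote, unquote

def encodecaps(caps):
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
assert blob == 'HG20\nchangegroup=01,02'
assert decodecaps(blob) == caps
```

Note that a capability without values (like `HG20` above) is emitted as a bare key and decodes back to an empty list, matching the "values are always a list" contract of the docstring.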

bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """split the params block and process each parameter in it"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params
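The stream-level parameter encoding that `_paramchunk` produces and `_processallparams` consumes can be sketched standalone. This uses Python 3 `str` instead of the module's bytes, and `encodeparams`/`decodeparams` are hypothetical names for illustration only:

```python
# Standalone sketch of the bundle2 stream parameter block: parameters are
# space-separated, 'name=value' pairs are URL-quoted, and a bare name
# denotes a value-less parameter. Python 3 str is used for illustration.
from urllib.parse import quote, unquote

def encodeparams(params):
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)

def decodeparams(paramsblock):
    params = {}
    for p in paramsblock.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)  # value-less parameter
        params[parts[0]] = parts[1]
    return params

block = encodeparams([('Compression', 'BZ'), ('somefeature', None)])
assert block == 'Compression=BZ somefeature'
assert decodeparams(block) == {'Compression': 'BZ', 'somefeature': None}
```

As `_processparam` below enforces, a name beginning with an upper-case letter (like `Compression`) is mandatory for the receiver, while a lower-case name is advisory and ignored when unknown.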


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)
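The length-prefixed framing that `_forwardchunks` relays can be sketched on its own: every header or payload chunk is preceded by a big-endian signed 32-bit size, and two zero sizes in a row terminate the stream (a negative size is reserved for the interrupt flag). The following is a simplified reader/writer under those assumptions, with hypothetical helper names, not the class above:

```python
# Standalone sketch of the bundle2 chunk framing: each chunk is prefixed
# with a big-endian int32 length; two consecutive zero lengths end the
# stream. The interrupt flag (a negative size) is deliberately omitted.
import io
import struct

_fchunksize = '>i'  # matches the sizes used by _fpartheadersize/_fpayloadsize

def framechunks(chunks):
    """frame each chunk with its size and append the two-zero terminator."""
    out = []
    for chunk in chunks:
        out.append(struct.pack(_fchunksize, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_fchunksize, 0))  # ends the last payload
    out.append(struct.pack(_fchunksize, 0))  # ends the stream
    return b''.join(out)

def readchunks(fp):
    """yield chunks until two consecutive empty ones mark the end."""
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack(_fchunksize, fp.read(4))[0]
        if size == 0:
            emptycount += 1
            continue
        emptycount = 0
        # a negative size would be the interrupt flag; not handled here
        yield fp.read(size)

stream = framechunks([b'header', b'payload'])
assert list(readchunks(io.BytesIO(stream))) == [b'header', b'payload']
```

This two-empty-chunks convention is exactly why the loop above can "brainlessly" count `emptycount` without distinguishing part headers from payload chunks.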


    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
1008 self._generated = False
1008 self._generated = False
1009
1009
1010 if ui.debugflag:
1010 if ui.debugflag:
1011 msg = ['bundle2-output-part: "%s"' % self.type]
1011 msg = ['bundle2-output-part: "%s"' % self.type]
1012 if not self.mandatory:
1012 if not self.mandatory:
1013 msg.append(' (advisory)')
1013 msg.append(' (advisory)')
1014 nbmp = len(self.mandatoryparams)
1014 nbmp = len(self.mandatoryparams)
1015 nbap = len(self.advisoryparams)
1015 nbap = len(self.advisoryparams)
1016 if nbmp or nbap:
1016 if nbmp or nbap:
1017 msg.append(' (params:')
1017 msg.append(' (params:')
1018 if nbmp:
1018 if nbmp:
1019 msg.append(' %i mandatory' % nbmp)
1019 msg.append(' %i mandatory' % nbmp)
1020 if nbap:
1020 if nbap:
1021 msg.append(' %i advisory' % nbmp)
1021 msg.append(' %i advisory' % nbmp)
1022 msg.append(')')
1022 msg.append(')')
1023 if not self.data:
1023 if not self.data:
1024 msg.append(' empty payload')
1024 msg.append(' empty payload')
1025 elif (util.safehasattr(self.data, 'next')
1025 elif (util.safehasattr(self.data, 'next')
1026 or util.safehasattr(self.data, '__next__')):
1026 or util.safehasattr(self.data, '__next__')):
1027 msg.append(' streamed payload')
1027 msg.append(' streamed payload')
1028 else:
1028 else:
1029 msg.append(' %i bytes payload' % len(self.data))
1029 msg.append(' %i bytes payload' % len(self.data))
1030 msg.append('\n')
1030 msg.append('\n')
1031 ui.debug(''.join(msg))
1031 ui.debug(''.join(msg))
1032
1032
1033 #### header
1033 #### header
1034 if self.mandatory:
1034 if self.mandatory:
1035 parttype = self.type.upper()
1035 parttype = self.type.upper()
1036 else:
1036 else:
1037 parttype = self.type.lower()
1037 parttype = self.type.lower()
1038 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1038 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1039 ## parttype
1039 ## parttype
1040 header = [_pack(_fparttypesize, len(parttype)),
1040 header = [_pack(_fparttypesize, len(parttype)),
1041 parttype, _pack(_fpartid, self.id),
1041 parttype, _pack(_fpartid, self.id),
1042 ]
1042 ]
1043 ## parameters
1043 ## parameters
1044 # count
1044 # count
1045 manpar = self.mandatoryparams
1045 manpar = self.mandatoryparams
1046 advpar = self.advisoryparams
1046 advpar = self.advisoryparams
1047 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1047 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1048 # size
1048 # size
1049 parsizes = []
1049 parsizes = []
1050 for key, value in manpar:
1050 for key, value in manpar:
1051 parsizes.append(len(key))
1051 parsizes.append(len(key))
1052 parsizes.append(len(value))
1052 parsizes.append(len(value))
1053 for key, value in advpar:
1053 for key, value in advpar:
1054 parsizes.append(len(key))
1054 parsizes.append(len(key))
1055 parsizes.append(len(value))
1055 parsizes.append(len(value))
1056 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1056 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1057 header.append(paramsizes)
1057 header.append(paramsizes)
1058 # key, value
1058 # key, value
1059 for key, value in manpar:
1059 for key, value in manpar:
1060 header.append(key)
1060 header.append(key)
1061 header.append(value)
1061 header.append(value)
1062 for key, value in advpar:
1062 for key, value in advpar:
1063 header.append(key)
1063 header.append(key)
1064 header.append(value)
1064 header.append(value)
1065 ## finalize header
1065 ## finalize header
1066 try:
1066 try:
1067 headerchunk = ''.join(header)
1067 headerchunk = ''.join(header)
1068 except TypeError:
1068 except TypeError:
1069 raise TypeError(r'Found a non-bytes trying to '
1069 raise TypeError(r'Found a non-bytes trying to '
1070 r'build bundle part header: %r' % header)
1070 r'build bundle part header: %r' % header)
1071 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1071 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1072 yield _pack(_fpartheadersize, len(headerchunk))
1072 yield _pack(_fpartheadersize, len(headerchunk))
1073 yield headerchunk
1073 yield headerchunk
1074 ## payload
1074 ## payload
1075 try:
1075 try:
1076 for chunk in self._payloadchunks():
1076 for chunk in self._payloadchunks():
1077 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1077 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1078 yield _pack(_fpayloadsize, len(chunk))
1078 yield _pack(_fpayloadsize, len(chunk))
1079 yield chunk
1079 yield chunk
1080 except GeneratorExit:
1080 except GeneratorExit:
1081 # GeneratorExit means that nobody is listening for our
1081 # GeneratorExit means that nobody is listening for our
1082 # results anyway, so just bail quickly rather than trying
1082 # results anyway, so just bail quickly rather than trying
1083 # to produce an error part.
1083 # to produce an error part.
1084 ui.debug('bundle2-generatorexit\n')
1084 ui.debug('bundle2-generatorexit\n')
1085 raise
1085 raise
1086 except BaseException as exc:
1086 except BaseException as exc:
1087 bexc = util.forcebytestr(exc)
1087 bexc = util.forcebytestr(exc)
1088 # backup exception data for later
1088 # backup exception data for later
1089 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1089 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1090 % bexc)
1090 % bexc)
1091 tb = sys.exc_info()[2]
1091 tb = sys.exc_info()[2]
1092 msg = 'unexpected error: %s' % bexc
1092 msg = 'unexpected error: %s' % bexc
1093 interpart = bundlepart('error:abort', [('message', msg)],
1093 interpart = bundlepart('error:abort', [('message', msg)],
1094 mandatory=False)
1094 mandatory=False)
1095 interpart.id = 0
1095 interpart.id = 0
1096 yield _pack(_fpayloadsize, -1)
1096 yield _pack(_fpayloadsize, -1)
1097 for chunk in interpart.getchunks(ui=ui):
1097 for chunk in interpart.getchunks(ui=ui):
1098 yield chunk
1098 yield chunk
1099 outdebug(ui, 'closing payload chunk')
1099 outdebug(ui, 'closing payload chunk')
1100 # abort current part payload
1100 # abort current part payload
1101 yield _pack(_fpayloadsize, 0)
1101 yield _pack(_fpayloadsize, 0)
1102 pycompat.raisewithtb(exc, tb)
1102 pycompat.raisewithtb(exc, tb)
1103 # end of payload
1103 # end of payload
1104 outdebug(ui, 'closing payload chunk')
1104 outdebug(ui, 'closing payload chunk')
1105 yield _pack(_fpayloadsize, 0)
1105 yield _pack(_fpayloadsize, 0)
1106 self._generated = True
1106 self._generated = True
1107
1107
1108 def _payloadchunks(self):
1108 def _payloadchunks(self):
1109 """yield chunks of a the part payload
1109 """yield chunks of a the part payload
1110
1110
1111 Exists to handle the different methods to provide data to a part."""
1111 Exists to handle the different methods to provide data to a part."""
1112 # we only support fixed size data now.
1112 # we only support fixed size data now.
1113 # This will be improved in the future.
1113 # This will be improved in the future.
1114 if (util.safehasattr(self.data, 'next')
1114 if (util.safehasattr(self.data, 'next')
1115 or util.safehasattr(self.data, '__next__')):
1115 or util.safehasattr(self.data, '__next__')):
1116 buff = util.chunkbuffer(self.data)
1116 buff = util.chunkbuffer(self.data)
1117 chunk = buff.read(preferedchunksize)
1117 chunk = buff.read(preferedchunksize)
1118 while chunk:
1118 while chunk:
1119 yield chunk
1119 yield chunk
1120 chunk = buff.read(preferedchunksize)
1120 chunk = buff.read(preferedchunksize)
1121 elif len(self.data):
1121 elif len(self.data):
1122 yield self.data
1122 yield self.data
1123
1123
1124
1124
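The header built by ``getchunks`` above can be illustrated with a small standalone sketch. The struct formats here (``'>B'`` for the type length, ``'>I'`` for the part id, ``'>BB'`` for the param counts, one unsigned byte per param-piece size) are assumed from the format constants defined near the top of this file; they match the documented constraints (big-endian unsigned numbers, 255-byte parameter limit) but this is a model, not the module's own code path:

```python
import struct

# Assumed wire formats (mirroring _fparttypesize, _fpartid, _fpartparamcount):
_fparttypesize = '>B'
_fpartid = '>I'
_fpartparamcount = '>BB'

def encodepartheader(parttype, partid, manparams, advparams):
    """Build a part header blob the way bundlepart.getchunks() does."""
    header = [struct.pack(_fparttypesize, len(parttype)), parttype,
              struct.pack(_fpartid, partid),
              struct.pack(_fpartparamcount, len(manparams), len(advparams))]
    sizes = []
    for key, value in manparams + advparams:
        sizes.extend([len(key), len(value)])
    header.append(struct.pack('>' + 'BB' * (len(sizes) // 2), *sizes))
    for key, value in manparams + advparams:
        header.append(key)
        header.append(value)
    return b''.join(header)

def decodepartheader(data):
    """Parse the blob back, mirroring unbundlepart._readheader()."""
    pos = 0
    def take(fmt):
        nonlocal pos
        size = struct.calcsize(fmt)
        vals = struct.unpack(fmt, data[pos:pos + size])
        pos += size
        return vals
    typesize, = take(_fparttypesize)
    parttype = data[pos:pos + typesize]; pos += typesize
    partid, = take(_fpartid)
    mancount, advcount = take(_fpartparamcount)
    sizes = take('>' + 'BB' * (mancount + advcount))
    params = []
    for ksize, vsize in zip(sizes[::2], sizes[1::2]):
        key = data[pos:pos + ksize]; pos += ksize
        value = data[pos:pos + vsize]; pos += vsize
        params.append((key, value))
    return parttype, partid, params[:mancount], params[mancount:]

hdr = encodepartheader(b'CHANGEGROUP', 0, [(b'version', b'02')], [])
assert decodepartheader(hdr) == (b'CHANGEGROUP', 0, [(b'version', b'02')], [])
```

The uppercase/lowercase trick for the mandatory bit and the outer header-size frame are handled elsewhere in the real code; this sketch covers only the header body.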
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool('devel', 'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, 'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(_('stream ended unexpectedly '
                                    ' (got %d bytes, expected %d)') %
                                  (len(s), chunksize))

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                'negative payload chunk size: %s' % chunksize)

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(_('stream ended unexpectedly '
                                ' (got %d bytes, expected %d)') %
                              (len(s), headersize))

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug('bundle2-input: payload chunk size: %i\n' % chunksize)

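The framing consumed by ``decodepayloadchunks`` can be modeled in a few lines, independent of Mercurial's internals. Assuming ``_fpayloadsize`` is the signed big-endian 32-bit format ``'>i'`` (signed so that -1 can flag an interleaved interrupt part), each chunk is a length prefix followed by that many bytes, and a zero length terminates the payload:

```python
import struct
from io import BytesIO

_fpayloadsize = '>i'  # signed 32-bit big-endian length prefix (assumed)

def encodepayload(chunks):
    """Frame raw chunks the way bundlepart.getchunks() emits them."""
    out = []
    for chunk in chunks:
        out.append(struct.pack(_fpayloadsize, len(chunk)))
        out.append(chunk)
    out.append(struct.pack(_fpayloadsize, 0))  # end-of-payload marker
    return b''.join(out)

def decodepayload(fh):
    """Yield the chunks back, mirroring decodepayloadchunks()
    (interrupt parts are not modeled here)."""
    headersize = struct.calcsize(_fpayloadsize)
    chunksize = struct.unpack(_fpayloadsize, fh.read(headersize))[0]
    while chunksize:
        if chunksize < 0:
            raise ValueError('interrupt or bad chunk size: %d' % chunksize)
        yield fh.read(chunksize)
        chunksize = struct.unpack(_fpayloadsize, fh.read(headersize))[0]

stream = encodepayload([b'abc', b'defg'])
assert list(decodepayload(BytesIO(stream))) == [b'abc', b'defg']
```

The real decoder additionally dispatches a ``flaginterrupt`` (-1) frame to ``interrupthandler`` before resuming the regular chunk stream.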
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of pairs again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

1359 class seekableunbundlepart(unbundlepart):
1359 class seekableunbundlepart(unbundlepart):
1360 """A bundle2 part in a bundle that is seekable.
1360 """A bundle2 part in a bundle that is seekable.
1361
1361
1362 Regular ``unbundlepart`` instances can only be read once. This class
1362 Regular ``unbundlepart`` instances can only be read once. This class
1363 extends ``unbundlepart`` to enable bi-directional seeking within the
1363 extends ``unbundlepart`` to enable bi-directional seeking within the
1364 part.
1364 part.
1365
1365
1366 Bundle2 part data consists of framed chunks. Offsets when seeking
1366 Bundle2 part data consists of framed chunks. Offsets when seeking
1367 refer to the decoded data, not the offsets in the underlying bundle2
1367 refer to the decoded data, not the offsets in the underlying bundle2
1368 stream.
1368 stream.
1369
1369
1370 To facilitate quickly seeking within the decoded data, instances of this
1370 To facilitate quickly seeking within the decoded data, instances of this
1371 class maintain a mapping between offsets in the underlying stream and
1371 class maintain a mapping between offsets in the underlying stream and
1372 the decoded payload. This mapping will consume memory in proportion
1372 the decoded payload. This mapping will consume memory in proportion
1373 to the number of chunks within the payload (which almost certainly
1373 to the number of chunks within the payload (which almost certainly
1374 increases in proportion with the size of the part).
1374 increases in proportion with the size of the part).
1375 """
1375 """
1376 def __init__(self, ui, header, fp):
1376 def __init__(self, ui, header, fp):
1377 # (payload, file) offsets for chunk starts.
1377 # (payload, file) offsets for chunk starts.
1378 self._chunkindex = []
1378 self._chunkindex = []
1379
1379
1380 super(seekableunbundlepart, self).__init__(ui, header, fp)
1380 super(seekableunbundlepart, self).__init__(ui, header, fp)
1381
1381
1382 def _payloadchunks(self, chunknum=0):
1382 def _payloadchunks(self, chunknum=0):
1383 '''seek to specified chunk and start yielding data'''
1383 '''seek to specified chunk and start yielding data'''
1384 if len(self._chunkindex) == 0:
1384 if len(self._chunkindex) == 0:
1385 assert chunknum == 0, 'Must start with chunk 0'
1385 assert chunknum == 0, 'Must start with chunk 0'
1386 self._chunkindex.append((0, self._tellfp()))
1386 self._chunkindex.append((0, self._tellfp()))
1387 else:
1387 else:
1388 assert chunknum < len(self._chunkindex), \
1388 assert chunknum < len(self._chunkindex), \
1389 'Unknown chunk %d' % chunknum
1389 'Unknown chunk %d' % chunknum
1390 self._seekfp(self._chunkindex[chunknum][1])
1390 self._seekfp(self._chunkindex[chunknum][1])
1391
1391
1392 pos = self._chunkindex[chunknum][0]
1392 pos = self._chunkindex[chunknum][0]
1393
1393
1394 for chunk in decodepayloadchunks(self.ui, self._fp):
1394 for chunk in decodepayloadchunks(self.ui, self._fp):
1395 chunknum += 1
1395 chunknum += 1
1396 pos += len(chunk)
1396 pos += len(chunk)
1397 if chunknum == len(self._chunkindex):
1397 if chunknum == len(self._chunkindex):
1398 self._chunkindex.append((pos, self._tellfp()))
1398 self._chunkindex.append((pos, self._tellfp()))
1399
1399
1400 yield chunk
1400 yield chunk
1401
1401
1402 def _findchunk(self, pos):
1402 def _findchunk(self, pos):
1403 '''for a given payload position, return a chunk number and offset'''
1403 '''for a given payload position, return a chunk number and offset'''
1404 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1404 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1405 if ppos == pos:
1405 if ppos == pos:
1406 return chunk, 0
1406 return chunk, 0
1407 elif ppos > pos:
1407 elif ppos > pos:
1408 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1408 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1409 raise ValueError('Unknown chunk')
1409 raise ValueError('Unknown chunk')
1410
1410
1411 def tell(self):
1411 def tell(self):
1412 return self._pos
1412 return self._pos
1413
1413
1414 def seek(self, offset, whence=os.SEEK_SET):
1414 def seek(self, offset, whence=os.SEEK_SET):
1415 if whence == os.SEEK_SET:
1415 if whence == os.SEEK_SET:
1416 newpos = offset
1416 newpos = offset
1417 elif whence == os.SEEK_CUR:
1417 elif whence == os.SEEK_CUR:
1418 newpos = self._pos + offset
1418 newpos = self._pos + offset
1419 elif whence == os.SEEK_END:
1419 elif whence == os.SEEK_END:
1420 if not self.consumed:
1420 if not self.consumed:
1421 # Can't use self.consume() here because it advances self._pos.
1421 # Can't use self.consume() here because it advances self._pos.
1422 chunk = self.read(32768)
1422 chunk = self.read(32768)
1423 while chunk:
1423 while chunk:
1424 chunk = self.read(32768)
1424 chunk = self.read(32768)
1425 newpos = self._chunkindex[-1][0] - offset
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            # Can't use self.consume() here because it advances self._pos.
            chunk = self.read(32768)
            while chunk:
                chunk = self.read(32768)

        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None
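The `_tellfp` fallback above (treat ESPIPE as "this is a pipe" and report the stream as unseekable instead of failing) can be sketched standalone. `safetell` is a hypothetical helper name, not Mercurial's API:

```python
import errno
import io
import os

def safetell(fp):
    """Return fp.tell(), or None if the stream is not seekable.

    Mirrors the _tellfp fallback above: an ESPIPE error is treated as
    "this stream is a pipe", not as a hard failure.
    """
    try:
        return fp.tell()
    except (IOError, OSError) as e:
        if e.errno == errno.ESPIPE:
            return None
        raise

# A regular file-like object reports its offset...
buf = io.BytesIO(b'bundle2 payload')
buf.read(7)
assert safetell(buf) == 7

# ...while an unbuffered pipe triggers the ESPIPE fallback.
rfd, wfd = os.pipe()
rf = os.fdopen(rfd, 'rb', buffering=0)
assert safetell(rf) is None
rf.close()
os.close(wfd)
```

The real method additionally caches `self._seekable = False` so later calls skip the failing `tell()` entirely.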

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'bookmarks': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'phases': ('heads',),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
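The `'obsmarkers'` capability values are strings like `'V0'` and `'V1'`; `obsmarkersversion` keeps only those and strips the prefix. The function is self-contained enough to exercise standalone:

```python
def obsmarkersversion(caps):
    """Extract supported obsmarker versions from a bundle2 caps dict.

    Entries look like 'V0', 'V1'; anything else is ignored.
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

assert obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
assert obsmarkersversion({}) == []
assert obsmarkersversion({'obsmarkers': ('experimental',)}) == []
```

`obsolete.commonversion` later picks the highest version shared with the peer from this list.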

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

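The `'hgtagsfnodes'` payload built above is a flat concatenation of (node, fnode) 20-byte pairs, which is where the 40-bytes-per-head figure in the comment comes from. A minimal sketch of that packing and its inverse (helper names are mine, not Mercurial's):

```python
NODELEN = 20  # sha1 node length used by the 'hgtagsfnodes' part

def packfnodes(pairs):
    """Concatenate (node, fnode) 20-byte pairs into one flat payload."""
    chunks = []
    for node, fnode in pairs:
        assert len(node) == NODELEN and len(fnode) == NODELEN
        chunks.extend([node, fnode])
    return b''.join(chunks)

def unpackfnodes(data):
    """Inverse of packfnodes: yield (node, fnode) pairs."""
    for i in range(0, len(data), 2 * NODELEN):
        yield data[i:i + NODELEN], data[i + NODELEN:i + 2 * NODELEN]

node = b'\x11' * 20
fnode = b'\x22' * 20
payload = packfnodes([(node, fnode)])
assert len(payload) == 40  # one head costs exactly 40 bytes
assert list(unpackfnodes(payload)) == [(node, fnode)]
```

The in-tree code only does the packing half; the receiving `hgtagsfnodes` handler performs the equivalent of the unpack.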
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

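In the legacy (non-HG20) branch above, the output stream is simply the 6-byte bundle header followed by the compressed changegroup. A standalone sketch of that shape, using zlib directly in place of Mercurial's compression engines (`'HG10GZ'` is the real header for zlib-compressed v1 bundles):

```python
import zlib

def legacychunks(header, chunks):
    """Yield a legacy bundle stream: header first, then compressed chunks.

    Mirrors the chunkiter() generator above, with zlib standing in for
    util.compengines.
    """
    comp = zlib.compressobj()
    yield header
    for chunk in chunks:
        data = comp.compress(chunk)
        if data:  # compressobj may buffer small inputs
            yield data
    yield comp.flush()

payload = [b'changegroup ', b'data']
stream = b''.join(legacychunks(b'HG10GZ', payload))
assert stream.startswith(b'HG10GZ')
assert zlib.decompress(stream[6:]) == b'changegroup data'
```

Note the generator yields the header uncompressed; only the changegroup body goes through the compressor, which is why a reader can sniff the bundle type from the first six bytes.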
def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

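The head-count arithmetic above follows the addchangegroup return-code convention: 1 means success with unchanged heads, 1+n means n heads added, -1-n means n heads removed, 0 means failure. It can be exercised standalone; this sketch simplifies one detail by returning 0 immediately on a failed result, where the in-tree version breaks out of the loop instead:

```python
def combineresults(results):
    """Combine addchangegroup return codes into one, as in
    combinechangegroupresults above (simplified: failure short-circuits).
    """
    changedheads = 0
    for ret in results:
        if ret == 0:
            return 0          # any failure wins
        if ret < -1:
            changedheads += ret + 1   # ret = -1 - n  ->  n heads removed
        elif ret > 1:
            changedheads += ret - 1   # ret = 1 + n   ->  n heads added
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1

assert combineresults([]) == 1
assert combineresults([1, 0, 3]) == 0   # any failure wins
assert combineresults([3, 2]) == 4      # 2 + 1 = 3 new heads
assert combineresults([-2, -3]) == -4   # 1 + 2 = 3 removed heads
```

The net result is that head additions and removals from multiple changegroup parts fold into a single code of the same convention.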
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
    ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate that what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate that what was retrieved
      by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

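`util.digestchecker` wraps the remote stream and, once the stream has been fully read, `validate()` checks both the byte count and every declared digest against the part parameters. A simplified sketch with hashlib (the class shape is illustrative, not Mercurial's exact API):

```python
import hashlib
import io

class digestchecker(object):
    """Wrap a readable stream; validate size and hex digests after reading."""

    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size
        self._read = 0
        self._digests = digests  # {algo name: expected hex digest}
        self._hashers = {k: hashlib.new(k) for k in digests}

    def read(self, length=-1):
        data = self._fh.read(length)
        self._read += len(data)
        for h in self._hashers.values():
            h.update(data)
        return data

    def validate(self):
        if self._read != self._size:
            raise ValueError('size mismatch: %d != %d'
                             % (self._read, self._size))
        for k, expected in self._digests.items():
            actual = self._hashers[k].hexdigest()
            if actual != expected:
                raise ValueError('%s mismatch: %s != %s'
                                 % (k, actual, expected))

data = b'remote bundle payload'
checker = digestchecker(io.BytesIO(data), len(data),
                        {'sha1': hashlib.sha1(data).hexdigest()})
while checker.read(4096):
    pass
checker.validate()  # size and sha1 both match, so this passes
```

This is why the handler above applies the changegroup first and calls `real_part.validate()` afterwards: the digests can only be verified once the whole stream has flowed through the wrapper.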
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is used to detect push races on bookmarks. It contains
    binary encoded (bookmark, node) tuples. If the local state does not
    match the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(inpart)

    msgstandard = ('repository changed while pushing - please try again '
                   '(bookmark "%s" move from %s to %s)')
    msgmissing = ('repository changed while pushing - please try again '
                  '(bookmark "%s" is missing, expected %s)')
    msgexist = ('repository changed while pushing - please try again '
                '(bookmark "%s" set on %s, expected missing)')
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, nodemod.short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, nodemod.short(node))
            else:
                finalmsg = msgstandard % (book, nodemod.short(node),
                                          nodemod.short(currentnode))
            raise error.PushRaced(finalmsg)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

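Both `check:heads` and `check:updated-heads` parse their payload the same way: a sequence of consecutive 20-byte nodes, with the trailing `assert not h` guaranteeing no partial node is left over. A standalone sketch of that loop (`readheads` is a hypothetical helper name):

```python
import io

NODELEN = 20  # sha1 node length

def readheads(part):
    """Read consecutive 20-byte head nodes from a part-like stream."""
    heads = []
    h = part.read(NODELEN)
    while len(h) == NODELEN:
        heads.append(h)
        h = part.read(NODELEN)
    assert not h  # payload must be a whole number of nodes
    return heads

payload = b'\x01' * 20 + b'\x02' * 20
assert readheads(io.BytesIO(payload)) == [b'\x01' * 20, b'\x02' * 20]
assert readheads(io.BytesIO(b'')) == []
```

The two handlers differ only in what they compare the parsed heads against: all repo heads for `check:heads`, the current branchmap heads for `check:updated-heads`.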
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activities happen on unrelated heads, they are
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

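The handler above consumes a flat stream of fixed-width 20-byte node hashes: there is no count prefix or separator, and the end of the part is the only terminator. A standalone sketch of that framing, using `io.BytesIO` to stand in for the bundle2 part (an assumption for illustration, not the real `inpart` object):

```python
import io

NODELEN = 20  # a Mercurial sha-1 node is exactly 20 bytes

def encodeheads(heads):
    # Concatenate fixed-width nodes; no length prefix is needed.
    assert all(len(h) == NODELEN for h in heads)
    return b''.join(heads)

def decodeheads(fp):
    # Mirror the handler's loop: read 20-byte chunks until the stream runs dry.
    heads = []
    h = fp.read(NODELEN)
    while len(h) == NODELEN:
        heads.append(h)
        h = fp.read(NODELEN)
    assert not h  # a trailing partial chunk would mean a corrupt part
    return heads

nodes = [bytes([i]) * NODELEN for i in range(3)]
assert decodeheads(io.BytesIO(encodeheads(nodes))) == nodes
```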
@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

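The race check above boils down to: for every node the client claims is in phase X, verify the server still agrees. A dictionary-based sketch of the comparison, following Mercurial's public=0 / draft=1 / secret=2 numbering (the helper itself is illustrative, not repository code):

```python
PHASENAMES = ['public', 'draft', 'secret']

def checkphases(expected, actual):
    """expected: list indexed by phase -> node list; actual: node -> phase."""
    for expectedphase, nodes in enumerate(expected):
        for n in nodes:
            actualphase = actual[n]
            if actualphase != expectedphase:
                raise RuntimeError(
                    'repository changed while pushing - please try again '
                    '(%s is %s expected %s)'
                    % (n, PHASENAMES[actualphase], PHASENAMES[expectedphase]))

# The server moved 'abc' to public while the client still saw it as draft:
try:
    checkphases([[], ['abc'], []], {'abc': 0})
except RuntimeError as e:
    assert 'abc is public expected draft' in str(e)
```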
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

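The 'params' value above travels as a single NUL-separated string, which is why the handler splits on `'\0'`. A minimal sketch of that round-trip (the helper names are made up for illustration):

```python
def encodeparams(params):
    # Parameter names never contain NUL, so '\0' is a safe joiner.
    assert all('\0' not in p for p in params)
    return '\0'.join(params)

def decodeparams(raw):
    return raw.split('\0')

params = ['heads', 'bookmarks']
assert decodeparams(encodeparams(params)) == params
```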
@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

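A pushkey update is a compare-and-swap: the new value is written only if the stored value still equals 'old', and the handler reports the 0/1 outcome back in the 'return' reply param. A dict-backed sketch of those semantics (not the real `repo.pushkey`, which dispatches to per-namespace handlers):

```python
def pushkey(store, namespace, key, old, new):
    # Succeed only when the current value matches what the pusher last saw.
    ns = store.setdefault(namespace, {})
    current = ns.get(key, '')
    if current != old:
        return 0  # raced: somebody changed the key under us
    ns[key] = new
    return 1

store = {}
assert pushkey(store, 'bookmarks', 'feature', '', 'cafebabe') == 1
assert pushkey(store, 'bookmarks', 'feature', '', 'deadbeef') == 0  # stale 'old'
assert store['bookmarks']['feature'] == 'cafebabe'
```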
@parthandler('bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied as
    is to the unbundling repository. Make sure a 'check:bookmarks' part is
    issued earlier to check for push races in such an update. This behavior is
    suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(inpart)

    pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
    bookmarksmode = op.modes.get('bookmarks', 'apply')

    if bookmarksmode == 'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs['pushkeycompat'] = '1'
                hookargs['namespace'] = 'bookmark'
                hookargs['key'] = book
                hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
                hookargs['new'] = nodemod.hex(node if node is not None else '')
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook('prepushkey', throw=True, **hookargs)

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:
            def runhook():
                for hookargs in allhooks:
                    op.repo.hook('prepushkey', **hookargs)
            op.repo._afterlock(runhook)

    elif bookmarksmode == 'records':
        for book, node in changes:
            record = {'bookmark': book, 'node': node}
            op.records.add('bookmarks', record)
    else:
        raise error.ProgrammingError('unknown bookmark mode: %s' % bookmarksmode)

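The mode dispatch above is the point of this changeset: the same wire part now either mutates the repository ('apply', the push path) or merely records what the remote advertised ('records', the pull path), selected per bundle operation. A toy dispatcher showing that split, with a plain dict standing in for the bookmark store and a list standing in for `op.records` (names and structures are illustrative, not Mercurial's API):

```python
def handlebookmark(bookstore, records, changes, mode='apply'):
    if mode == 'apply':
        # Push path: write the changes through to the store.
        for book, node in changes:
            if node is None:
                bookstore.pop(book, None)  # a None node deletes the bookmark
            else:
                bookstore[book] = node
    elif mode == 'records':
        # Pull path: only remember what the remote said; nothing is applied.
        for book, node in changes:
            records.append({'bookmark': book, 'node': node})
    else:
        raise ValueError('unknown bookmark mode: %s' % mode)

store, records = {}, []
handlebookmark(store, records, [('stable', 'aa' * 20)], mode='apply')
assert store == {'stable': 'aa' * 20}
handlebookmark(store, records, [('dev', 'bb' * 20)], mode='records')
assert store == {'stable': 'aa' * 20}          # untouched in records mode
assert records == [{'bookmark': 'dev', 'node': 'bb' * 20}]
```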
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

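Unlike the single-hash heads parts, this payload interleaves two 20-byte hashes per entry, and a short read of either half ends the loop. A `io.BytesIO`-based sketch of the pair framing (illustrative only, not the real part object):

```python
import io

NODELEN = 20

def readpairs(fp):
    # Mirror the handler's loop: stop on the first incomplete pair.
    pairs = []
    while True:
        node = fp.read(NODELEN)
        fnode = fp.read(NODELEN)
        if len(node) < NODELEN or len(fnode) < NODELEN:
            break
        pairs.append((node, fnode))
    return pairs

payload = b'n' * 20 + b'f' * 20 + b'm' * 20 + b'g' * 20 + b'truncated-tail'
assert readpairs(io.BytesIO(payload)) == [(b'n' * 20, b'f' * 20),
                                          (b'm' * 20, b'g' * 20)]
```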
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
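
The only transformation in the pushvars handler is namespacing: every client-supplied variable is upper-cased and prefixed with USERVAR_ before becoming a hook argument, so hooks can tell pushed variables from server-set ones. A standalone sketch of that mapping (the helper name is made up; it is not the real `op.addhookargs`):

```python
def pushvarstohookargs(advisoryparams):
    hookargs = {}
    for key, value in advisoryparams:
        # The USERVAR_ prefix marks values as coming from --pushvar.
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

args = pushvarstohookargs([('debug', '1'), ('reviewer', 'alice')])
assert args == {'USERVAR_DEBUG': '1', 'USERVAR_REVIEWER': 'alice'}
```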