##// END OF EJS Templates
revbranchcache: add a bundle2 handler for a rbc part...
Boris Feld -
r36981:9988fc10 default
parent child Browse files
Show More
@@ -1,2160 +1,2195 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: int32
34 :params size: int32
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: int32
67 :header size: int32
68
68
69 The total number of Bytes used by the part header. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 `chunksize` can be negative to trigger special case processing. No such
128 `chunksize` can be negative to trigger special case processing. No such
129 processing is in place yet.
129 processing is in place yet.
130
130
131 Bundle processing
131 Bundle processing
132 ============================
132 ============================
133
133
134 Each part is processed in order using a "part handler". Handler are registered
134 Each part is processed in order using a "part handler". Handler are registered
135 for a certain part type.
135 for a certain part type.
136
136
137 The matching of a part to its handler is case insensitive. The case of the
137 The matching of a part to its handler is case insensitive. The case of the
138 part type is used to know if a part is mandatory or advisory. If the Part type
138 part type is used to know if a part is mandatory or advisory. If the Part type
139 contains any uppercase char it is considered mandatory. When no handler is
139 contains any uppercase char it is considered mandatory. When no handler is
140 known for a Mandatory part, the process is aborted and an exception is raised.
140 known for a Mandatory part, the process is aborted and an exception is raised.
141 If the part is advisory and no handler is known, the part is ignored. When the
141 If the part is advisory and no handler is known, the part is ignored. When the
142 process is aborted, the full bundle is still read from the stream to keep the
142 process is aborted, the full bundle is still read from the stream to keep the
143 channel usable. But none of the part read from an abort are processed. In the
143 channel usable. But none of the part read from an abort are processed. In the
144 future, dropping the stream may become an option for channel we do not care to
144 future, dropping the stream may become an option for channel we do not care to
145 preserve.
145 preserve.
146 """
146 """
147
147
148 from __future__ import absolute_import, division
148 from __future__ import absolute_import, division
149
149
150 import errno
150 import errno
151 import os
151 import os
152 import re
152 import re
153 import string
153 import string
154 import struct
154 import struct
155 import sys
155 import sys
156
156
157 from .i18n import _
157 from .i18n import _
158 from . import (
158 from . import (
159 bookmarks,
159 bookmarks,
160 changegroup,
160 changegroup,
161 encoding,
161 error,
162 error,
162 node as nodemod,
163 node as nodemod,
163 obsolete,
164 obsolete,
164 phases,
165 phases,
165 pushkey,
166 pushkey,
166 pycompat,
167 pycompat,
167 streamclone,
168 streamclone,
168 tags,
169 tags,
169 url,
170 url,
170 util,
171 util,
171 )
172 )
172
173
173 urlerr = util.urlerr
174 urlerr = util.urlerr
174 urlreq = util.urlreq
175 urlreq = util.urlreq
175
176
176 _pack = struct.pack
177 _pack = struct.pack
177 _unpack = struct.unpack
178 _unpack = struct.unpack
178
179
179 _fstreamparamsize = '>i'
180 _fstreamparamsize = '>i'
180 _fpartheadersize = '>i'
181 _fpartheadersize = '>i'
181 _fparttypesize = '>B'
182 _fparttypesize = '>B'
182 _fpartid = '>I'
183 _fpartid = '>I'
183 _fpayloadsize = '>i'
184 _fpayloadsize = '>i'
184 _fpartparamcount = '>BB'
185 _fpartparamcount = '>BB'
185
186
186 preferedchunksize = 32768
187 preferedchunksize = 32768
187
188
188 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
189 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
189
190
190 def outdebug(ui, message):
191 def outdebug(ui, message):
191 """debug regarding output stream (bundling)"""
192 """debug regarding output stream (bundling)"""
192 if ui.configbool('devel', 'bundle2.debug'):
193 if ui.configbool('devel', 'bundle2.debug'):
193 ui.debug('bundle2-output: %s\n' % message)
194 ui.debug('bundle2-output: %s\n' % message)
194
195
195 def indebug(ui, message):
196 def indebug(ui, message):
196 """debug on input stream (unbundling)"""
197 """debug on input stream (unbundling)"""
197 if ui.configbool('devel', 'bundle2.debug'):
198 if ui.configbool('devel', 'bundle2.debug'):
198 ui.debug('bundle2-input: %s\n' % message)
199 ui.debug('bundle2-input: %s\n' % message)
199
200
200 def validateparttype(parttype):
201 def validateparttype(parttype):
201 """raise ValueError if a parttype contains invalid character"""
202 """raise ValueError if a parttype contains invalid character"""
202 if _parttypeforbidden.search(parttype):
203 if _parttypeforbidden.search(parttype):
203 raise ValueError(parttype)
204 raise ValueError(parttype)
204
205
205 def _makefpartparamsizes(nbparams):
206 def _makefpartparamsizes(nbparams):
206 """return a struct format to read part parameter sizes
207 """return a struct format to read part parameter sizes
207
208
208 The number parameters is variable so we need to build that format
209 The number parameters is variable so we need to build that format
209 dynamically.
210 dynamically.
210 """
211 """
211 return '>'+('BB'*nbparams)
212 return '>'+('BB'*nbparams)
212
213
213 parthandlermapping = {}
214 parthandlermapping = {}
214
215
215 def parthandler(parttype, params=()):
216 def parthandler(parttype, params=()):
216 """decorator that register a function as a bundle2 part handler
217 """decorator that register a function as a bundle2 part handler
217
218
218 eg::
219 eg::
219
220
220 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
221 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
221 def myparttypehandler(...):
222 def myparttypehandler(...):
222 '''process a part of type "my part".'''
223 '''process a part of type "my part".'''
223 ...
224 ...
224 """
225 """
225 validateparttype(parttype)
226 validateparttype(parttype)
226 def _decorator(func):
227 def _decorator(func):
227 lparttype = parttype.lower() # enforce lower case matching.
228 lparttype = parttype.lower() # enforce lower case matching.
228 assert lparttype not in parthandlermapping
229 assert lparttype not in parthandlermapping
229 parthandlermapping[lparttype] = func
230 parthandlermapping[lparttype] = func
230 func.params = frozenset(params)
231 func.params = frozenset(params)
231 return func
232 return func
232 return _decorator
233 return _decorator
233
234
234 class unbundlerecords(object):
235 class unbundlerecords(object):
235 """keep record of what happens during and unbundle
236 """keep record of what happens during and unbundle
236
237
237 New records are added using `records.add('cat', obj)`. Where 'cat' is a
238 New records are added using `records.add('cat', obj)`. Where 'cat' is a
238 category of record and obj is an arbitrary object.
239 category of record and obj is an arbitrary object.
239
240
240 `records['cat']` will return all entries of this category 'cat'.
241 `records['cat']` will return all entries of this category 'cat'.
241
242
242 Iterating on the object itself will yield `('category', obj)` tuples
243 Iterating on the object itself will yield `('category', obj)` tuples
243 for all entries.
244 for all entries.
244
245
245 All iterations happens in chronological order.
246 All iterations happens in chronological order.
246 """
247 """
247
248
248 def __init__(self):
249 def __init__(self):
249 self._categories = {}
250 self._categories = {}
250 self._sequences = []
251 self._sequences = []
251 self._replies = {}
252 self._replies = {}
252
253
253 def add(self, category, entry, inreplyto=None):
254 def add(self, category, entry, inreplyto=None):
254 """add a new record of a given category.
255 """add a new record of a given category.
255
256
256 The entry can then be retrieved in the list returned by
257 The entry can then be retrieved in the list returned by
257 self['category']."""
258 self['category']."""
258 self._categories.setdefault(category, []).append(entry)
259 self._categories.setdefault(category, []).append(entry)
259 self._sequences.append((category, entry))
260 self._sequences.append((category, entry))
260 if inreplyto is not None:
261 if inreplyto is not None:
261 self.getreplies(inreplyto).add(category, entry)
262 self.getreplies(inreplyto).add(category, entry)
262
263
263 def getreplies(self, partid):
264 def getreplies(self, partid):
264 """get the records that are replies to a specific part"""
265 """get the records that are replies to a specific part"""
265 return self._replies.setdefault(partid, unbundlerecords())
266 return self._replies.setdefault(partid, unbundlerecords())
266
267
267 def __getitem__(self, cat):
268 def __getitem__(self, cat):
268 return tuple(self._categories.get(cat, ()))
269 return tuple(self._categories.get(cat, ()))
269
270
270 def __iter__(self):
271 def __iter__(self):
271 return iter(self._sequences)
272 return iter(self._sequences)
272
273
273 def __len__(self):
274 def __len__(self):
274 return len(self._sequences)
275 return len(self._sequences)
275
276
276 def __nonzero__(self):
277 def __nonzero__(self):
277 return bool(self._sequences)
278 return bool(self._sequences)
278
279
279 __bool__ = __nonzero__
280 __bool__ = __nonzero__
280
281
281 class bundleoperation(object):
282 class bundleoperation(object):
282 """an object that represents a single bundling process
283 """an object that represents a single bundling process
283
284
284 Its purpose is to carry unbundle-related objects and states.
285 Its purpose is to carry unbundle-related objects and states.
285
286
286 A new object should be created at the beginning of each bundle processing.
287 A new object should be created at the beginning of each bundle processing.
287 The object is to be returned by the processing function.
288 The object is to be returned by the processing function.
288
289
289 The object has very little content now it will ultimately contain:
290 The object has very little content now it will ultimately contain:
290 * an access to the repo the bundle is applied to,
291 * an access to the repo the bundle is applied to,
291 * a ui object,
292 * a ui object,
292 * a way to retrieve a transaction to add changes to the repo,
293 * a way to retrieve a transaction to add changes to the repo,
293 * a way to record the result of processing each part,
294 * a way to record the result of processing each part,
294 * a way to construct a bundle response when applicable.
295 * a way to construct a bundle response when applicable.
295 """
296 """
296
297
297 def __init__(self, repo, transactiongetter, captureoutput=True):
298 def __init__(self, repo, transactiongetter, captureoutput=True):
298 self.repo = repo
299 self.repo = repo
299 self.ui = repo.ui
300 self.ui = repo.ui
300 self.records = unbundlerecords()
301 self.records = unbundlerecords()
301 self.reply = None
302 self.reply = None
302 self.captureoutput = captureoutput
303 self.captureoutput = captureoutput
303 self.hookargs = {}
304 self.hookargs = {}
304 self._gettransaction = transactiongetter
305 self._gettransaction = transactiongetter
305 # carries value that can modify part behavior
306 # carries value that can modify part behavior
306 self.modes = {}
307 self.modes = {}
307
308
308 def gettransaction(self):
309 def gettransaction(self):
309 transaction = self._gettransaction()
310 transaction = self._gettransaction()
310
311
311 if self.hookargs:
312 if self.hookargs:
312 # the ones added to the transaction supercede those added
313 # the ones added to the transaction supercede those added
313 # to the operation.
314 # to the operation.
314 self.hookargs.update(transaction.hookargs)
315 self.hookargs.update(transaction.hookargs)
315 transaction.hookargs = self.hookargs
316 transaction.hookargs = self.hookargs
316
317
317 # mark the hookargs as flushed. further attempts to add to
318 # mark the hookargs as flushed. further attempts to add to
318 # hookargs will result in an abort.
319 # hookargs will result in an abort.
319 self.hookargs = None
320 self.hookargs = None
320
321
321 return transaction
322 return transaction
322
323
323 def addhookargs(self, hookargs):
324 def addhookargs(self, hookargs):
324 if self.hookargs is None:
325 if self.hookargs is None:
325 raise error.ProgrammingError('attempted to add hookargs to '
326 raise error.ProgrammingError('attempted to add hookargs to '
326 'operation after transaction started')
327 'operation after transaction started')
327 self.hookargs.update(hookargs)
328 self.hookargs.update(hookargs)
328
329
329 class TransactionUnavailable(RuntimeError):
330 class TransactionUnavailable(RuntimeError):
330 pass
331 pass
331
332
332 def _notransaction():
333 def _notransaction():
333 """default method to get a transaction while processing a bundle
334 """default method to get a transaction while processing a bundle
334
335
335 Raise an exception to highlight the fact that no transaction was expected
336 Raise an exception to highlight the fact that no transaction was expected
336 to be created"""
337 to be created"""
337 raise TransactionUnavailable()
338 raise TransactionUnavailable()
338
339
339 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
340 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
340 # transform me into unbundler.apply() as soon as the freeze is lifted
341 # transform me into unbundler.apply() as soon as the freeze is lifted
341 if isinstance(unbundler, unbundle20):
342 if isinstance(unbundler, unbundle20):
342 tr.hookargs['bundle2'] = '1'
343 tr.hookargs['bundle2'] = '1'
343 if source is not None and 'source' not in tr.hookargs:
344 if source is not None and 'source' not in tr.hookargs:
344 tr.hookargs['source'] = source
345 tr.hookargs['source'] = source
345 if url is not None and 'url' not in tr.hookargs:
346 if url is not None and 'url' not in tr.hookargs:
346 tr.hookargs['url'] = url
347 tr.hookargs['url'] = url
347 return processbundle(repo, unbundler, lambda: tr)
348 return processbundle(repo, unbundler, lambda: tr)
348 else:
349 else:
349 # the transactiongetter won't be used, but we might as well set it
350 # the transactiongetter won't be used, but we might as well set it
350 op = bundleoperation(repo, lambda: tr)
351 op = bundleoperation(repo, lambda: tr)
351 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
352 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
352 return op
353 return op
353
354
354 class partiterator(object):
355 class partiterator(object):
355 def __init__(self, repo, op, unbundler):
356 def __init__(self, repo, op, unbundler):
356 self.repo = repo
357 self.repo = repo
357 self.op = op
358 self.op = op
358 self.unbundler = unbundler
359 self.unbundler = unbundler
359 self.iterator = None
360 self.iterator = None
360 self.count = 0
361 self.count = 0
361 self.current = None
362 self.current = None
362
363
363 def __enter__(self):
364 def __enter__(self):
364 def func():
365 def func():
365 itr = enumerate(self.unbundler.iterparts())
366 itr = enumerate(self.unbundler.iterparts())
366 for count, p in itr:
367 for count, p in itr:
367 self.count = count
368 self.count = count
368 self.current = p
369 self.current = p
369 yield p
370 yield p
370 p.consume()
371 p.consume()
371 self.current = None
372 self.current = None
372 self.iterator = func()
373 self.iterator = func()
373 return self.iterator
374 return self.iterator
374
375
375 def __exit__(self, type, exc, tb):
376 def __exit__(self, type, exc, tb):
376 if not self.iterator:
377 if not self.iterator:
377 return
378 return
378
379
379 # Only gracefully abort in a normal exception situation. User aborts
380 # Only gracefully abort in a normal exception situation. User aborts
380 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
381 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
381 # and should not gracefully cleanup.
382 # and should not gracefully cleanup.
382 if isinstance(exc, Exception):
383 if isinstance(exc, Exception):
383 # Any exceptions seeking to the end of the bundle at this point are
384 # Any exceptions seeking to the end of the bundle at this point are
384 # almost certainly related to the underlying stream being bad.
385 # almost certainly related to the underlying stream being bad.
385 # And, chances are that the exception we're handling is related to
386 # And, chances are that the exception we're handling is related to
386 # getting in that bad state. So, we swallow the seeking error and
387 # getting in that bad state. So, we swallow the seeking error and
387 # re-raise the original error.
388 # re-raise the original error.
388 seekerror = False
389 seekerror = False
389 try:
390 try:
390 if self.current:
391 if self.current:
391 # consume the part content to not corrupt the stream.
392 # consume the part content to not corrupt the stream.
392 self.current.consume()
393 self.current.consume()
393
394
394 for part in self.iterator:
395 for part in self.iterator:
395 # consume the bundle content
396 # consume the bundle content
396 part.consume()
397 part.consume()
397 except Exception:
398 except Exception:
398 seekerror = True
399 seekerror = True
399
400
400 # Small hack to let caller code distinguish exceptions from bundle2
401 # Small hack to let caller code distinguish exceptions from bundle2
401 # processing from processing the old format. This is mostly needed
402 # processing from processing the old format. This is mostly needed
402 # to handle different return codes to unbundle according to the type
403 # to handle different return codes to unbundle according to the type
403 # of bundle. We should probably clean up or drop this return code
404 # of bundle. We should probably clean up or drop this return code
404 # craziness in a future version.
405 # craziness in a future version.
405 exc.duringunbundle2 = True
406 exc.duringunbundle2 = True
406 salvaged = []
407 salvaged = []
407 replycaps = None
408 replycaps = None
408 if self.op.reply is not None:
409 if self.op.reply is not None:
409 salvaged = self.op.reply.salvageoutput()
410 salvaged = self.op.reply.salvageoutput()
410 replycaps = self.op.reply.capabilities
411 replycaps = self.op.reply.capabilities
411 exc._replycaps = replycaps
412 exc._replycaps = replycaps
412 exc._bundle2salvagedoutput = salvaged
413 exc._bundle2salvagedoutput = salvaged
413
414
414 # Re-raising from a variable loses the original stack. So only use
415 # Re-raising from a variable loses the original stack. So only use
415 # that form if we need to.
416 # that form if we need to.
416 if seekerror:
417 if seekerror:
417 raise exc
418 raise exc
418
419
419 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
420 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
420 self.count)
421 self.count)
421
422
422 def processbundle(repo, unbundler, transactiongetter=None, op=None):
423 def processbundle(repo, unbundler, transactiongetter=None, op=None):
423 """This function process a bundle, apply effect to/from a repo
424 """This function process a bundle, apply effect to/from a repo
424
425
425 It iterates over each part then searches for and uses the proper handling
426 It iterates over each part then searches for and uses the proper handling
426 code to process the part. Parts are processed in order.
427 code to process the part. Parts are processed in order.
427
428
428 Unknown Mandatory part will abort the process.
429 Unknown Mandatory part will abort the process.
429
430
430 It is temporarily possible to provide a prebuilt bundleoperation to the
431 It is temporarily possible to provide a prebuilt bundleoperation to the
431 function. This is used to ensure output is properly propagated in case of
432 function. This is used to ensure output is properly propagated in case of
432 an error during the unbundling. This output capturing part will likely be
433 an error during the unbundling. This output capturing part will likely be
433 reworked and this ability will probably go away in the process.
434 reworked and this ability will probably go away in the process.
434 """
435 """
435 if op is None:
436 if op is None:
436 if transactiongetter is None:
437 if transactiongetter is None:
437 transactiongetter = _notransaction
438 transactiongetter = _notransaction
438 op = bundleoperation(repo, transactiongetter)
439 op = bundleoperation(repo, transactiongetter)
439 # todo:
440 # todo:
440 # - replace this is a init function soon.
441 # - replace this is a init function soon.
441 # - exception catching
442 # - exception catching
442 unbundler.params
443 unbundler.params
443 if repo.ui.debugflag:
444 if repo.ui.debugflag:
444 msg = ['bundle2-input-bundle:']
445 msg = ['bundle2-input-bundle:']
445 if unbundler.params:
446 if unbundler.params:
446 msg.append(' %i params' % len(unbundler.params))
447 msg.append(' %i params' % len(unbundler.params))
447 if op._gettransaction is None or op._gettransaction is _notransaction:
448 if op._gettransaction is None or op._gettransaction is _notransaction:
448 msg.append(' no-transaction')
449 msg.append(' no-transaction')
449 else:
450 else:
450 msg.append(' with-transaction')
451 msg.append(' with-transaction')
451 msg.append('\n')
452 msg.append('\n')
452 repo.ui.debug(''.join(msg))
453 repo.ui.debug(''.join(msg))
453
454
454 processparts(repo, op, unbundler)
455 processparts(repo, op, unbundler)
455
456
456 return op
457 return op
457
458
458 def processparts(repo, op, unbundler):
459 def processparts(repo, op, unbundler):
459 with partiterator(repo, op, unbundler) as parts:
460 with partiterator(repo, op, unbundler) as parts:
460 for part in parts:
461 for part in parts:
461 _processpart(op, part)
462 _processpart(op, part)
462
463
463 def _processchangegroup(op, cg, tr, source, url, **kwargs):
464 def _processchangegroup(op, cg, tr, source, url, **kwargs):
464 ret = cg.apply(op.repo, tr, source, url, **kwargs)
465 ret = cg.apply(op.repo, tr, source, url, **kwargs)
465 op.records.add('changegroup', {
466 op.records.add('changegroup', {
466 'return': ret,
467 'return': ret,
467 })
468 })
468 return ret
469 return ret
469
470
470 def _gethandler(op, part):
471 def _gethandler(op, part):
471 status = 'unknown' # used by debug output
472 status = 'unknown' # used by debug output
472 try:
473 try:
473 handler = parthandlermapping.get(part.type)
474 handler = parthandlermapping.get(part.type)
474 if handler is None:
475 if handler is None:
475 status = 'unsupported-type'
476 status = 'unsupported-type'
476 raise error.BundleUnknownFeatureError(parttype=part.type)
477 raise error.BundleUnknownFeatureError(parttype=part.type)
477 indebug(op.ui, 'found a handler for part %s' % part.type)
478 indebug(op.ui, 'found a handler for part %s' % part.type)
478 unknownparams = part.mandatorykeys - handler.params
479 unknownparams = part.mandatorykeys - handler.params
479 if unknownparams:
480 if unknownparams:
480 unknownparams = list(unknownparams)
481 unknownparams = list(unknownparams)
481 unknownparams.sort()
482 unknownparams.sort()
482 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
483 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
483 raise error.BundleUnknownFeatureError(parttype=part.type,
484 raise error.BundleUnknownFeatureError(parttype=part.type,
484 params=unknownparams)
485 params=unknownparams)
485 status = 'supported'
486 status = 'supported'
486 except error.BundleUnknownFeatureError as exc:
487 except error.BundleUnknownFeatureError as exc:
487 if part.mandatory: # mandatory parts
488 if part.mandatory: # mandatory parts
488 raise
489 raise
489 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
490 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
490 return # skip to part processing
491 return # skip to part processing
491 finally:
492 finally:
492 if op.ui.debugflag:
493 if op.ui.debugflag:
493 msg = ['bundle2-input-part: "%s"' % part.type]
494 msg = ['bundle2-input-part: "%s"' % part.type]
494 if not part.mandatory:
495 if not part.mandatory:
495 msg.append(' (advisory)')
496 msg.append(' (advisory)')
496 nbmp = len(part.mandatorykeys)
497 nbmp = len(part.mandatorykeys)
497 nbap = len(part.params) - nbmp
498 nbap = len(part.params) - nbmp
498 if nbmp or nbap:
499 if nbmp or nbap:
499 msg.append(' (params:')
500 msg.append(' (params:')
500 if nbmp:
501 if nbmp:
501 msg.append(' %i mandatory' % nbmp)
502 msg.append(' %i mandatory' % nbmp)
502 if nbap:
503 if nbap:
503 msg.append(' %i advisory' % nbmp)
504 msg.append(' %i advisory' % nbmp)
504 msg.append(')')
505 msg.append(')')
505 msg.append(' %s\n' % status)
506 msg.append(' %s\n' % status)
506 op.ui.debug(''.join(msg))
507 op.ui.debug(''.join(msg))
507
508
508 return handler
509 return handler
509
510
510 def _processpart(op, part):
511 def _processpart(op, part):
511 """process a single part from a bundle
512 """process a single part from a bundle
512
513
513 The part is guaranteed to have been fully consumed when the function exits
514 The part is guaranteed to have been fully consumed when the function exits
514 (even if an exception is raised)."""
515 (even if an exception is raised)."""
515 handler = _gethandler(op, part)
516 handler = _gethandler(op, part)
516 if handler is None:
517 if handler is None:
517 return
518 return
518
519
519 # handler is called outside the above try block so that we don't
520 # handler is called outside the above try block so that we don't
520 # risk catching KeyErrors from anything other than the
521 # risk catching KeyErrors from anything other than the
521 # parthandlermapping lookup (any KeyError raised by handler()
522 # parthandlermapping lookup (any KeyError raised by handler()
522 # itself represents a defect of a different variety).
523 # itself represents a defect of a different variety).
523 output = None
524 output = None
524 if op.captureoutput and op.reply is not None:
525 if op.captureoutput and op.reply is not None:
525 op.ui.pushbuffer(error=True, subproc=True)
526 op.ui.pushbuffer(error=True, subproc=True)
526 output = ''
527 output = ''
527 try:
528 try:
528 handler(op, part)
529 handler(op, part)
529 finally:
530 finally:
530 if output is not None:
531 if output is not None:
531 output = op.ui.popbuffer()
532 output = op.ui.popbuffer()
532 if output:
533 if output:
533 outpart = op.reply.newpart('output', data=output,
534 outpart = op.reply.newpart('output', data=output,
534 mandatory=False)
535 mandatory=False)
535 outpart.addparam(
536 outpart.addparam(
536 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
537 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
537
538
538 def decodecaps(blob):
539 def decodecaps(blob):
539 """decode a bundle2 caps bytes blob into a dictionary
540 """decode a bundle2 caps bytes blob into a dictionary
540
541
541 The blob is a list of capabilities (one per line)
542 The blob is a list of capabilities (one per line)
542 Capabilities may have values using a line of the form::
543 Capabilities may have values using a line of the form::
543
544
544 capability=value1,value2,value3
545 capability=value1,value2,value3
545
546
546 The values are always a list."""
547 The values are always a list."""
547 caps = {}
548 caps = {}
548 for line in blob.splitlines():
549 for line in blob.splitlines():
549 if not line:
550 if not line:
550 continue
551 continue
551 if '=' not in line:
552 if '=' not in line:
552 key, vals = line, ()
553 key, vals = line, ()
553 else:
554 else:
554 key, vals = line.split('=', 1)
555 key, vals = line.split('=', 1)
555 vals = vals.split(',')
556 vals = vals.split(',')
556 key = urlreq.unquote(key)
557 key = urlreq.unquote(key)
557 vals = [urlreq.unquote(v) for v in vals]
558 vals = [urlreq.unquote(v) for v in vals]
558 caps[key] = vals
559 caps[key] = vals
559 return caps
560 return caps
560
561
561 def encodecaps(caps):
562 def encodecaps(caps):
562 """encode a bundle2 caps dictionary into a bytes blob"""
563 """encode a bundle2 caps dictionary into a bytes blob"""
563 chunks = []
564 chunks = []
564 for ca in sorted(caps):
565 for ca in sorted(caps):
565 vals = caps[ca]
566 vals = caps[ca]
566 ca = urlreq.quote(ca)
567 ca = urlreq.quote(ca)
567 vals = [urlreq.quote(v) for v in vals]
568 vals = [urlreq.quote(v) for v in vals]
568 if vals:
569 if vals:
569 ca = "%s=%s" % (ca, ','.join(vals))
570 ca = "%s=%s" % (ca, ','.join(vals))
570 chunks.append(ca)
571 chunks.append(ca)
571 return '\n'.join(chunks)
572 return '\n'.join(chunks)
572
573
573 bundletypes = {
574 bundletypes = {
574 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
575 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
575 # since the unification ssh accepts a header but there
576 # since the unification ssh accepts a header but there
576 # is no capability signaling it.
577 # is no capability signaling it.
577 "HG20": (), # special-cased below
578 "HG20": (), # special-cased below
578 "HG10UN": ("HG10UN", 'UN'),
579 "HG10UN": ("HG10UN", 'UN'),
579 "HG10BZ": ("HG10", 'BZ'),
580 "HG10BZ": ("HG10", 'BZ'),
580 "HG10GZ": ("HG10GZ", 'GZ'),
581 "HG10GZ": ("HG10GZ", 'GZ'),
581 }
582 }
582
583
583 # hgweb uses this list to communicate its preferred type
584 # hgweb uses this list to communicate its preferred type
584 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
585 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
585
586
586 class bundle20(object):
587 class bundle20(object):
587 """represent an outgoing bundle2 container
588 """represent an outgoing bundle2 container
588
589
589 Use the `addparam` method to add stream level parameter. and `newpart` to
590 Use the `addparam` method to add stream level parameter. and `newpart` to
590 populate it. Then call `getchunks` to retrieve all the binary chunks of
591 populate it. Then call `getchunks` to retrieve all the binary chunks of
591 data that compose the bundle2 container."""
592 data that compose the bundle2 container."""
592
593
593 _magicstring = 'HG20'
594 _magicstring = 'HG20'
594
595
595 def __init__(self, ui, capabilities=()):
596 def __init__(self, ui, capabilities=()):
596 self.ui = ui
597 self.ui = ui
597 self._params = []
598 self._params = []
598 self._parts = []
599 self._parts = []
599 self.capabilities = dict(capabilities)
600 self.capabilities = dict(capabilities)
600 self._compengine = util.compengines.forbundletype('UN')
601 self._compengine = util.compengines.forbundletype('UN')
601 self._compopts = None
602 self._compopts = None
602 # If compression is being handled by a consumer of the raw
603 # If compression is being handled by a consumer of the raw
603 # data (e.g. the wire protocol), unsetting this flag tells
604 # data (e.g. the wire protocol), unsetting this flag tells
604 # consumers that the bundle is best left uncompressed.
605 # consumers that the bundle is best left uncompressed.
605 self.prefercompressed = True
606 self.prefercompressed = True
606
607
607 def setcompression(self, alg, compopts=None):
608 def setcompression(self, alg, compopts=None):
608 """setup core part compression to <alg>"""
609 """setup core part compression to <alg>"""
609 if alg in (None, 'UN'):
610 if alg in (None, 'UN'):
610 return
611 return
611 assert not any(n.lower() == 'compression' for n, v in self._params)
612 assert not any(n.lower() == 'compression' for n, v in self._params)
612 self.addparam('Compression', alg)
613 self.addparam('Compression', alg)
613 self._compengine = util.compengines.forbundletype(alg)
614 self._compengine = util.compengines.forbundletype(alg)
614 self._compopts = compopts
615 self._compopts = compopts
615
616
616 @property
617 @property
617 def nbparts(self):
618 def nbparts(self):
618 """total number of parts added to the bundler"""
619 """total number of parts added to the bundler"""
619 return len(self._parts)
620 return len(self._parts)
620
621
621 # methods used to defines the bundle2 content
622 # methods used to defines the bundle2 content
622 def addparam(self, name, value=None):
623 def addparam(self, name, value=None):
623 """add a stream level parameter"""
624 """add a stream level parameter"""
624 if not name:
625 if not name:
625 raise ValueError(r'empty parameter name')
626 raise ValueError(r'empty parameter name')
626 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
627 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
627 raise ValueError(r'non letter first character: %s' % name)
628 raise ValueError(r'non letter first character: %s' % name)
628 self._params.append((name, value))
629 self._params.append((name, value))
629
630
630 def addpart(self, part):
631 def addpart(self, part):
631 """add a new part to the bundle2 container
632 """add a new part to the bundle2 container
632
633
633 Parts contains the actual applicative payload."""
634 Parts contains the actual applicative payload."""
634 assert part.id is None
635 assert part.id is None
635 part.id = len(self._parts) # very cheap counter
636 part.id = len(self._parts) # very cheap counter
636 self._parts.append(part)
637 self._parts.append(part)
637
638
638 def newpart(self, typeid, *args, **kwargs):
639 def newpart(self, typeid, *args, **kwargs):
639 """create a new part and add it to the containers
640 """create a new part and add it to the containers
640
641
641 As the part is directly added to the containers. For now, this means
642 As the part is directly added to the containers. For now, this means
642 that any failure to properly initialize the part after calling
643 that any failure to properly initialize the part after calling
643 ``newpart`` should result in a failure of the whole bundling process.
644 ``newpart`` should result in a failure of the whole bundling process.
644
645
645 You can still fall back to manually create and add if you need better
646 You can still fall back to manually create and add if you need better
646 control."""
647 control."""
647 part = bundlepart(typeid, *args, **kwargs)
648 part = bundlepart(typeid, *args, **kwargs)
648 self.addpart(part)
649 self.addpart(part)
649 return part
650 return part
650
651
651 # methods used to generate the bundle2 stream
652 # methods used to generate the bundle2 stream
652 def getchunks(self):
653 def getchunks(self):
653 if self.ui.debugflag:
654 if self.ui.debugflag:
654 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
655 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
655 if self._params:
656 if self._params:
656 msg.append(' (%i params)' % len(self._params))
657 msg.append(' (%i params)' % len(self._params))
657 msg.append(' %i parts total\n' % len(self._parts))
658 msg.append(' %i parts total\n' % len(self._parts))
658 self.ui.debug(''.join(msg))
659 self.ui.debug(''.join(msg))
659 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
660 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
660 yield self._magicstring
661 yield self._magicstring
661 param = self._paramchunk()
662 param = self._paramchunk()
662 outdebug(self.ui, 'bundle parameter: %s' % param)
663 outdebug(self.ui, 'bundle parameter: %s' % param)
663 yield _pack(_fstreamparamsize, len(param))
664 yield _pack(_fstreamparamsize, len(param))
664 if param:
665 if param:
665 yield param
666 yield param
666 for chunk in self._compengine.compressstream(self._getcorechunk(),
667 for chunk in self._compengine.compressstream(self._getcorechunk(),
667 self._compopts):
668 self._compopts):
668 yield chunk
669 yield chunk
669
670
670 def _paramchunk(self):
671 def _paramchunk(self):
671 """return a encoded version of all stream parameters"""
672 """return a encoded version of all stream parameters"""
672 blocks = []
673 blocks = []
673 for par, value in self._params:
674 for par, value in self._params:
674 par = urlreq.quote(par)
675 par = urlreq.quote(par)
675 if value is not None:
676 if value is not None:
676 value = urlreq.quote(value)
677 value = urlreq.quote(value)
677 par = '%s=%s' % (par, value)
678 par = '%s=%s' % (par, value)
678 blocks.append(par)
679 blocks.append(par)
679 return ' '.join(blocks)
680 return ' '.join(blocks)
680
681
681 def _getcorechunk(self):
682 def _getcorechunk(self):
682 """yield chunk for the core part of the bundle
683 """yield chunk for the core part of the bundle
683
684
684 (all but headers and parameters)"""
685 (all but headers and parameters)"""
685 outdebug(self.ui, 'start of parts')
686 outdebug(self.ui, 'start of parts')
686 for part in self._parts:
687 for part in self._parts:
687 outdebug(self.ui, 'bundle part: "%s"' % part.type)
688 outdebug(self.ui, 'bundle part: "%s"' % part.type)
688 for chunk in part.getchunks(ui=self.ui):
689 for chunk in part.getchunks(ui=self.ui):
689 yield chunk
690 yield chunk
690 outdebug(self.ui, 'end of bundle')
691 outdebug(self.ui, 'end of bundle')
691 yield _pack(_fpartheadersize, 0)
692 yield _pack(_fpartheadersize, 0)
692
693
693
694
694 def salvageoutput(self):
695 def salvageoutput(self):
695 """return a list with a copy of all output parts in the bundle
696 """return a list with a copy of all output parts in the bundle
696
697
697 This is meant to be used during error handling to make sure we preserve
698 This is meant to be used during error handling to make sure we preserve
698 server output"""
699 server output"""
699 salvaged = []
700 salvaged = []
700 for part in self._parts:
701 for part in self._parts:
701 if part.type.startswith('output'):
702 if part.type.startswith('output'):
702 salvaged.append(part.copy())
703 salvaged.append(part.copy())
703 return salvaged
704 return salvaged
704
705
705
706
706 class unpackermixin(object):
707 class unpackermixin(object):
707 """A mixin to extract bytes and struct data from a stream"""
708 """A mixin to extract bytes and struct data from a stream"""
708
709
709 def __init__(self, fp):
710 def __init__(self, fp):
710 self._fp = fp
711 self._fp = fp
711
712
712 def _unpack(self, format):
713 def _unpack(self, format):
713 """unpack this struct format from the stream
714 """unpack this struct format from the stream
714
715
715 This method is meant for internal usage by the bundle2 protocol only.
716 This method is meant for internal usage by the bundle2 protocol only.
716 They directly manipulate the low level stream including bundle2 level
717 They directly manipulate the low level stream including bundle2 level
717 instruction.
718 instruction.
718
719
719 Do not use it to implement higher-level logic or methods."""
720 Do not use it to implement higher-level logic or methods."""
720 data = self._readexact(struct.calcsize(format))
721 data = self._readexact(struct.calcsize(format))
721 return _unpack(format, data)
722 return _unpack(format, data)
722
723
723 def _readexact(self, size):
724 def _readexact(self, size):
724 """read exactly <size> bytes from the stream
725 """read exactly <size> bytes from the stream
725
726
726 This method is meant for internal usage by the bundle2 protocol only.
727 This method is meant for internal usage by the bundle2 protocol only.
727 They directly manipulate the low level stream including bundle2 level
728 They directly manipulate the low level stream including bundle2 level
728 instruction.
729 instruction.
729
730
730 Do not use it to implement higher-level logic or methods."""
731 Do not use it to implement higher-level logic or methods."""
731 return changegroup.readexactly(self._fp, size)
732 return changegroup.readexactly(self._fp, size)
732
733
733 def getunbundler(ui, fp, magicstring=None):
734 def getunbundler(ui, fp, magicstring=None):
734 """return a valid unbundler object for a given magicstring"""
735 """return a valid unbundler object for a given magicstring"""
735 if magicstring is None:
736 if magicstring is None:
736 magicstring = changegroup.readexactly(fp, 4)
737 magicstring = changegroup.readexactly(fp, 4)
737 magic, version = magicstring[0:2], magicstring[2:4]
738 magic, version = magicstring[0:2], magicstring[2:4]
738 if magic != 'HG':
739 if magic != 'HG':
739 ui.debug(
740 ui.debug(
740 "error: invalid magic: %r (version %r), should be 'HG'\n"
741 "error: invalid magic: %r (version %r), should be 'HG'\n"
741 % (magic, version))
742 % (magic, version))
742 raise error.Abort(_('not a Mercurial bundle'))
743 raise error.Abort(_('not a Mercurial bundle'))
743 unbundlerclass = formatmap.get(version)
744 unbundlerclass = formatmap.get(version)
744 if unbundlerclass is None:
745 if unbundlerclass is None:
745 raise error.Abort(_('unknown bundle version %s') % version)
746 raise error.Abort(_('unknown bundle version %s') % version)
746 unbundler = unbundlerclass(ui, fp)
747 unbundler = unbundlerclass(ui, fp)
747 indebug(ui, 'start processing of %s stream' % magicstring)
748 indebug(ui, 'start processing of %s stream' % magicstring)
748 return unbundler
749 return unbundler
749
750
750 class unbundle20(unpackermixin):
751 class unbundle20(unpackermixin):
751 """interpret a bundle2 stream
752 """interpret a bundle2 stream
752
753
753 This class is fed with a binary stream and yields parts through its
754 This class is fed with a binary stream and yields parts through its
754 `iterparts` methods."""
755 `iterparts` methods."""
755
756
756 _magicstring = 'HG20'
757 _magicstring = 'HG20'
757
758
758 def __init__(self, ui, fp):
759 def __init__(self, ui, fp):
759 """If header is specified, we do not read it out of the stream."""
760 """If header is specified, we do not read it out of the stream."""
760 self.ui = ui
761 self.ui = ui
761 self._compengine = util.compengines.forbundletype('UN')
762 self._compengine = util.compengines.forbundletype('UN')
762 self._compressed = None
763 self._compressed = None
763 super(unbundle20, self).__init__(fp)
764 super(unbundle20, self).__init__(fp)
764
765
765 @util.propertycache
766 @util.propertycache
766 def params(self):
767 def params(self):
767 """dictionary of stream level parameters"""
768 """dictionary of stream level parameters"""
768 indebug(self.ui, 'reading bundle2 stream parameters')
769 indebug(self.ui, 'reading bundle2 stream parameters')
769 params = {}
770 params = {}
770 paramssize = self._unpack(_fstreamparamsize)[0]
771 paramssize = self._unpack(_fstreamparamsize)[0]
771 if paramssize < 0:
772 if paramssize < 0:
772 raise error.BundleValueError('negative bundle param size: %i'
773 raise error.BundleValueError('negative bundle param size: %i'
773 % paramssize)
774 % paramssize)
774 if paramssize:
775 if paramssize:
775 params = self._readexact(paramssize)
776 params = self._readexact(paramssize)
776 params = self._processallparams(params)
777 params = self._processallparams(params)
777 return params
778 return params
778
779
779 def _processallparams(self, paramsblock):
780 def _processallparams(self, paramsblock):
780 """"""
781 """"""
781 params = util.sortdict()
782 params = util.sortdict()
782 for p in paramsblock.split(' '):
783 for p in paramsblock.split(' '):
783 p = p.split('=', 1)
784 p = p.split('=', 1)
784 p = [urlreq.unquote(i) for i in p]
785 p = [urlreq.unquote(i) for i in p]
785 if len(p) < 2:
786 if len(p) < 2:
786 p.append(None)
787 p.append(None)
787 self._processparam(*p)
788 self._processparam(*p)
788 params[p[0]] = p[1]
789 params[p[0]] = p[1]
789 return params
790 return params
790
791
791
792
792 def _processparam(self, name, value):
793 def _processparam(self, name, value):
793 """process a parameter, applying its effect if needed
794 """process a parameter, applying its effect if needed
794
795
795 Parameter starting with a lower case letter are advisory and will be
796 Parameter starting with a lower case letter are advisory and will be
796 ignored when unknown. Those starting with an upper case letter are
797 ignored when unknown. Those starting with an upper case letter are
797 mandatory and will this function will raise a KeyError when unknown.
798 mandatory and will this function will raise a KeyError when unknown.
798
799
799 Note: no option are currently supported. Any input will be either
800 Note: no option are currently supported. Any input will be either
800 ignored or failing.
801 ignored or failing.
801 """
802 """
802 if not name:
803 if not name:
803 raise ValueError(r'empty parameter name')
804 raise ValueError(r'empty parameter name')
804 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
805 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
805 raise ValueError(r'non letter first character: %s' % name)
806 raise ValueError(r'non letter first character: %s' % name)
806 try:
807 try:
807 handler = b2streamparamsmap[name.lower()]
808 handler = b2streamparamsmap[name.lower()]
808 except KeyError:
809 except KeyError:
809 if name[0:1].islower():
810 if name[0:1].islower():
810 indebug(self.ui, "ignoring unknown parameter %s" % name)
811 indebug(self.ui, "ignoring unknown parameter %s" % name)
811 else:
812 else:
812 raise error.BundleUnknownFeatureError(params=(name,))
813 raise error.BundleUnknownFeatureError(params=(name,))
813 else:
814 else:
814 handler(self, name, value)
815 handler(self, name, value)
815
816
816 def _forwardchunks(self):
817 def _forwardchunks(self):
817 """utility to transfer a bundle2 as binary
818 """utility to transfer a bundle2 as binary
818
819
819 This is made necessary by the fact the 'getbundle' command over 'ssh'
820 This is made necessary by the fact the 'getbundle' command over 'ssh'
820 have no way to know then the reply end, relying on the bundle to be
821 have no way to know then the reply end, relying on the bundle to be
821 interpreted to know its end. This is terrible and we are sorry, but we
822 interpreted to know its end. This is terrible and we are sorry, but we
822 needed to move forward to get general delta enabled.
823 needed to move forward to get general delta enabled.
823 """
824 """
824 yield self._magicstring
825 yield self._magicstring
825 assert 'params' not in vars(self)
826 assert 'params' not in vars(self)
826 paramssize = self._unpack(_fstreamparamsize)[0]
827 paramssize = self._unpack(_fstreamparamsize)[0]
827 if paramssize < 0:
828 if paramssize < 0:
828 raise error.BundleValueError('negative bundle param size: %i'
829 raise error.BundleValueError('negative bundle param size: %i'
829 % paramssize)
830 % paramssize)
830 yield _pack(_fstreamparamsize, paramssize)
831 yield _pack(_fstreamparamsize, paramssize)
831 if paramssize:
832 if paramssize:
832 params = self._readexact(paramssize)
833 params = self._readexact(paramssize)
833 self._processallparams(params)
834 self._processallparams(params)
834 yield params
835 yield params
835 assert self._compengine.bundletype == 'UN'
836 assert self._compengine.bundletype == 'UN'
836 # From there, payload might need to be decompressed
837 # From there, payload might need to be decompressed
837 self._fp = self._compengine.decompressorreader(self._fp)
838 self._fp = self._compengine.decompressorreader(self._fp)
838 emptycount = 0
839 emptycount = 0
839 while emptycount < 2:
840 while emptycount < 2:
840 # so we can brainlessly loop
841 # so we can brainlessly loop
841 assert _fpartheadersize == _fpayloadsize
842 assert _fpartheadersize == _fpayloadsize
842 size = self._unpack(_fpartheadersize)[0]
843 size = self._unpack(_fpartheadersize)[0]
843 yield _pack(_fpartheadersize, size)
844 yield _pack(_fpartheadersize, size)
844 if size:
845 if size:
845 emptycount = 0
846 emptycount = 0
846 else:
847 else:
847 emptycount += 1
848 emptycount += 1
848 continue
849 continue
849 if size == flaginterrupt:
850 if size == flaginterrupt:
850 continue
851 continue
851 elif size < 0:
852 elif size < 0:
852 raise error.BundleValueError('negative chunk size: %i')
853 raise error.BundleValueError('negative chunk size: %i')
853 yield self._readexact(size)
854 yield self._readexact(size)
854
855
855
856
856 def iterparts(self, seekable=False):
857 def iterparts(self, seekable=False):
857 """yield all parts contained in the stream"""
858 """yield all parts contained in the stream"""
858 cls = seekableunbundlepart if seekable else unbundlepart
859 cls = seekableunbundlepart if seekable else unbundlepart
859 # make sure param have been loaded
860 # make sure param have been loaded
860 self.params
861 self.params
861 # From there, payload need to be decompressed
862 # From there, payload need to be decompressed
862 self._fp = self._compengine.decompressorreader(self._fp)
863 self._fp = self._compengine.decompressorreader(self._fp)
863 indebug(self.ui, 'start extraction of bundle2 parts')
864 indebug(self.ui, 'start extraction of bundle2 parts')
864 headerblock = self._readpartheader()
865 headerblock = self._readpartheader()
865 while headerblock is not None:
866 while headerblock is not None:
866 part = cls(self.ui, headerblock, self._fp)
867 part = cls(self.ui, headerblock, self._fp)
867 yield part
868 yield part
868 # Ensure part is fully consumed so we can start reading the next
869 # Ensure part is fully consumed so we can start reading the next
869 # part.
870 # part.
870 part.consume()
871 part.consume()
871
872
872 headerblock = self._readpartheader()
873 headerblock = self._readpartheader()
873 indebug(self.ui, 'end of bundle2 stream')
874 indebug(self.ui, 'end of bundle2 stream')
874
875
875 def _readpartheader(self):
876 def _readpartheader(self):
876 """reads a part header size and return the bytes blob
877 """reads a part header size and return the bytes blob
877
878
878 returns None if empty"""
879 returns None if empty"""
879 headersize = self._unpack(_fpartheadersize)[0]
880 headersize = self._unpack(_fpartheadersize)[0]
880 if headersize < 0:
881 if headersize < 0:
881 raise error.BundleValueError('negative part header size: %i'
882 raise error.BundleValueError('negative part header size: %i'
882 % headersize)
883 % headersize)
883 indebug(self.ui, 'part header size: %i' % headersize)
884 indebug(self.ui, 'part header size: %i' % headersize)
884 if headersize:
885 if headersize:
885 return self._readexact(headersize)
886 return self._readexact(headersize)
886 return None
887 return None
887
888
888 def compressed(self):
889 def compressed(self):
889 self.params # load params
890 self.params # load params
890 return self._compressed
891 return self._compressed
891
892
892 def close(self):
893 def close(self):
893 """close underlying file"""
894 """close underlying file"""
894 if util.safehasattr(self._fp, 'close'):
895 if util.safehasattr(self._fp, 'close'):
895 return self._fp.close()
896 return self._fp.close()
896
897
897 formatmap = {'20': unbundle20}
898 formatmap = {'20': unbundle20}
898
899
899 b2streamparamsmap = {}
900 b2streamparamsmap = {}
900
901
901 def b2streamparamhandler(name):
902 def b2streamparamhandler(name):
902 """register a handler for a stream level parameter"""
903 """register a handler for a stream level parameter"""
903 def decorator(func):
904 def decorator(func):
904 assert name not in formatmap
905 assert name not in formatmap
905 b2streamparamsmap[name] = func
906 b2streamparamsmap[name] = func
906 return func
907 return func
907 return decorator
908 return decorator
908
909
909 @b2streamparamhandler('compression')
910 @b2streamparamhandler('compression')
910 def processcompression(unbundler, param, value):
911 def processcompression(unbundler, param, value):
911 """read compression parameter and install payload decompression"""
912 """read compression parameter and install payload decompression"""
912 if value not in util.compengines.supportedbundletypes:
913 if value not in util.compengines.supportedbundletypes:
913 raise error.BundleUnknownFeatureError(params=(param,),
914 raise error.BundleUnknownFeatureError(params=(param,),
914 values=(value,))
915 values=(value,))
915 unbundler._compengine = util.compengines.forbundletype(value)
916 unbundler._compengine = util.compengines.forbundletype(value)
916 if value is not None:
917 if value is not None:
917 unbundler._compressed = True
918 unbundler._compressed = True
918
919
919 class bundlepart(object):
920 class bundlepart(object):
920 """A bundle2 part contains application level payload
921 """A bundle2 part contains application level payload
921
922
922 The part `type` is used to route the part to the application level
923 The part `type` is used to route the part to the application level
923 handler.
924 handler.
924
925
925 The part payload is contained in ``part.data``. It could be raw bytes or a
926 The part payload is contained in ``part.data``. It could be raw bytes or a
926 generator of byte chunks.
927 generator of byte chunks.
927
928
928 You can add parameters to the part using the ``addparam`` method.
929 You can add parameters to the part using the ``addparam`` method.
929 Parameters can be either mandatory (default) or advisory. Remote side
930 Parameters can be either mandatory (default) or advisory. Remote side
930 should be able to safely ignore the advisory ones.
931 should be able to safely ignore the advisory ones.
931
932
932 Both data and parameters cannot be modified after the generation has begun.
933 Both data and parameters cannot be modified after the generation has begun.
933 """
934 """
934
935
935 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
936 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
936 data='', mandatory=True):
937 data='', mandatory=True):
937 validateparttype(parttype)
938 validateparttype(parttype)
938 self.id = None
939 self.id = None
939 self.type = parttype
940 self.type = parttype
940 self._data = data
941 self._data = data
941 self._mandatoryparams = list(mandatoryparams)
942 self._mandatoryparams = list(mandatoryparams)
942 self._advisoryparams = list(advisoryparams)
943 self._advisoryparams = list(advisoryparams)
943 # checking for duplicated entries
944 # checking for duplicated entries
944 self._seenparams = set()
945 self._seenparams = set()
945 for pname, __ in self._mandatoryparams + self._advisoryparams:
946 for pname, __ in self._mandatoryparams + self._advisoryparams:
946 if pname in self._seenparams:
947 if pname in self._seenparams:
947 raise error.ProgrammingError('duplicated params: %s' % pname)
948 raise error.ProgrammingError('duplicated params: %s' % pname)
948 self._seenparams.add(pname)
949 self._seenparams.add(pname)
949 # status of the part's generation:
950 # status of the part's generation:
950 # - None: not started,
951 # - None: not started,
951 # - False: currently generated,
952 # - False: currently generated,
952 # - True: generation done.
953 # - True: generation done.
953 self._generated = None
954 self._generated = None
954 self.mandatory = mandatory
955 self.mandatory = mandatory
955
956
956 def __repr__(self):
957 def __repr__(self):
957 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
958 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
958 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
959 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
959 % (cls, id(self), self.id, self.type, self.mandatory))
960 % (cls, id(self), self.id, self.type, self.mandatory))
960
961
961 def copy(self):
962 def copy(self):
962 """return a copy of the part
963 """return a copy of the part
963
964
964 The new part have the very same content but no partid assigned yet.
965 The new part have the very same content but no partid assigned yet.
965 Parts with generated data cannot be copied."""
966 Parts with generated data cannot be copied."""
966 assert not util.safehasattr(self.data, 'next')
967 assert not util.safehasattr(self.data, 'next')
967 return self.__class__(self.type, self._mandatoryparams,
968 return self.__class__(self.type, self._mandatoryparams,
968 self._advisoryparams, self._data, self.mandatory)
969 self._advisoryparams, self._data, self.mandatory)
969
970
970 # methods used to defines the part content
971 # methods used to defines the part content
971 @property
972 @property
972 def data(self):
973 def data(self):
973 return self._data
974 return self._data
974
975
975 @data.setter
976 @data.setter
976 def data(self, data):
977 def data(self, data):
977 if self._generated is not None:
978 if self._generated is not None:
978 raise error.ReadOnlyPartError('part is being generated')
979 raise error.ReadOnlyPartError('part is being generated')
979 self._data = data
980 self._data = data
980
981
981 @property
982 @property
982 def mandatoryparams(self):
983 def mandatoryparams(self):
983 # make it an immutable tuple to force people through ``addparam``
984 # make it an immutable tuple to force people through ``addparam``
984 return tuple(self._mandatoryparams)
985 return tuple(self._mandatoryparams)
985
986
986 @property
987 @property
987 def advisoryparams(self):
988 def advisoryparams(self):
988 # make it an immutable tuple to force people through ``addparam``
989 # make it an immutable tuple to force people through ``addparam``
989 return tuple(self._advisoryparams)
990 return tuple(self._advisoryparams)
990
991
991 def addparam(self, name, value='', mandatory=True):
992 def addparam(self, name, value='', mandatory=True):
992 """add a parameter to the part
993 """add a parameter to the part
993
994
994 If 'mandatory' is set to True, the remote handler must claim support
995 If 'mandatory' is set to True, the remote handler must claim support
995 for this parameter or the unbundling will be aborted.
996 for this parameter or the unbundling will be aborted.
996
997
997 The 'name' and 'value' cannot exceed 255 bytes each.
998 The 'name' and 'value' cannot exceed 255 bytes each.
998 """
999 """
999 if self._generated is not None:
1000 if self._generated is not None:
1000 raise error.ReadOnlyPartError('part is being generated')
1001 raise error.ReadOnlyPartError('part is being generated')
1001 if name in self._seenparams:
1002 if name in self._seenparams:
1002 raise ValueError('duplicated params: %s' % name)
1003 raise ValueError('duplicated params: %s' % name)
1003 self._seenparams.add(name)
1004 self._seenparams.add(name)
1004 params = self._advisoryparams
1005 params = self._advisoryparams
1005 if mandatory:
1006 if mandatory:
1006 params = self._mandatoryparams
1007 params = self._mandatoryparams
1007 params.append((name, value))
1008 params.append((name, value))
1008
1009
1009 # methods used to generates the bundle2 stream
1010 # methods used to generates the bundle2 stream
1010 def getchunks(self, ui):
1011 def getchunks(self, ui):
1011 if self._generated is not None:
1012 if self._generated is not None:
1012 raise error.ProgrammingError('part can only be consumed once')
1013 raise error.ProgrammingError('part can only be consumed once')
1013 self._generated = False
1014 self._generated = False
1014
1015
1015 if ui.debugflag:
1016 if ui.debugflag:
1016 msg = ['bundle2-output-part: "%s"' % self.type]
1017 msg = ['bundle2-output-part: "%s"' % self.type]
1017 if not self.mandatory:
1018 if not self.mandatory:
1018 msg.append(' (advisory)')
1019 msg.append(' (advisory)')
1019 nbmp = len(self.mandatoryparams)
1020 nbmp = len(self.mandatoryparams)
1020 nbap = len(self.advisoryparams)
1021 nbap = len(self.advisoryparams)
1021 if nbmp or nbap:
1022 if nbmp or nbap:
1022 msg.append(' (params:')
1023 msg.append(' (params:')
1023 if nbmp:
1024 if nbmp:
1024 msg.append(' %i mandatory' % nbmp)
1025 msg.append(' %i mandatory' % nbmp)
1025 if nbap:
1026 if nbap:
1026 msg.append(' %i advisory' % nbmp)
1027 msg.append(' %i advisory' % nbmp)
1027 msg.append(')')
1028 msg.append(')')
1028 if not self.data:
1029 if not self.data:
1029 msg.append(' empty payload')
1030 msg.append(' empty payload')
1030 elif (util.safehasattr(self.data, 'next')
1031 elif (util.safehasattr(self.data, 'next')
1031 or util.safehasattr(self.data, '__next__')):
1032 or util.safehasattr(self.data, '__next__')):
1032 msg.append(' streamed payload')
1033 msg.append(' streamed payload')
1033 else:
1034 else:
1034 msg.append(' %i bytes payload' % len(self.data))
1035 msg.append(' %i bytes payload' % len(self.data))
1035 msg.append('\n')
1036 msg.append('\n')
1036 ui.debug(''.join(msg))
1037 ui.debug(''.join(msg))
1037
1038
1038 #### header
1039 #### header
1039 if self.mandatory:
1040 if self.mandatory:
1040 parttype = self.type.upper()
1041 parttype = self.type.upper()
1041 else:
1042 else:
1042 parttype = self.type.lower()
1043 parttype = self.type.lower()
1043 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1044 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1044 ## parttype
1045 ## parttype
1045 header = [_pack(_fparttypesize, len(parttype)),
1046 header = [_pack(_fparttypesize, len(parttype)),
1046 parttype, _pack(_fpartid, self.id),
1047 parttype, _pack(_fpartid, self.id),
1047 ]
1048 ]
1048 ## parameters
1049 ## parameters
1049 # count
1050 # count
1050 manpar = self.mandatoryparams
1051 manpar = self.mandatoryparams
1051 advpar = self.advisoryparams
1052 advpar = self.advisoryparams
1052 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1053 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1053 # size
1054 # size
1054 parsizes = []
1055 parsizes = []
1055 for key, value in manpar:
1056 for key, value in manpar:
1056 parsizes.append(len(key))
1057 parsizes.append(len(key))
1057 parsizes.append(len(value))
1058 parsizes.append(len(value))
1058 for key, value in advpar:
1059 for key, value in advpar:
1059 parsizes.append(len(key))
1060 parsizes.append(len(key))
1060 parsizes.append(len(value))
1061 parsizes.append(len(value))
1061 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1062 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1062 header.append(paramsizes)
1063 header.append(paramsizes)
1063 # key, value
1064 # key, value
1064 for key, value in manpar:
1065 for key, value in manpar:
1065 header.append(key)
1066 header.append(key)
1066 header.append(value)
1067 header.append(value)
1067 for key, value in advpar:
1068 for key, value in advpar:
1068 header.append(key)
1069 header.append(key)
1069 header.append(value)
1070 header.append(value)
1070 ## finalize header
1071 ## finalize header
1071 try:
1072 try:
1072 headerchunk = ''.join(header)
1073 headerchunk = ''.join(header)
1073 except TypeError:
1074 except TypeError:
1074 raise TypeError(r'Found a non-bytes trying to '
1075 raise TypeError(r'Found a non-bytes trying to '
1075 r'build bundle part header: %r' % header)
1076 r'build bundle part header: %r' % header)
1076 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1077 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1077 yield _pack(_fpartheadersize, len(headerchunk))
1078 yield _pack(_fpartheadersize, len(headerchunk))
1078 yield headerchunk
1079 yield headerchunk
1079 ## payload
1080 ## payload
1080 try:
1081 try:
1081 for chunk in self._payloadchunks():
1082 for chunk in self._payloadchunks():
1082 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1083 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1083 yield _pack(_fpayloadsize, len(chunk))
1084 yield _pack(_fpayloadsize, len(chunk))
1084 yield chunk
1085 yield chunk
1085 except GeneratorExit:
1086 except GeneratorExit:
1086 # GeneratorExit means that nobody is listening for our
1087 # GeneratorExit means that nobody is listening for our
1087 # results anyway, so just bail quickly rather than trying
1088 # results anyway, so just bail quickly rather than trying
1088 # to produce an error part.
1089 # to produce an error part.
1089 ui.debug('bundle2-generatorexit\n')
1090 ui.debug('bundle2-generatorexit\n')
1090 raise
1091 raise
1091 except BaseException as exc:
1092 except BaseException as exc:
1092 bexc = util.forcebytestr(exc)
1093 bexc = util.forcebytestr(exc)
1093 # backup exception data for later
1094 # backup exception data for later
1094 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1095 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1095 % bexc)
1096 % bexc)
1096 tb = sys.exc_info()[2]
1097 tb = sys.exc_info()[2]
1097 msg = 'unexpected error: %s' % bexc
1098 msg = 'unexpected error: %s' % bexc
1098 interpart = bundlepart('error:abort', [('message', msg)],
1099 interpart = bundlepart('error:abort', [('message', msg)],
1099 mandatory=False)
1100 mandatory=False)
1100 interpart.id = 0
1101 interpart.id = 0
1101 yield _pack(_fpayloadsize, -1)
1102 yield _pack(_fpayloadsize, -1)
1102 for chunk in interpart.getchunks(ui=ui):
1103 for chunk in interpart.getchunks(ui=ui):
1103 yield chunk
1104 yield chunk
1104 outdebug(ui, 'closing payload chunk')
1105 outdebug(ui, 'closing payload chunk')
1105 # abort current part payload
1106 # abort current part payload
1106 yield _pack(_fpayloadsize, 0)
1107 yield _pack(_fpayloadsize, 0)
1107 pycompat.raisewithtb(exc, tb)
1108 pycompat.raisewithtb(exc, tb)
1108 # end of payload
1109 # end of payload
1109 outdebug(ui, 'closing payload chunk')
1110 outdebug(ui, 'closing payload chunk')
1110 yield _pack(_fpayloadsize, 0)
1111 yield _pack(_fpayloadsize, 0)
1111 self._generated = True
1112 self._generated = True
1112
1113
1113 def _payloadchunks(self):
1114 def _payloadchunks(self):
1114 """yield chunks of a the part payload
1115 """yield chunks of a the part payload
1115
1116
1116 Exists to handle the different methods to provide data to a part."""
1117 Exists to handle the different methods to provide data to a part."""
1117 # we only support fixed size data now.
1118 # we only support fixed size data now.
1118 # This will be improved in the future.
1119 # This will be improved in the future.
1119 if (util.safehasattr(self.data, 'next')
1120 if (util.safehasattr(self.data, 'next')
1120 or util.safehasattr(self.data, '__next__')):
1121 or util.safehasattr(self.data, '__next__')):
1121 buff = util.chunkbuffer(self.data)
1122 buff = util.chunkbuffer(self.data)
1122 chunk = buff.read(preferedchunksize)
1123 chunk = buff.read(preferedchunksize)
1123 while chunk:
1124 while chunk:
1124 yield chunk
1125 yield chunk
1125 chunk = buff.read(preferedchunksize)
1126 chunk = buff.read(preferedchunksize)
1126 elif len(self.data):
1127 elif len(self.data):
1127 yield self.data
1128 yield self.data
1128
1129
1129
1130
1130 flaginterrupt = -1
1131 flaginterrupt = -1
1131
1132
1132 class interrupthandler(unpackermixin):
1133 class interrupthandler(unpackermixin):
1133 """read one part and process it with restricted capability
1134 """read one part and process it with restricted capability
1134
1135
1135 This allows to transmit exception raised on the producer size during part
1136 This allows to transmit exception raised on the producer size during part
1136 iteration while the consumer is reading a part.
1137 iteration while the consumer is reading a part.
1137
1138
1138 Part processed in this manner only have access to a ui object,"""
1139 Part processed in this manner only have access to a ui object,"""
1139
1140
1140 def __init__(self, ui, fp):
1141 def __init__(self, ui, fp):
1141 super(interrupthandler, self).__init__(fp)
1142 super(interrupthandler, self).__init__(fp)
1142 self.ui = ui
1143 self.ui = ui
1143
1144
1144 def _readpartheader(self):
1145 def _readpartheader(self):
1145 """reads a part header size and return the bytes blob
1146 """reads a part header size and return the bytes blob
1146
1147
1147 returns None if empty"""
1148 returns None if empty"""
1148 headersize = self._unpack(_fpartheadersize)[0]
1149 headersize = self._unpack(_fpartheadersize)[0]
1149 if headersize < 0:
1150 if headersize < 0:
1150 raise error.BundleValueError('negative part header size: %i'
1151 raise error.BundleValueError('negative part header size: %i'
1151 % headersize)
1152 % headersize)
1152 indebug(self.ui, 'part header size: %i\n' % headersize)
1153 indebug(self.ui, 'part header size: %i\n' % headersize)
1153 if headersize:
1154 if headersize:
1154 return self._readexact(headersize)
1155 return self._readexact(headersize)
1155 return None
1156 return None
1156
1157
1157 def __call__(self):
1158 def __call__(self):
1158
1159
1159 self.ui.debug('bundle2-input-stream-interrupt:'
1160 self.ui.debug('bundle2-input-stream-interrupt:'
1160 ' opening out of band context\n')
1161 ' opening out of band context\n')
1161 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1162 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1162 headerblock = self._readpartheader()
1163 headerblock = self._readpartheader()
1163 if headerblock is None:
1164 if headerblock is None:
1164 indebug(self.ui, 'no part found during interruption.')
1165 indebug(self.ui, 'no part found during interruption.')
1165 return
1166 return
1166 part = unbundlepart(self.ui, headerblock, self._fp)
1167 part = unbundlepart(self.ui, headerblock, self._fp)
1167 op = interruptoperation(self.ui)
1168 op = interruptoperation(self.ui)
1168 hardabort = False
1169 hardabort = False
1169 try:
1170 try:
1170 _processpart(op, part)
1171 _processpart(op, part)
1171 except (SystemExit, KeyboardInterrupt):
1172 except (SystemExit, KeyboardInterrupt):
1172 hardabort = True
1173 hardabort = True
1173 raise
1174 raise
1174 finally:
1175 finally:
1175 if not hardabort:
1176 if not hardabort:
1176 part.consume()
1177 part.consume()
1177 self.ui.debug('bundle2-input-stream-interrupt:'
1178 self.ui.debug('bundle2-input-stream-interrupt:'
1178 ' closing out of band context\n')
1179 ' closing out of band context\n')
1179
1180
1180 class interruptoperation(object):
1181 class interruptoperation(object):
1181 """A limited operation to be use by part handler during interruption
1182 """A limited operation to be use by part handler during interruption
1182
1183
1183 It only have access to an ui object.
1184 It only have access to an ui object.
1184 """
1185 """
1185
1186
1186 def __init__(self, ui):
1187 def __init__(self, ui):
1187 self.ui = ui
1188 self.ui = ui
1188 self.reply = None
1189 self.reply = None
1189 self.captureoutput = False
1190 self.captureoutput = False
1190
1191
1191 @property
1192 @property
1192 def repo(self):
1193 def repo(self):
1193 raise error.ProgrammingError('no repo access from stream interruption')
1194 raise error.ProgrammingError('no repo access from stream interruption')
1194
1195
1195 def gettransaction(self):
1196 def gettransaction(self):
1196 raise TransactionUnavailable('no repo access from stream interruption')
1197 raise TransactionUnavailable('no repo access from stream interruption')
1197
1198
1198 def decodepayloadchunks(ui, fh):
1199 def decodepayloadchunks(ui, fh):
1199 """Reads bundle2 part payload data into chunks.
1200 """Reads bundle2 part payload data into chunks.
1200
1201
1201 Part payload data consists of framed chunks. This function takes
1202 Part payload data consists of framed chunks. This function takes
1202 a file handle and emits those chunks.
1203 a file handle and emits those chunks.
1203 """
1204 """
1204 dolog = ui.configbool('devel', 'bundle2.debug')
1205 dolog = ui.configbool('devel', 'bundle2.debug')
1205 debug = ui.debug
1206 debug = ui.debug
1206
1207
1207 headerstruct = struct.Struct(_fpayloadsize)
1208 headerstruct = struct.Struct(_fpayloadsize)
1208 headersize = headerstruct.size
1209 headersize = headerstruct.size
1209 unpack = headerstruct.unpack
1210 unpack = headerstruct.unpack
1210
1211
1211 readexactly = changegroup.readexactly
1212 readexactly = changegroup.readexactly
1212 read = fh.read
1213 read = fh.read
1213
1214
1214 chunksize = unpack(readexactly(fh, headersize))[0]
1215 chunksize = unpack(readexactly(fh, headersize))[0]
1215 indebug(ui, 'payload chunk size: %i' % chunksize)
1216 indebug(ui, 'payload chunk size: %i' % chunksize)
1216
1217
1217 # changegroup.readexactly() is inlined below for performance.
1218 # changegroup.readexactly() is inlined below for performance.
1218 while chunksize:
1219 while chunksize:
1219 if chunksize >= 0:
1220 if chunksize >= 0:
1220 s = read(chunksize)
1221 s = read(chunksize)
1221 if len(s) < chunksize:
1222 if len(s) < chunksize:
1222 raise error.Abort(_('stream ended unexpectedly '
1223 raise error.Abort(_('stream ended unexpectedly '
1223 ' (got %d bytes, expected %d)') %
1224 ' (got %d bytes, expected %d)') %
1224 (len(s), chunksize))
1225 (len(s), chunksize))
1225
1226
1226 yield s
1227 yield s
1227 elif chunksize == flaginterrupt:
1228 elif chunksize == flaginterrupt:
1228 # Interrupt "signal" detected. The regular stream is interrupted
1229 # Interrupt "signal" detected. The regular stream is interrupted
1229 # and a bundle2 part follows. Consume it.
1230 # and a bundle2 part follows. Consume it.
1230 interrupthandler(ui, fh)()
1231 interrupthandler(ui, fh)()
1231 else:
1232 else:
1232 raise error.BundleValueError(
1233 raise error.BundleValueError(
1233 'negative payload chunk size: %s' % chunksize)
1234 'negative payload chunk size: %s' % chunksize)
1234
1235
1235 s = read(headersize)
1236 s = read(headersize)
1236 if len(s) < headersize:
1237 if len(s) < headersize:
1237 raise error.Abort(_('stream ended unexpectedly '
1238 raise error.Abort(_('stream ended unexpectedly '
1238 ' (got %d bytes, expected %d)') %
1239 ' (got %d bytes, expected %d)') %
1239 (len(s), chunksize))
1240 (len(s), chunksize))
1240
1241
1241 chunksize = unpack(s)[0]
1242 chunksize = unpack(s)[0]
1242
1243
1243 # indebug() inlined for performance.
1244 # indebug() inlined for performance.
1244 if dolog:
1245 if dolog:
1245 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1246 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1246
1247
1247 class unbundlepart(unpackermixin):
1248 class unbundlepart(unpackermixin):
1248 """a bundle part read from a bundle"""
1249 """a bundle part read from a bundle"""
1249
1250
1250 def __init__(self, ui, header, fp):
1251 def __init__(self, ui, header, fp):
1251 super(unbundlepart, self).__init__(fp)
1252 super(unbundlepart, self).__init__(fp)
1252 self._seekable = (util.safehasattr(fp, 'seek') and
1253 self._seekable = (util.safehasattr(fp, 'seek') and
1253 util.safehasattr(fp, 'tell'))
1254 util.safehasattr(fp, 'tell'))
1254 self.ui = ui
1255 self.ui = ui
1255 # unbundle state attr
1256 # unbundle state attr
1256 self._headerdata = header
1257 self._headerdata = header
1257 self._headeroffset = 0
1258 self._headeroffset = 0
1258 self._initialized = False
1259 self._initialized = False
1259 self.consumed = False
1260 self.consumed = False
1260 # part data
1261 # part data
1261 self.id = None
1262 self.id = None
1262 self.type = None
1263 self.type = None
1263 self.mandatoryparams = None
1264 self.mandatoryparams = None
1264 self.advisoryparams = None
1265 self.advisoryparams = None
1265 self.params = None
1266 self.params = None
1266 self.mandatorykeys = ()
1267 self.mandatorykeys = ()
1267 self._readheader()
1268 self._readheader()
1268 self._mandatory = None
1269 self._mandatory = None
1269 self._pos = 0
1270 self._pos = 0
1270
1271
1271 def _fromheader(self, size):
1272 def _fromheader(self, size):
1272 """return the next <size> byte from the header"""
1273 """return the next <size> byte from the header"""
1273 offset = self._headeroffset
1274 offset = self._headeroffset
1274 data = self._headerdata[offset:(offset + size)]
1275 data = self._headerdata[offset:(offset + size)]
1275 self._headeroffset = offset + size
1276 self._headeroffset = offset + size
1276 return data
1277 return data
1277
1278
1278 def _unpackheader(self, format):
1279 def _unpackheader(self, format):
1279 """read given format from header
1280 """read given format from header
1280
1281
1281 This automatically compute the size of the format to read."""
1282 This automatically compute the size of the format to read."""
1282 data = self._fromheader(struct.calcsize(format))
1283 data = self._fromheader(struct.calcsize(format))
1283 return _unpack(format, data)
1284 return _unpack(format, data)
1284
1285
1285 def _initparams(self, mandatoryparams, advisoryparams):
1286 def _initparams(self, mandatoryparams, advisoryparams):
1286 """internal function to setup all logic related parameters"""
1287 """internal function to setup all logic related parameters"""
1287 # make it read only to prevent people touching it by mistake.
1288 # make it read only to prevent people touching it by mistake.
1288 self.mandatoryparams = tuple(mandatoryparams)
1289 self.mandatoryparams = tuple(mandatoryparams)
1289 self.advisoryparams = tuple(advisoryparams)
1290 self.advisoryparams = tuple(advisoryparams)
1290 # user friendly UI
1291 # user friendly UI
1291 self.params = util.sortdict(self.mandatoryparams)
1292 self.params = util.sortdict(self.mandatoryparams)
1292 self.params.update(self.advisoryparams)
1293 self.params.update(self.advisoryparams)
1293 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1294 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1294
1295
1295 def _readheader(self):
1296 def _readheader(self):
1296 """read the header and setup the object"""
1297 """read the header and setup the object"""
1297 typesize = self._unpackheader(_fparttypesize)[0]
1298 typesize = self._unpackheader(_fparttypesize)[0]
1298 self.type = self._fromheader(typesize)
1299 self.type = self._fromheader(typesize)
1299 indebug(self.ui, 'part type: "%s"' % self.type)
1300 indebug(self.ui, 'part type: "%s"' % self.type)
1300 self.id = self._unpackheader(_fpartid)[0]
1301 self.id = self._unpackheader(_fpartid)[0]
1301 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1302 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1302 # extract mandatory bit from type
1303 # extract mandatory bit from type
1303 self.mandatory = (self.type != self.type.lower())
1304 self.mandatory = (self.type != self.type.lower())
1304 self.type = self.type.lower()
1305 self.type = self.type.lower()
1305 ## reading parameters
1306 ## reading parameters
1306 # param count
1307 # param count
1307 mancount, advcount = self._unpackheader(_fpartparamcount)
1308 mancount, advcount = self._unpackheader(_fpartparamcount)
1308 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1309 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1309 # param size
1310 # param size
1310 fparamsizes = _makefpartparamsizes(mancount + advcount)
1311 fparamsizes = _makefpartparamsizes(mancount + advcount)
1311 paramsizes = self._unpackheader(fparamsizes)
1312 paramsizes = self._unpackheader(fparamsizes)
1312 # make it a list of couple again
1313 # make it a list of couple again
1313 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1314 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1314 # split mandatory from advisory
1315 # split mandatory from advisory
1315 mansizes = paramsizes[:mancount]
1316 mansizes = paramsizes[:mancount]
1316 advsizes = paramsizes[mancount:]
1317 advsizes = paramsizes[mancount:]
1317 # retrieve param value
1318 # retrieve param value
1318 manparams = []
1319 manparams = []
1319 for key, value in mansizes:
1320 for key, value in mansizes:
1320 manparams.append((self._fromheader(key), self._fromheader(value)))
1321 manparams.append((self._fromheader(key), self._fromheader(value)))
1321 advparams = []
1322 advparams = []
1322 for key, value in advsizes:
1323 for key, value in advsizes:
1323 advparams.append((self._fromheader(key), self._fromheader(value)))
1324 advparams.append((self._fromheader(key), self._fromheader(value)))
1324 self._initparams(manparams, advparams)
1325 self._initparams(manparams, advparams)
1325 ## part payload
1326 ## part payload
1326 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1327 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1327 # we read the data, tell it
1328 # we read the data, tell it
1328 self._initialized = True
1329 self._initialized = True
1329
1330
1330 def _payloadchunks(self):
1331 def _payloadchunks(self):
1331 """Generator of decoded chunks in the payload."""
1332 """Generator of decoded chunks in the payload."""
1332 return decodepayloadchunks(self.ui, self._fp)
1333 return decodepayloadchunks(self.ui, self._fp)
1333
1334
1334 def consume(self):
1335 def consume(self):
1335 """Read the part payload until completion.
1336 """Read the part payload until completion.
1336
1337
1337 By consuming the part data, the underlying stream read offset will
1338 By consuming the part data, the underlying stream read offset will
1338 be advanced to the next part (or end of stream).
1339 be advanced to the next part (or end of stream).
1339 """
1340 """
1340 if self.consumed:
1341 if self.consumed:
1341 return
1342 return
1342
1343
1343 chunk = self.read(32768)
1344 chunk = self.read(32768)
1344 while chunk:
1345 while chunk:
1345 self._pos += len(chunk)
1346 self._pos += len(chunk)
1346 chunk = self.read(32768)
1347 chunk = self.read(32768)
1347
1348
1348 def read(self, size=None):
1349 def read(self, size=None):
1349 """read payload data"""
1350 """read payload data"""
1350 if not self._initialized:
1351 if not self._initialized:
1351 self._readheader()
1352 self._readheader()
1352 if size is None:
1353 if size is None:
1353 data = self._payloadstream.read()
1354 data = self._payloadstream.read()
1354 else:
1355 else:
1355 data = self._payloadstream.read(size)
1356 data = self._payloadstream.read(size)
1356 self._pos += len(data)
1357 self._pos += len(data)
1357 if size is None or len(data) < size:
1358 if size is None or len(data) < size:
1358 if not self.consumed and self._pos:
1359 if not self.consumed and self._pos:
1359 self.ui.debug('bundle2-input-part: total payload size %i\n'
1360 self.ui.debug('bundle2-input-part: total payload size %i\n'
1360 % self._pos)
1361 % self._pos)
1361 self.consumed = True
1362 self.consumed = True
1362 return data
1363 return data
1363
1364
1364 class seekableunbundlepart(unbundlepart):
1365 class seekableunbundlepart(unbundlepart):
1365 """A bundle2 part in a bundle that is seekable.
1366 """A bundle2 part in a bundle that is seekable.
1366
1367
1367 Regular ``unbundlepart`` instances can only be read once. This class
1368 Regular ``unbundlepart`` instances can only be read once. This class
1368 extends ``unbundlepart`` to enable bi-directional seeking within the
1369 extends ``unbundlepart`` to enable bi-directional seeking within the
1369 part.
1370 part.
1370
1371
1371 Bundle2 part data consists of framed chunks. Offsets when seeking
1372 Bundle2 part data consists of framed chunks. Offsets when seeking
1372 refer to the decoded data, not the offsets in the underlying bundle2
1373 refer to the decoded data, not the offsets in the underlying bundle2
1373 stream.
1374 stream.
1374
1375
1375 To facilitate quickly seeking within the decoded data, instances of this
1376 To facilitate quickly seeking within the decoded data, instances of this
1376 class maintain a mapping between offsets in the underlying stream and
1377 class maintain a mapping between offsets in the underlying stream and
1377 the decoded payload. This mapping will consume memory in proportion
1378 the decoded payload. This mapping will consume memory in proportion
1378 to the number of chunks within the payload (which almost certainly
1379 to the number of chunks within the payload (which almost certainly
1379 increases in proportion with the size of the part).
1380 increases in proportion with the size of the part).
1380 """
1381 """
1381 def __init__(self, ui, header, fp):
1382 def __init__(self, ui, header, fp):
1382 # (payload, file) offsets for chunk starts.
1383 # (payload, file) offsets for chunk starts.
1383 self._chunkindex = []
1384 self._chunkindex = []
1384
1385
1385 super(seekableunbundlepart, self).__init__(ui, header, fp)
1386 super(seekableunbundlepart, self).__init__(ui, header, fp)
1386
1387
1387 def _payloadchunks(self, chunknum=0):
1388 def _payloadchunks(self, chunknum=0):
1388 '''seek to specified chunk and start yielding data'''
1389 '''seek to specified chunk and start yielding data'''
1389 if len(self._chunkindex) == 0:
1390 if len(self._chunkindex) == 0:
1390 assert chunknum == 0, 'Must start with chunk 0'
1391 assert chunknum == 0, 'Must start with chunk 0'
1391 self._chunkindex.append((0, self._tellfp()))
1392 self._chunkindex.append((0, self._tellfp()))
1392 else:
1393 else:
1393 assert chunknum < len(self._chunkindex), \
1394 assert chunknum < len(self._chunkindex), \
1394 'Unknown chunk %d' % chunknum
1395 'Unknown chunk %d' % chunknum
1395 self._seekfp(self._chunkindex[chunknum][1])
1396 self._seekfp(self._chunkindex[chunknum][1])
1396
1397
1397 pos = self._chunkindex[chunknum][0]
1398 pos = self._chunkindex[chunknum][0]
1398
1399
1399 for chunk in decodepayloadchunks(self.ui, self._fp):
1400 for chunk in decodepayloadchunks(self.ui, self._fp):
1400 chunknum += 1
1401 chunknum += 1
1401 pos += len(chunk)
1402 pos += len(chunk)
1402 if chunknum == len(self._chunkindex):
1403 if chunknum == len(self._chunkindex):
1403 self._chunkindex.append((pos, self._tellfp()))
1404 self._chunkindex.append((pos, self._tellfp()))
1404
1405
1405 yield chunk
1406 yield chunk
1406
1407
1407 def _findchunk(self, pos):
1408 def _findchunk(self, pos):
1408 '''for a given payload position, return a chunk number and offset'''
1409 '''for a given payload position, return a chunk number and offset'''
1409 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1410 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1410 if ppos == pos:
1411 if ppos == pos:
1411 return chunk, 0
1412 return chunk, 0
1412 elif ppos > pos:
1413 elif ppos > pos:
1413 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1414 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1414 raise ValueError('Unknown chunk')
1415 raise ValueError('Unknown chunk')
1415
1416
1416 def tell(self):
1417 def tell(self):
1417 return self._pos
1418 return self._pos
1418
1419
1419 def seek(self, offset, whence=os.SEEK_SET):
1420 def seek(self, offset, whence=os.SEEK_SET):
1420 if whence == os.SEEK_SET:
1421 if whence == os.SEEK_SET:
1421 newpos = offset
1422 newpos = offset
1422 elif whence == os.SEEK_CUR:
1423 elif whence == os.SEEK_CUR:
1423 newpos = self._pos + offset
1424 newpos = self._pos + offset
1424 elif whence == os.SEEK_END:
1425 elif whence == os.SEEK_END:
1425 if not self.consumed:
1426 if not self.consumed:
1426 # Can't use self.consume() here because it advances self._pos.
1427 # Can't use self.consume() here because it advances self._pos.
1427 chunk = self.read(32768)
1428 chunk = self.read(32768)
1428 while chunk:
1429 while chunk:
1429 chunk = self.read(32768)
1430 chunk = self.read(32768)
1430 newpos = self._chunkindex[-1][0] - offset
1431 newpos = self._chunkindex[-1][0] - offset
1431 else:
1432 else:
1432 raise ValueError('Unknown whence value: %r' % (whence,))
1433 raise ValueError('Unknown whence value: %r' % (whence,))
1433
1434
1434 if newpos > self._chunkindex[-1][0] and not self.consumed:
1435 if newpos > self._chunkindex[-1][0] and not self.consumed:
1435 # Can't use self.consume() here because it advances self._pos.
1436 # Can't use self.consume() here because it advances self._pos.
1436 chunk = self.read(32768)
1437 chunk = self.read(32768)
1437 while chunk:
1438 while chunk:
1438 chunk = self.read(32668)
1439 chunk = self.read(32668)
1439
1440
1440 if not 0 <= newpos <= self._chunkindex[-1][0]:
1441 if not 0 <= newpos <= self._chunkindex[-1][0]:
1441 raise ValueError('Offset out of range')
1442 raise ValueError('Offset out of range')
1442
1443
1443 if self._pos != newpos:
1444 if self._pos != newpos:
1444 chunk, internaloffset = self._findchunk(newpos)
1445 chunk, internaloffset = self._findchunk(newpos)
1445 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1446 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1446 adjust = self.read(internaloffset)
1447 adjust = self.read(internaloffset)
1447 if len(adjust) != internaloffset:
1448 if len(adjust) != internaloffset:
1448 raise error.Abort(_('Seek failed\n'))
1449 raise error.Abort(_('Seek failed\n'))
1449 self._pos = newpos
1450 self._pos = newpos
1450
1451
1451 def _seekfp(self, offset, whence=0):
1452 def _seekfp(self, offset, whence=0):
1452 """move the underlying file pointer
1453 """move the underlying file pointer
1453
1454
1454 This method is meant for internal usage by the bundle2 protocol only.
1455 This method is meant for internal usage by the bundle2 protocol only.
1455 They directly manipulate the low level stream including bundle2 level
1456 They directly manipulate the low level stream including bundle2 level
1456 instruction.
1457 instruction.
1457
1458
1458 Do not use it to implement higher-level logic or methods."""
1459 Do not use it to implement higher-level logic or methods."""
1459 if self._seekable:
1460 if self._seekable:
1460 return self._fp.seek(offset, whence)
1461 return self._fp.seek(offset, whence)
1461 else:
1462 else:
1462 raise NotImplementedError(_('File pointer is not seekable'))
1463 raise NotImplementedError(_('File pointer is not seekable'))
1463
1464
1464 def _tellfp(self):
1465 def _tellfp(self):
1465 """return the file offset, or None if file is not seekable
1466 """return the file offset, or None if file is not seekable
1466
1467
1467 This method is meant for internal usage by the bundle2 protocol only.
1468 This method is meant for internal usage by the bundle2 protocol only.
1468 They directly manipulate the low level stream including bundle2 level
1469 They directly manipulate the low level stream including bundle2 level
1469 instruction.
1470 instruction.
1470
1471
1471 Do not use it to implement higher-level logic or methods."""
1472 Do not use it to implement higher-level logic or methods."""
1472 if self._seekable:
1473 if self._seekable:
1473 try:
1474 try:
1474 return self._fp.tell()
1475 return self._fp.tell()
1475 except IOError as e:
1476 except IOError as e:
1476 if e.errno == errno.ESPIPE:
1477 if e.errno == errno.ESPIPE:
1477 self._seekable = False
1478 self._seekable = False
1478 else:
1479 else:
1479 raise
1480 raise
1480 return None
1481 return None
1481
1482
1482 # These are only the static capabilities.
1483 # These are only the static capabilities.
1483 # Check the 'getrepocaps' function for the rest.
1484 # Check the 'getrepocaps' function for the rest.
1484 capabilities = {'HG20': (),
1485 capabilities = {'HG20': (),
1485 'bookmarks': (),
1486 'bookmarks': (),
1486 'error': ('abort', 'unsupportedcontent', 'pushraced',
1487 'error': ('abort', 'unsupportedcontent', 'pushraced',
1487 'pushkey'),
1488 'pushkey'),
1488 'listkeys': (),
1489 'listkeys': (),
1489 'pushkey': (),
1490 'pushkey': (),
1490 'digests': tuple(sorted(util.DIGESTS.keys())),
1491 'digests': tuple(sorted(util.DIGESTS.keys())),
1491 'remote-changegroup': ('http', 'https'),
1492 'remote-changegroup': ('http', 'https'),
1492 'hgtagsfnodes': (),
1493 'hgtagsfnodes': (),
1493 'phases': ('heads',),
1494 'phases': ('heads',),
1494 'stream': ('v2',),
1495 'stream': ('v2',),
1495 }
1496 }
1496
1497
1497 def getrepocaps(repo, allowpushback=False, role=None):
1498 def getrepocaps(repo, allowpushback=False, role=None):
1498 """return the bundle2 capabilities for a given repo
1499 """return the bundle2 capabilities for a given repo
1499
1500
1500 Exists to allow extensions (like evolution) to mutate the capabilities.
1501 Exists to allow extensions (like evolution) to mutate the capabilities.
1501
1502
1502 The returned value is used for servers advertising their capabilities as
1503 The returned value is used for servers advertising their capabilities as
1503 well as clients advertising their capabilities to servers as part of
1504 well as clients advertising their capabilities to servers as part of
1504 bundle2 requests. The ``role`` argument specifies which is which.
1505 bundle2 requests. The ``role`` argument specifies which is which.
1505 """
1506 """
1506 if role not in ('client', 'server'):
1507 if role not in ('client', 'server'):
1507 raise error.ProgrammingError('role argument must be client or server')
1508 raise error.ProgrammingError('role argument must be client or server')
1508
1509
1509 caps = capabilities.copy()
1510 caps = capabilities.copy()
1510 caps['changegroup'] = tuple(sorted(
1511 caps['changegroup'] = tuple(sorted(
1511 changegroup.supportedincomingversions(repo)))
1512 changegroup.supportedincomingversions(repo)))
1512 if obsolete.isenabled(repo, obsolete.exchangeopt):
1513 if obsolete.isenabled(repo, obsolete.exchangeopt):
1513 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1514 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1514 caps['obsmarkers'] = supportedformat
1515 caps['obsmarkers'] = supportedformat
1515 if allowpushback:
1516 if allowpushback:
1516 caps['pushback'] = ()
1517 caps['pushback'] = ()
1517 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1518 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1518 if cpmode == 'check-related':
1519 if cpmode == 'check-related':
1519 caps['checkheads'] = ('related',)
1520 caps['checkheads'] = ('related',)
1520 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1521 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1521 caps.pop('phases')
1522 caps.pop('phases')
1522
1523
1523 # Don't advertise stream clone support in server mode if not configured.
1524 # Don't advertise stream clone support in server mode if not configured.
1524 if role == 'server':
1525 if role == 'server':
1525 streamsupported = repo.ui.configbool('server', 'uncompressed',
1526 streamsupported = repo.ui.configbool('server', 'uncompressed',
1526 untrusted=True)
1527 untrusted=True)
1527 featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')
1528 featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')
1528
1529
1529 if not streamsupported or not featuresupported:
1530 if not streamsupported or not featuresupported:
1530 caps.pop('stream')
1531 caps.pop('stream')
1531 # Else always advertise support on client, because payload support
1532 # Else always advertise support on client, because payload support
1532 # should always be advertised.
1533 # should always be advertised.
1533
1534
1534 return caps
1535 return caps
1535
1536
1536 def bundle2caps(remote):
1537 def bundle2caps(remote):
1537 """return the bundle capabilities of a peer as dict"""
1538 """return the bundle capabilities of a peer as dict"""
1538 raw = remote.capable('bundle2')
1539 raw = remote.capable('bundle2')
1539 if not raw and raw != '':
1540 if not raw and raw != '':
1540 return {}
1541 return {}
1541 capsblob = urlreq.unquote(remote.capable('bundle2'))
1542 capsblob = urlreq.unquote(remote.capable('bundle2'))
1542 return decodecaps(capsblob)
1543 return decodecaps(capsblob)
1543
1544
1544 def obsmarkersversion(caps):
1545 def obsmarkersversion(caps):
1545 """extract the list of supported obsmarkers versions from a bundle2caps dict
1546 """extract the list of supported obsmarkers versions from a bundle2caps dict
1546 """
1547 """
1547 obscaps = caps.get('obsmarkers', ())
1548 obscaps = caps.get('obsmarkers', ())
1548 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1549 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1549
1550
1550 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1551 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1551 vfs=None, compression=None, compopts=None):
1552 vfs=None, compression=None, compopts=None):
1552 if bundletype.startswith('HG10'):
1553 if bundletype.startswith('HG10'):
1553 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1554 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1554 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1555 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1555 compression=compression, compopts=compopts)
1556 compression=compression, compopts=compopts)
1556 elif not bundletype.startswith('HG20'):
1557 elif not bundletype.startswith('HG20'):
1557 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1558 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1558
1559
1559 caps = {}
1560 caps = {}
1560 if 'obsolescence' in opts:
1561 if 'obsolescence' in opts:
1561 caps['obsmarkers'] = ('V1',)
1562 caps['obsmarkers'] = ('V1',)
1562 bundle = bundle20(ui, caps)
1563 bundle = bundle20(ui, caps)
1563 bundle.setcompression(compression, compopts)
1564 bundle.setcompression(compression, compopts)
1564 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1565 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1565 chunkiter = bundle.getchunks()
1566 chunkiter = bundle.getchunks()
1566
1567
1567 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1568 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1568
1569
1569 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1570 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1570 # We should eventually reconcile this logic with the one behind
1571 # We should eventually reconcile this logic with the one behind
1571 # 'exchange.getbundle2partsgenerator'.
1572 # 'exchange.getbundle2partsgenerator'.
1572 #
1573 #
1573 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1574 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1574 # different right now. So we keep them separated for now for the sake of
1575 # different right now. So we keep them separated for now for the sake of
1575 # simplicity.
1576 # simplicity.
1576
1577
1577 # we always want a changegroup in such bundle
1578 # we always want a changegroup in such bundle
1578 cgversion = opts.get('cg.version')
1579 cgversion = opts.get('cg.version')
1579 if cgversion is None:
1580 if cgversion is None:
1580 cgversion = changegroup.safeversion(repo)
1581 cgversion = changegroup.safeversion(repo)
1581 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1582 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1582 part = bundler.newpart('changegroup', data=cg.getchunks())
1583 part = bundler.newpart('changegroup', data=cg.getchunks())
1583 part.addparam('version', cg.version)
1584 part.addparam('version', cg.version)
1584 if 'clcount' in cg.extras:
1585 if 'clcount' in cg.extras:
1585 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1586 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1586 mandatory=False)
1587 mandatory=False)
1587 if opts.get('phases') and repo.revs('%ln and secret()',
1588 if opts.get('phases') and repo.revs('%ln and secret()',
1588 outgoing.missingheads):
1589 outgoing.missingheads):
1589 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1590 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1590
1591
1591 addparttagsfnodescache(repo, bundler, outgoing)
1592 addparttagsfnodescache(repo, bundler, outgoing)
1592
1593
1593 if opts.get('obsolescence', False):
1594 if opts.get('obsolescence', False):
1594 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1595 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1595 buildobsmarkerspart(bundler, obsmarkers)
1596 buildobsmarkerspart(bundler, obsmarkers)
1596
1597
1597 if opts.get('phases', False):
1598 if opts.get('phases', False):
1598 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1599 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1599 phasedata = phases.binaryencode(headsbyphase)
1600 phasedata = phases.binaryencode(headsbyphase)
1600 bundler.newpart('phase-heads', data=phasedata)
1601 bundler.newpart('phase-heads', data=phasedata)
1601
1602
1602 def addparttagsfnodescache(repo, bundler, outgoing):
1603 def addparttagsfnodescache(repo, bundler, outgoing):
1603 # we include the tags fnode cache for the bundle changeset
1604 # we include the tags fnode cache for the bundle changeset
1604 # (as an optional parts)
1605 # (as an optional parts)
1605 cache = tags.hgtagsfnodescache(repo.unfiltered())
1606 cache = tags.hgtagsfnodescache(repo.unfiltered())
1606 chunks = []
1607 chunks = []
1607
1608
1608 # .hgtags fnodes are only relevant for head changesets. While we could
1609 # .hgtags fnodes are only relevant for head changesets. While we could
1609 # transfer values for all known nodes, there will likely be little to
1610 # transfer values for all known nodes, there will likely be little to
1610 # no benefit.
1611 # no benefit.
1611 #
1612 #
1612 # We don't bother using a generator to produce output data because
1613 # We don't bother using a generator to produce output data because
1613 # a) we only have 40 bytes per head and even esoteric numbers of heads
1614 # a) we only have 40 bytes per head and even esoteric numbers of heads
1614 # consume little memory (1M heads is 40MB) b) we don't want to send the
1615 # consume little memory (1M heads is 40MB) b) we don't want to send the
1615 # part if we don't have entries and knowing if we have entries requires
1616 # part if we don't have entries and knowing if we have entries requires
1616 # cache lookups.
1617 # cache lookups.
1617 for node in outgoing.missingheads:
1618 for node in outgoing.missingheads:
1618 # Don't compute missing, as this may slow down serving.
1619 # Don't compute missing, as this may slow down serving.
1619 fnode = cache.getfnode(node, computemissing=False)
1620 fnode = cache.getfnode(node, computemissing=False)
1620 if fnode is not None:
1621 if fnode is not None:
1621 chunks.extend([node, fnode])
1622 chunks.extend([node, fnode])
1622
1623
1623 if chunks:
1624 if chunks:
1624 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1625 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1625
1626
1626 def buildobsmarkerspart(bundler, markers):
1627 def buildobsmarkerspart(bundler, markers):
1627 """add an obsmarker part to the bundler with <markers>
1628 """add an obsmarker part to the bundler with <markers>
1628
1629
1629 No part is created if markers is empty.
1630 No part is created if markers is empty.
1630 Raises ValueError if the bundler doesn't support any known obsmarker format.
1631 Raises ValueError if the bundler doesn't support any known obsmarker format.
1631 """
1632 """
1632 if not markers:
1633 if not markers:
1633 return None
1634 return None
1634
1635
1635 remoteversions = obsmarkersversion(bundler.capabilities)
1636 remoteversions = obsmarkersversion(bundler.capabilities)
1636 version = obsolete.commonversion(remoteversions)
1637 version = obsolete.commonversion(remoteversions)
1637 if version is None:
1638 if version is None:
1638 raise ValueError('bundler does not support common obsmarker format')
1639 raise ValueError('bundler does not support common obsmarker format')
1639 stream = obsolete.encodemarkers(markers, True, version=version)
1640 stream = obsolete.encodemarkers(markers, True, version=version)
1640 return bundler.newpart('obsmarkers', data=stream)
1641 return bundler.newpart('obsmarkers', data=stream)
1641
1642
1642 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1643 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1643 compopts=None):
1644 compopts=None):
1644 """Write a bundle file and return its filename.
1645 """Write a bundle file and return its filename.
1645
1646
1646 Existing files will not be overwritten.
1647 Existing files will not be overwritten.
1647 If no filename is specified, a temporary file is created.
1648 If no filename is specified, a temporary file is created.
1648 bz2 compression can be turned off.
1649 bz2 compression can be turned off.
1649 The bundle file will be deleted in case of errors.
1650 The bundle file will be deleted in case of errors.
1650 """
1651 """
1651
1652
1652 if bundletype == "HG20":
1653 if bundletype == "HG20":
1653 bundle = bundle20(ui)
1654 bundle = bundle20(ui)
1654 bundle.setcompression(compression, compopts)
1655 bundle.setcompression(compression, compopts)
1655 part = bundle.newpart('changegroup', data=cg.getchunks())
1656 part = bundle.newpart('changegroup', data=cg.getchunks())
1656 part.addparam('version', cg.version)
1657 part.addparam('version', cg.version)
1657 if 'clcount' in cg.extras:
1658 if 'clcount' in cg.extras:
1658 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1659 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1659 mandatory=False)
1660 mandatory=False)
1660 chunkiter = bundle.getchunks()
1661 chunkiter = bundle.getchunks()
1661 else:
1662 else:
1662 # compression argument is only for the bundle2 case
1663 # compression argument is only for the bundle2 case
1663 assert compression is None
1664 assert compression is None
1664 if cg.version != '01':
1665 if cg.version != '01':
1665 raise error.Abort(_('old bundle types only supports v1 '
1666 raise error.Abort(_('old bundle types only supports v1 '
1666 'changegroups'))
1667 'changegroups'))
1667 header, comp = bundletypes[bundletype]
1668 header, comp = bundletypes[bundletype]
1668 if comp not in util.compengines.supportedbundletypes:
1669 if comp not in util.compengines.supportedbundletypes:
1669 raise error.Abort(_('unknown stream compression type: %s')
1670 raise error.Abort(_('unknown stream compression type: %s')
1670 % comp)
1671 % comp)
1671 compengine = util.compengines.forbundletype(comp)
1672 compengine = util.compengines.forbundletype(comp)
1672 def chunkiter():
1673 def chunkiter():
1673 yield header
1674 yield header
1674 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1675 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1675 yield chunk
1676 yield chunk
1676 chunkiter = chunkiter()
1677 chunkiter = chunkiter()
1677
1678
1678 # parse the changegroup data, otherwise we will block
1679 # parse the changegroup data, otherwise we will block
1679 # in case of sshrepo because we don't know the end of the stream
1680 # in case of sshrepo because we don't know the end of the stream
1680 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1681 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1681
1682
1682 def combinechangegroupresults(op):
1683 def combinechangegroupresults(op):
1683 """logic to combine 0 or more addchangegroup results into one"""
1684 """logic to combine 0 or more addchangegroup results into one"""
1684 results = [r.get('return', 0)
1685 results = [r.get('return', 0)
1685 for r in op.records['changegroup']]
1686 for r in op.records['changegroup']]
1686 changedheads = 0
1687 changedheads = 0
1687 result = 1
1688 result = 1
1688 for ret in results:
1689 for ret in results:
1689 # If any changegroup result is 0, return 0
1690 # If any changegroup result is 0, return 0
1690 if ret == 0:
1691 if ret == 0:
1691 result = 0
1692 result = 0
1692 break
1693 break
1693 if ret < -1:
1694 if ret < -1:
1694 changedheads += ret + 1
1695 changedheads += ret + 1
1695 elif ret > 1:
1696 elif ret > 1:
1696 changedheads += ret - 1
1697 changedheads += ret - 1
1697 if changedheads > 0:
1698 if changedheads > 0:
1698 result = 1 + changedheads
1699 result = 1 + changedheads
1699 elif changedheads < 0:
1700 elif changedheads < 0:
1700 result = -1 + changedheads
1701 result = -1 + changedheads
1701 return result
1702 return result
1702
1703
1703 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1704 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1704 'targetphase'))
1705 'targetphase'))
1705 def handlechangegroup(op, inpart):
1706 def handlechangegroup(op, inpart):
1706 """apply a changegroup part on the repo
1707 """apply a changegroup part on the repo
1707
1708
1708 This is a very early implementation that will massive rework before being
1709 This is a very early implementation that will massive rework before being
1709 inflicted to any end-user.
1710 inflicted to any end-user.
1710 """
1711 """
1711 tr = op.gettransaction()
1712 tr = op.gettransaction()
1712 unpackerversion = inpart.params.get('version', '01')
1713 unpackerversion = inpart.params.get('version', '01')
1713 # We should raise an appropriate exception here
1714 # We should raise an appropriate exception here
1714 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1715 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1715 # the source and url passed here are overwritten by the one contained in
1716 # the source and url passed here are overwritten by the one contained in
1716 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1717 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1717 nbchangesets = None
1718 nbchangesets = None
1718 if 'nbchanges' in inpart.params:
1719 if 'nbchanges' in inpart.params:
1719 nbchangesets = int(inpart.params.get('nbchanges'))
1720 nbchangesets = int(inpart.params.get('nbchanges'))
1720 if ('treemanifest' in inpart.params and
1721 if ('treemanifest' in inpart.params and
1721 'treemanifest' not in op.repo.requirements):
1722 'treemanifest' not in op.repo.requirements):
1722 if len(op.repo.changelog) != 0:
1723 if len(op.repo.changelog) != 0:
1723 raise error.Abort(_(
1724 raise error.Abort(_(
1724 "bundle contains tree manifests, but local repo is "
1725 "bundle contains tree manifests, but local repo is "
1725 "non-empty and does not use tree manifests"))
1726 "non-empty and does not use tree manifests"))
1726 op.repo.requirements.add('treemanifest')
1727 op.repo.requirements.add('treemanifest')
1727 op.repo._applyopenerreqs()
1728 op.repo._applyopenerreqs()
1728 op.repo._writerequirements()
1729 op.repo._writerequirements()
1729 extrakwargs = {}
1730 extrakwargs = {}
1730 targetphase = inpart.params.get('targetphase')
1731 targetphase = inpart.params.get('targetphase')
1731 if targetphase is not None:
1732 if targetphase is not None:
1732 extrakwargs[r'targetphase'] = int(targetphase)
1733 extrakwargs[r'targetphase'] = int(targetphase)
1733 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1734 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1734 expectedtotal=nbchangesets, **extrakwargs)
1735 expectedtotal=nbchangesets, **extrakwargs)
1735 if op.reply is not None:
1736 if op.reply is not None:
1736 # This is definitely not the final form of this
1737 # This is definitely not the final form of this
1737 # return. But one need to start somewhere.
1738 # return. But one need to start somewhere.
1738 part = op.reply.newpart('reply:changegroup', mandatory=False)
1739 part = op.reply.newpart('reply:changegroup', mandatory=False)
1739 part.addparam(
1740 part.addparam(
1740 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1741 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1741 part.addparam('return', '%i' % ret, mandatory=False)
1742 part.addparam('return', '%i' % ret, mandatory=False)
1742 assert not inpart.read()
1743 assert not inpart.read()
1743
1744
1744 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1745 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1745 ['digest:%s' % k for k in util.DIGESTS.keys()])
1746 ['digest:%s' % k for k in util.DIGESTS.keys()])
1746 @parthandler('remote-changegroup', _remotechangegroupparams)
1747 @parthandler('remote-changegroup', _remotechangegroupparams)
1747 def handleremotechangegroup(op, inpart):
1748 def handleremotechangegroup(op, inpart):
1748 """apply a bundle10 on the repo, given an url and validation information
1749 """apply a bundle10 on the repo, given an url and validation information
1749
1750
1750 All the information about the remote bundle to import are given as
1751 All the information about the remote bundle to import are given as
1751 parameters. The parameters include:
1752 parameters. The parameters include:
1752 - url: the url to the bundle10.
1753 - url: the url to the bundle10.
1753 - size: the bundle10 file size. It is used to validate what was
1754 - size: the bundle10 file size. It is used to validate what was
1754 retrieved by the client matches the server knowledge about the bundle.
1755 retrieved by the client matches the server knowledge about the bundle.
1755 - digests: a space separated list of the digest types provided as
1756 - digests: a space separated list of the digest types provided as
1756 parameters.
1757 parameters.
1757 - digest:<digest-type>: the hexadecimal representation of the digest with
1758 - digest:<digest-type>: the hexadecimal representation of the digest with
1758 that name. Like the size, it is used to validate what was retrieved by
1759 that name. Like the size, it is used to validate what was retrieved by
1759 the client matches what the server knows about the bundle.
1760 the client matches what the server knows about the bundle.
1760
1761
1761 When multiple digest types are given, all of them are checked.
1762 When multiple digest types are given, all of them are checked.
1762 """
1763 """
1763 try:
1764 try:
1764 raw_url = inpart.params['url']
1765 raw_url = inpart.params['url']
1765 except KeyError:
1766 except KeyError:
1766 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1767 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1767 parsed_url = util.url(raw_url)
1768 parsed_url = util.url(raw_url)
1768 if parsed_url.scheme not in capabilities['remote-changegroup']:
1769 if parsed_url.scheme not in capabilities['remote-changegroup']:
1769 raise error.Abort(_('remote-changegroup does not support %s urls') %
1770 raise error.Abort(_('remote-changegroup does not support %s urls') %
1770 parsed_url.scheme)
1771 parsed_url.scheme)
1771
1772
1772 try:
1773 try:
1773 size = int(inpart.params['size'])
1774 size = int(inpart.params['size'])
1774 except ValueError:
1775 except ValueError:
1775 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1776 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1776 % 'size')
1777 % 'size')
1777 except KeyError:
1778 except KeyError:
1778 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1779 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1779
1780
1780 digests = {}
1781 digests = {}
1781 for typ in inpart.params.get('digests', '').split():
1782 for typ in inpart.params.get('digests', '').split():
1782 param = 'digest:%s' % typ
1783 param = 'digest:%s' % typ
1783 try:
1784 try:
1784 value = inpart.params[param]
1785 value = inpart.params[param]
1785 except KeyError:
1786 except KeyError:
1786 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1787 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1787 param)
1788 param)
1788 digests[typ] = value
1789 digests[typ] = value
1789
1790
1790 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1791 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1791
1792
1792 tr = op.gettransaction()
1793 tr = op.gettransaction()
1793 from . import exchange
1794 from . import exchange
1794 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1795 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1795 if not isinstance(cg, changegroup.cg1unpacker):
1796 if not isinstance(cg, changegroup.cg1unpacker):
1796 raise error.Abort(_('%s: not a bundle version 1.0') %
1797 raise error.Abort(_('%s: not a bundle version 1.0') %
1797 util.hidepassword(raw_url))
1798 util.hidepassword(raw_url))
1798 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1799 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1799 if op.reply is not None:
1800 if op.reply is not None:
1800 # This is definitely not the final form of this
1801 # This is definitely not the final form of this
1801 # return. But one need to start somewhere.
1802 # return. But one need to start somewhere.
1802 part = op.reply.newpart('reply:changegroup')
1803 part = op.reply.newpart('reply:changegroup')
1803 part.addparam(
1804 part.addparam(
1804 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1805 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1805 part.addparam('return', '%i' % ret, mandatory=False)
1806 part.addparam('return', '%i' % ret, mandatory=False)
1806 try:
1807 try:
1807 real_part.validate()
1808 real_part.validate()
1808 except error.Abort as e:
1809 except error.Abort as e:
1809 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1810 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1810 (util.hidepassword(raw_url), str(e)))
1811 (util.hidepassword(raw_url), str(e)))
1811 assert not inpart.read()
1812 assert not inpart.read()
1812
1813
1813 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1814 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1814 def handlereplychangegroup(op, inpart):
1815 def handlereplychangegroup(op, inpart):
1815 ret = int(inpart.params['return'])
1816 ret = int(inpart.params['return'])
1816 replyto = int(inpart.params['in-reply-to'])
1817 replyto = int(inpart.params['in-reply-to'])
1817 op.records.add('changegroup', {'return': ret}, replyto)
1818 op.records.add('changegroup', {'return': ret}, replyto)
1818
1819
1819 @parthandler('check:bookmarks')
1820 @parthandler('check:bookmarks')
1820 def handlecheckbookmarks(op, inpart):
1821 def handlecheckbookmarks(op, inpart):
1821 """check location of bookmarks
1822 """check location of bookmarks
1822
1823
1823 This part is to be used to detect push race regarding bookmark, it
1824 This part is to be used to detect push race regarding bookmark, it
1824 contains binary encoded (bookmark, node) tuple. If the local state does
1825 contains binary encoded (bookmark, node) tuple. If the local state does
1825 not marks the one in the part, a PushRaced exception is raised
1826 not marks the one in the part, a PushRaced exception is raised
1826 """
1827 """
1827 bookdata = bookmarks.binarydecode(inpart)
1828 bookdata = bookmarks.binarydecode(inpart)
1828
1829
1829 msgstandard = ('repository changed while pushing - please try again '
1830 msgstandard = ('repository changed while pushing - please try again '
1830 '(bookmark "%s" move from %s to %s)')
1831 '(bookmark "%s" move from %s to %s)')
1831 msgmissing = ('repository changed while pushing - please try again '
1832 msgmissing = ('repository changed while pushing - please try again '
1832 '(bookmark "%s" is missing, expected %s)')
1833 '(bookmark "%s" is missing, expected %s)')
1833 msgexist = ('repository changed while pushing - please try again '
1834 msgexist = ('repository changed while pushing - please try again '
1834 '(bookmark "%s" set on %s, expected missing)')
1835 '(bookmark "%s" set on %s, expected missing)')
1835 for book, node in bookdata:
1836 for book, node in bookdata:
1836 currentnode = op.repo._bookmarks.get(book)
1837 currentnode = op.repo._bookmarks.get(book)
1837 if currentnode != node:
1838 if currentnode != node:
1838 if node is None:
1839 if node is None:
1839 finalmsg = msgexist % (book, nodemod.short(currentnode))
1840 finalmsg = msgexist % (book, nodemod.short(currentnode))
1840 elif currentnode is None:
1841 elif currentnode is None:
1841 finalmsg = msgmissing % (book, nodemod.short(node))
1842 finalmsg = msgmissing % (book, nodemod.short(node))
1842 else:
1843 else:
1843 finalmsg = msgstandard % (book, nodemod.short(node),
1844 finalmsg = msgstandard % (book, nodemod.short(node),
1844 nodemod.short(currentnode))
1845 nodemod.short(currentnode))
1845 raise error.PushRaced(finalmsg)
1846 raise error.PushRaced(finalmsg)
1846
1847
1847 @parthandler('check:heads')
1848 @parthandler('check:heads')
1848 def handlecheckheads(op, inpart):
1849 def handlecheckheads(op, inpart):
1849 """check that head of the repo did not change
1850 """check that head of the repo did not change
1850
1851
1851 This is used to detect a push race when using unbundle.
1852 This is used to detect a push race when using unbundle.
1852 This replaces the "heads" argument of unbundle."""
1853 This replaces the "heads" argument of unbundle."""
1853 h = inpart.read(20)
1854 h = inpart.read(20)
1854 heads = []
1855 heads = []
1855 while len(h) == 20:
1856 while len(h) == 20:
1856 heads.append(h)
1857 heads.append(h)
1857 h = inpart.read(20)
1858 h = inpart.read(20)
1858 assert not h
1859 assert not h
1859 # Trigger a transaction so that we are guaranteed to have the lock now.
1860 # Trigger a transaction so that we are guaranteed to have the lock now.
1860 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1861 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1861 op.gettransaction()
1862 op.gettransaction()
1862 if sorted(heads) != sorted(op.repo.heads()):
1863 if sorted(heads) != sorted(op.repo.heads()):
1863 raise error.PushRaced('repository changed while pushing - '
1864 raise error.PushRaced('repository changed while pushing - '
1864 'please try again')
1865 'please try again')
1865
1866
1866 @parthandler('check:updated-heads')
1867 @parthandler('check:updated-heads')
1867 def handlecheckupdatedheads(op, inpart):
1868 def handlecheckupdatedheads(op, inpart):
1868 """check for race on the heads touched by a push
1869 """check for race on the heads touched by a push
1869
1870
1870 This is similar to 'check:heads' but focus on the heads actually updated
1871 This is similar to 'check:heads' but focus on the heads actually updated
1871 during the push. If other activities happen on unrelated heads, it is
1872 during the push. If other activities happen on unrelated heads, it is
1872 ignored.
1873 ignored.
1873
1874
1874 This allow server with high traffic to avoid push contention as long as
1875 This allow server with high traffic to avoid push contention as long as
1875 unrelated parts of the graph are involved."""
1876 unrelated parts of the graph are involved."""
1876 h = inpart.read(20)
1877 h = inpart.read(20)
1877 heads = []
1878 heads = []
1878 while len(h) == 20:
1879 while len(h) == 20:
1879 heads.append(h)
1880 heads.append(h)
1880 h = inpart.read(20)
1881 h = inpart.read(20)
1881 assert not h
1882 assert not h
1882 # trigger a transaction so that we are guaranteed to have the lock now.
1883 # trigger a transaction so that we are guaranteed to have the lock now.
1883 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1884 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1884 op.gettransaction()
1885 op.gettransaction()
1885
1886
1886 currentheads = set()
1887 currentheads = set()
1887 for ls in op.repo.branchmap().itervalues():
1888 for ls in op.repo.branchmap().itervalues():
1888 currentheads.update(ls)
1889 currentheads.update(ls)
1889
1890
1890 for h in heads:
1891 for h in heads:
1891 if h not in currentheads:
1892 if h not in currentheads:
1892 raise error.PushRaced('repository changed while pushing - '
1893 raise error.PushRaced('repository changed while pushing - '
1893 'please try again')
1894 'please try again')
1894
1895
1895 @parthandler('check:phases')
1896 @parthandler('check:phases')
1896 def handlecheckphases(op, inpart):
1897 def handlecheckphases(op, inpart):
1897 """check that phase boundaries of the repository did not change
1898 """check that phase boundaries of the repository did not change
1898
1899
1899 This is used to detect a push race.
1900 This is used to detect a push race.
1900 """
1901 """
1901 phasetonodes = phases.binarydecode(inpart)
1902 phasetonodes = phases.binarydecode(inpart)
1902 unfi = op.repo.unfiltered()
1903 unfi = op.repo.unfiltered()
1903 cl = unfi.changelog
1904 cl = unfi.changelog
1904 phasecache = unfi._phasecache
1905 phasecache = unfi._phasecache
1905 msg = ('repository changed while pushing - please try again '
1906 msg = ('repository changed while pushing - please try again '
1906 '(%s is %s expected %s)')
1907 '(%s is %s expected %s)')
1907 for expectedphase, nodes in enumerate(phasetonodes):
1908 for expectedphase, nodes in enumerate(phasetonodes):
1908 for n in nodes:
1909 for n in nodes:
1909 actualphase = phasecache.phase(unfi, cl.rev(n))
1910 actualphase = phasecache.phase(unfi, cl.rev(n))
1910 if actualphase != expectedphase:
1911 if actualphase != expectedphase:
1911 finalmsg = msg % (nodemod.short(n),
1912 finalmsg = msg % (nodemod.short(n),
1912 phases.phasenames[actualphase],
1913 phases.phasenames[actualphase],
1913 phases.phasenames[expectedphase])
1914 phases.phasenames[expectedphase])
1914 raise error.PushRaced(finalmsg)
1915 raise error.PushRaced(finalmsg)
1915
1916
1916 @parthandler('output')
1917 @parthandler('output')
1917 def handleoutput(op, inpart):
1918 def handleoutput(op, inpart):
1918 """forward output captured on the server to the client"""
1919 """forward output captured on the server to the client"""
1919 for line in inpart.read().splitlines():
1920 for line in inpart.read().splitlines():
1920 op.ui.status(_('remote: %s\n') % line)
1921 op.ui.status(_('remote: %s\n') % line)
1921
1922
1922 @parthandler('replycaps')
1923 @parthandler('replycaps')
1923 def handlereplycaps(op, inpart):
1924 def handlereplycaps(op, inpart):
1924 """Notify that a reply bundle should be created
1925 """Notify that a reply bundle should be created
1925
1926
1926 The payload contains the capabilities information for the reply"""
1927 The payload contains the capabilities information for the reply"""
1927 caps = decodecaps(inpart.read())
1928 caps = decodecaps(inpart.read())
1928 if op.reply is None:
1929 if op.reply is None:
1929 op.reply = bundle20(op.ui, caps)
1930 op.reply = bundle20(op.ui, caps)
1930
1931
1931 class AbortFromPart(error.Abort):
1932 class AbortFromPart(error.Abort):
1932 """Sub-class of Abort that denotes an error from a bundle2 part."""
1933 """Sub-class of Abort that denotes an error from a bundle2 part."""
1933
1934
1934 @parthandler('error:abort', ('message', 'hint'))
1935 @parthandler('error:abort', ('message', 'hint'))
1935 def handleerrorabort(op, inpart):
1936 def handleerrorabort(op, inpart):
1936 """Used to transmit abort error over the wire"""
1937 """Used to transmit abort error over the wire"""
1937 raise AbortFromPart(inpart.params['message'],
1938 raise AbortFromPart(inpart.params['message'],
1938 hint=inpart.params.get('hint'))
1939 hint=inpart.params.get('hint'))
1939
1940
1940 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1941 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1941 'in-reply-to'))
1942 'in-reply-to'))
1942 def handleerrorpushkey(op, inpart):
1943 def handleerrorpushkey(op, inpart):
1943 """Used to transmit failure of a mandatory pushkey over the wire"""
1944 """Used to transmit failure of a mandatory pushkey over the wire"""
1944 kwargs = {}
1945 kwargs = {}
1945 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1946 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1946 value = inpart.params.get(name)
1947 value = inpart.params.get(name)
1947 if value is not None:
1948 if value is not None:
1948 kwargs[name] = value
1949 kwargs[name] = value
1949 raise error.PushkeyFailed(inpart.params['in-reply-to'],
1950 raise error.PushkeyFailed(inpart.params['in-reply-to'],
1950 **pycompat.strkwargs(kwargs))
1951 **pycompat.strkwargs(kwargs))
1951
1952
1952 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1953 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1953 def handleerrorunsupportedcontent(op, inpart):
1954 def handleerrorunsupportedcontent(op, inpart):
1954 """Used to transmit unknown content error over the wire"""
1955 """Used to transmit unknown content error over the wire"""
1955 kwargs = {}
1956 kwargs = {}
1956 parttype = inpart.params.get('parttype')
1957 parttype = inpart.params.get('parttype')
1957 if parttype is not None:
1958 if parttype is not None:
1958 kwargs['parttype'] = parttype
1959 kwargs['parttype'] = parttype
1959 params = inpart.params.get('params')
1960 params = inpart.params.get('params')
1960 if params is not None:
1961 if params is not None:
1961 kwargs['params'] = params.split('\0')
1962 kwargs['params'] = params.split('\0')
1962
1963
1963 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
1964 raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
1964
1965
1965 @parthandler('error:pushraced', ('message',))
1966 @parthandler('error:pushraced', ('message',))
1966 def handleerrorpushraced(op, inpart):
1967 def handleerrorpushraced(op, inpart):
1967 """Used to transmit push race error over the wire"""
1968 """Used to transmit push race error over the wire"""
1968 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1969 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1969
1970
1970 @parthandler('listkeys', ('namespace',))
1971 @parthandler('listkeys', ('namespace',))
1971 def handlelistkeys(op, inpart):
1972 def handlelistkeys(op, inpart):
1972 """retrieve pushkey namespace content stored in a bundle2"""
1973 """retrieve pushkey namespace content stored in a bundle2"""
1973 namespace = inpart.params['namespace']
1974 namespace = inpart.params['namespace']
1974 r = pushkey.decodekeys(inpart.read())
1975 r = pushkey.decodekeys(inpart.read())
1975 op.records.add('listkeys', (namespace, r))
1976 op.records.add('listkeys', (namespace, r))
1976
1977
1977 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1978 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1978 def handlepushkey(op, inpart):
1979 def handlepushkey(op, inpart):
1979 """process a pushkey request"""
1980 """process a pushkey request"""
1980 dec = pushkey.decode
1981 dec = pushkey.decode
1981 namespace = dec(inpart.params['namespace'])
1982 namespace = dec(inpart.params['namespace'])
1982 key = dec(inpart.params['key'])
1983 key = dec(inpart.params['key'])
1983 old = dec(inpart.params['old'])
1984 old = dec(inpart.params['old'])
1984 new = dec(inpart.params['new'])
1985 new = dec(inpart.params['new'])
1985 # Grab the transaction to ensure that we have the lock before performing the
1986 # Grab the transaction to ensure that we have the lock before performing the
1986 # pushkey.
1987 # pushkey.
1987 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1988 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1988 op.gettransaction()
1989 op.gettransaction()
1989 ret = op.repo.pushkey(namespace, key, old, new)
1990 ret = op.repo.pushkey(namespace, key, old, new)
1990 record = {'namespace': namespace,
1991 record = {'namespace': namespace,
1991 'key': key,
1992 'key': key,
1992 'old': old,
1993 'old': old,
1993 'new': new}
1994 'new': new}
1994 op.records.add('pushkey', record)
1995 op.records.add('pushkey', record)
1995 if op.reply is not None:
1996 if op.reply is not None:
1996 rpart = op.reply.newpart('reply:pushkey')
1997 rpart = op.reply.newpart('reply:pushkey')
1997 rpart.addparam(
1998 rpart.addparam(
1998 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1999 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1999 rpart.addparam('return', '%i' % ret, mandatory=False)
2000 rpart.addparam('return', '%i' % ret, mandatory=False)
2000 if inpart.mandatory and not ret:
2001 if inpart.mandatory and not ret:
2001 kwargs = {}
2002 kwargs = {}
2002 for key in ('namespace', 'key', 'new', 'old', 'ret'):
2003 for key in ('namespace', 'key', 'new', 'old', 'ret'):
2003 if key in inpart.params:
2004 if key in inpart.params:
2004 kwargs[key] = inpart.params[key]
2005 kwargs[key] = inpart.params[key]
2005 raise error.PushkeyFailed(partid='%d' % inpart.id,
2006 raise error.PushkeyFailed(partid='%d' % inpart.id,
2006 **pycompat.strkwargs(kwargs))
2007 **pycompat.strkwargs(kwargs))
2007
2008
2008 @parthandler('bookmarks')
2009 @parthandler('bookmarks')
2009 def handlebookmark(op, inpart):
2010 def handlebookmark(op, inpart):
2010 """transmit bookmark information
2011 """transmit bookmark information
2011
2012
2012 The part contains binary encoded bookmark information.
2013 The part contains binary encoded bookmark information.
2013
2014
2014 The exact behavior of this part can be controlled by the 'bookmarks' mode
2015 The exact behavior of this part can be controlled by the 'bookmarks' mode
2015 on the bundle operation.
2016 on the bundle operation.
2016
2017
2017 When mode is 'apply' (the default) the bookmark information is applied as
2018 When mode is 'apply' (the default) the bookmark information is applied as
2018 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2019 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2019 issued earlier to check for push races in such update. This behavior is
2020 issued earlier to check for push races in such update. This behavior is
2020 suitable for pushing.
2021 suitable for pushing.
2021
2022
2022 When mode is 'records', the information is recorded into the 'bookmarks'
2023 When mode is 'records', the information is recorded into the 'bookmarks'
2023 records of the bundle operation. This behavior is suitable for pulling.
2024 records of the bundle operation. This behavior is suitable for pulling.
2024 """
2025 """
2025 changes = bookmarks.binarydecode(inpart)
2026 changes = bookmarks.binarydecode(inpart)
2026
2027
2027 pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2028 pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2028 bookmarksmode = op.modes.get('bookmarks', 'apply')
2029 bookmarksmode = op.modes.get('bookmarks', 'apply')
2029
2030
2030 if bookmarksmode == 'apply':
2031 if bookmarksmode == 'apply':
2031 tr = op.gettransaction()
2032 tr = op.gettransaction()
2032 bookstore = op.repo._bookmarks
2033 bookstore = op.repo._bookmarks
2033 if pushkeycompat:
2034 if pushkeycompat:
2034 allhooks = []
2035 allhooks = []
2035 for book, node in changes:
2036 for book, node in changes:
2036 hookargs = tr.hookargs.copy()
2037 hookargs = tr.hookargs.copy()
2037 hookargs['pushkeycompat'] = '1'
2038 hookargs['pushkeycompat'] = '1'
2038 hookargs['namespace'] = 'bookmarks'
2039 hookargs['namespace'] = 'bookmarks'
2039 hookargs['key'] = book
2040 hookargs['key'] = book
2040 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2041 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2041 hookargs['new'] = nodemod.hex(node if node is not None else '')
2042 hookargs['new'] = nodemod.hex(node if node is not None else '')
2042 allhooks.append(hookargs)
2043 allhooks.append(hookargs)
2043
2044
2044 for hookargs in allhooks:
2045 for hookargs in allhooks:
2045 op.repo.hook('prepushkey', throw=True,
2046 op.repo.hook('prepushkey', throw=True,
2046 **pycompat.strkwargs(hookargs))
2047 **pycompat.strkwargs(hookargs))
2047
2048
2048 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2049 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2049
2050
2050 if pushkeycompat:
2051 if pushkeycompat:
2051 def runhook():
2052 def runhook():
2052 for hookargs in allhooks:
2053 for hookargs in allhooks:
2053 op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
2054 op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
2054 op.repo._afterlock(runhook)
2055 op.repo._afterlock(runhook)
2055
2056
2056 elif bookmarksmode == 'records':
2057 elif bookmarksmode == 'records':
2057 for book, node in changes:
2058 for book, node in changes:
2058 record = {'bookmark': book, 'node': node}
2059 record = {'bookmark': book, 'node': node}
2059 op.records.add('bookmarks', record)
2060 op.records.add('bookmarks', record)
2060 else:
2061 else:
2061 raise error.ProgrammingError('unkown bookmark mode: %s' % bookmarksmode)
2062 raise error.ProgrammingError('unkown bookmark mode: %s' % bookmarksmode)
2062
2063
2063 @parthandler('phase-heads')
2064 @parthandler('phase-heads')
2064 def handlephases(op, inpart):
2065 def handlephases(op, inpart):
2065 """apply phases from bundle part to repo"""
2066 """apply phases from bundle part to repo"""
2066 headsbyphase = phases.binarydecode(inpart)
2067 headsbyphase = phases.binarydecode(inpart)
2067 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2068 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2068
2069
2069 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2070 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2070 def handlepushkeyreply(op, inpart):
2071 def handlepushkeyreply(op, inpart):
2071 """retrieve the result of a pushkey request"""
2072 """retrieve the result of a pushkey request"""
2072 ret = int(inpart.params['return'])
2073 ret = int(inpart.params['return'])
2073 partid = int(inpart.params['in-reply-to'])
2074 partid = int(inpart.params['in-reply-to'])
2074 op.records.add('pushkey', {'return': ret}, partid)
2075 op.records.add('pushkey', {'return': ret}, partid)
2075
2076
2076 @parthandler('obsmarkers')
2077 @parthandler('obsmarkers')
2077 def handleobsmarker(op, inpart):
2078 def handleobsmarker(op, inpart):
2078 """add a stream of obsmarkers to the repo"""
2079 """add a stream of obsmarkers to the repo"""
2079 tr = op.gettransaction()
2080 tr = op.gettransaction()
2080 markerdata = inpart.read()
2081 markerdata = inpart.read()
2081 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2082 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2082 op.ui.write(('obsmarker-exchange: %i bytes received\n')
2083 op.ui.write(('obsmarker-exchange: %i bytes received\n')
2083 % len(markerdata))
2084 % len(markerdata))
2084 # The mergemarkers call will crash if marker creation is not enabled.
2085 # The mergemarkers call will crash if marker creation is not enabled.
2085 # we want to avoid this if the part is advisory.
2086 # we want to avoid this if the part is advisory.
2086 if not inpart.mandatory and op.repo.obsstore.readonly:
2087 if not inpart.mandatory and op.repo.obsstore.readonly:
2087 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2088 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2088 return
2089 return
2089 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2090 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2090 op.repo.invalidatevolatilesets()
2091 op.repo.invalidatevolatilesets()
2091 if new:
2092 if new:
2092 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2093 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2093 op.records.add('obsmarkers', {'new': new})
2094 op.records.add('obsmarkers', {'new': new})
2094 if op.reply is not None:
2095 if op.reply is not None:
2095 rpart = op.reply.newpart('reply:obsmarkers')
2096 rpart = op.reply.newpart('reply:obsmarkers')
2096 rpart.addparam(
2097 rpart.addparam(
2097 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2098 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2098 rpart.addparam('new', '%i' % new, mandatory=False)
2099 rpart.addparam('new', '%i' % new, mandatory=False)
2099
2100
2100
2101
2101 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2102 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2102 def handleobsmarkerreply(op, inpart):
2103 def handleobsmarkerreply(op, inpart):
2103 """retrieve the result of a pushkey request"""
2104 """retrieve the result of a pushkey request"""
2104 ret = int(inpart.params['new'])
2105 ret = int(inpart.params['new'])
2105 partid = int(inpart.params['in-reply-to'])
2106 partid = int(inpart.params['in-reply-to'])
2106 op.records.add('obsmarkers', {'new': ret}, partid)
2107 op.records.add('obsmarkers', {'new': ret}, partid)
2107
2108
2108 @parthandler('hgtagsfnodes')
2109 @parthandler('hgtagsfnodes')
2109 def handlehgtagsfnodes(op, inpart):
2110 def handlehgtagsfnodes(op, inpart):
2110 """Applies .hgtags fnodes cache entries to the local repo.
2111 """Applies .hgtags fnodes cache entries to the local repo.
2111
2112
2112 Payload is pairs of 20 byte changeset nodes and filenodes.
2113 Payload is pairs of 20 byte changeset nodes and filenodes.
2113 """
2114 """
2114 # Grab the transaction so we ensure that we have the lock at this point.
2115 # Grab the transaction so we ensure that we have the lock at this point.
2115 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2116 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2116 op.gettransaction()
2117 op.gettransaction()
2117 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2118 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2118
2119
2119 count = 0
2120 count = 0
2120 while True:
2121 while True:
2121 node = inpart.read(20)
2122 node = inpart.read(20)
2122 fnode = inpart.read(20)
2123 fnode = inpart.read(20)
2123 if len(node) < 20 or len(fnode) < 20:
2124 if len(node) < 20 or len(fnode) < 20:
2124 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2125 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2125 break
2126 break
2126 cache.setfnode(node, fnode)
2127 cache.setfnode(node, fnode)
2127 count += 1
2128 count += 1
2128
2129
2129 cache.write()
2130 cache.write()
2130 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2131 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2131
2132
2133 rbcstruct = struct.Struct('>III')
2134
2135 @parthandler('cache:rev-branch-cache')
2136 def handlerbc(op, inpart):
2137 """receive a rev-branch-cache payload and update the local cache
2138
2139 The payload is a series of data related to each branch
2140
2141 1) branch name length
2142 2) number of open heads
2143 3) number of closed heads
2144 4) open heads nodes
2145 5) closed heads nodes
2146 """
2147 total = 0
2148 rawheader = inpart.read(rbcstruct.size)
2149 cache = op.repo.revbranchcache()
2150 cl = op.repo.unfiltered().changelog
2151 while rawheader:
2152 header = rbcstruct.unpack(rawheader)
2153 total += header[1] + header[2]
2154 utf8branch = inpart.read(header[0])
2155 branch = encoding.tolocal(utf8branch)
2156 for x in xrange(header[1]):
2157 node = inpart.read(20)
2158 rev = cl.rev(node)
2159 cache.setdata(branch, rev, node, False)
2160 for x in xrange(header[2]):
2161 node = inpart.read(20)
2162 rev = cl.rev(node)
2163 cache.setdata(branch, rev, node, True)
2164 rawheader = inpart.read(rbcstruct.size)
2165 cache.write()
2166
2132 @parthandler('pushvars')
2167 @parthandler('pushvars')
2133 def bundle2getvars(op, part):
2168 def bundle2getvars(op, part):
2134 '''unbundle a bundle2 containing shellvars on the server'''
2169 '''unbundle a bundle2 containing shellvars on the server'''
2135 # An option to disable unbundling on server-side for security reasons
2170 # An option to disable unbundling on server-side for security reasons
2136 if op.ui.configbool('push', 'pushvars.server'):
2171 if op.ui.configbool('push', 'pushvars.server'):
2137 hookargs = {}
2172 hookargs = {}
2138 for key, value in part.advisoryparams:
2173 for key, value in part.advisoryparams:
2139 key = key.upper()
2174 key = key.upper()
2140 # We want pushed variables to have USERVAR_ prepended so we know
2175 # We want pushed variables to have USERVAR_ prepended so we know
2141 # they came from the --pushvar flag.
2176 # they came from the --pushvar flag.
2142 key = "USERVAR_" + key
2177 key = "USERVAR_" + key
2143 hookargs[key] = value
2178 hookargs[key] = value
2144 op.addhookargs(hookargs)
2179 op.addhookargs(hookargs)
2145
2180
2146 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
2181 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
2147 def handlestreamv2bundle(op, part):
2182 def handlestreamv2bundle(op, part):
2148
2183
2149 requirements = urlreq.unquote(part.params['requirements']).split(',')
2184 requirements = urlreq.unquote(part.params['requirements']).split(',')
2150 filecount = int(part.params['filecount'])
2185 filecount = int(part.params['filecount'])
2151 bytecount = int(part.params['bytecount'])
2186 bytecount = int(part.params['bytecount'])
2152
2187
2153 repo = op.repo
2188 repo = op.repo
2154 if len(repo):
2189 if len(repo):
2155 msg = _('cannot apply stream clone to non empty repository')
2190 msg = _('cannot apply stream clone to non empty repository')
2156 raise error.Abort(msg)
2191 raise error.Abort(msg)
2157
2192
2158 repo.ui.debug('applying stream bundle\n')
2193 repo.ui.debug('applying stream bundle\n')
2159 streamclone.applybundlev2(repo, part, filecount, bytecount,
2194 streamclone.applybundlev2(repo, part, filecount, bytecount,
2160 requirements)
2195 requirements)
General Comments 0
You need to be logged in to leave comments. Login now