rev-branch-cache: add a function to generate a part...
Boris Feld
r36982:79b73be4 default
@@ -1,2195 +1,2218 b''
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

Binary format is as follows

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

  The header defines how to interpret the part. It contains two pieces of
  data: the part type, and the part parameters.

  The part type is used to route the part to an application level handler
  that can interpret the payload.

  Part parameters are passed to the application level handler. They are
  meant to convey information that will help the application level object to
  interpret the part payload.

  The binary format of the header is as follows

  :typesize: (one byte)

  :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

  :partid: A 32-bit integer (unique in the bundle) that can be used to refer
           to this part.

  :parameters:

      A part's parameters may have arbitrary content, the binary structure
      is::

          <mandatory-count><advisory-count><param-sizes><param-data>

      :mandatory-count: 1 byte, number of mandatory parameters

      :advisory-count: 1 byte, number of advisory parameters

      :param-sizes:

          N couples of bytes, where N is the total number of parameters. Each
          couple contains (<size-of-key>, <size-of-value>) for one parameter.

      :param-data:

          A blob of bytes from which each parameter key and value can be
          retrieved using the list of size couples stored in the previous
          field.

          Mandatory parameters come first, then the advisory ones.

          Each parameter's key MUST be unique within the part.

:payload:

    payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""
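As a rough standalone sketch of the stream-level header described in the docstring (an int32 size followed by a space-separated, urlquoted parameter blob): `read_stream_header` and `parse_stream_params` are illustrative names, not functions from this module, and the parameter values are made up.

```python
import io
import struct
import urllib.parse

def parse_stream_params(blob):
    """Decode the space-separated, urlquoted `<name>=<value>` list."""
    params = {}
    for entry in blob.split(b' '):
        if not entry:
            continue
        if b'=' in entry:
            name, value = entry.split(b'=', 1)
        else:
            name, value = entry, b''
        params[urllib.parse.unquote_to_bytes(name)] = \
            urllib.parse.unquote_to_bytes(value)
    return params

def read_stream_header(fp):
    """Read the int32 `params size`, then the parameter blob itself."""
    size = struct.unpack('>i', fp.read(4))[0]
    return parse_stream_params(fp.read(size))

# 'Compression' starts with a capital letter, so it would be mandatory.
blob = b'Compression=BZ nbchanges=3'
fp = io.BytesIO(struct.pack('>i', len(blob)) + blob)
params = read_stream_header(fp)
```

Note the use of `'>i'`, matching the `_fstreamparamsize` constant defined later in the module.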

from __future__ import absolute_import, division

import collections
import errno
import os
import re
import string
import struct
import sys

from .i18n import _
from . import (
    bookmarks,
    changegroup,
    encoding,
    error,
    node as nodemod,
    obsolete,
    phases,
    pushkey,
    pycompat,
    streamclone,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 32768

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

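As a quick illustration of the struct formats above (the format constants are copied from this file; the part type and id are made-up wire data):

```python
import struct

# Format constants copied from the module above.
_fpartheadersize = '>i'   # signed 32-bit, big-endian
_fpartid = '>I'           # unsigned 32-bit, big-endian
_fparttypesize = '>B'     # one unsigned byte

# Serialize the start of a part header: type size, type, part id.
parttype = b'changegroup'
wire = (struct.pack(_fparttypesize, len(parttype)) + parttype
        + struct.pack(_fpartid, 0))

# Round-trip: read the type-size byte back, then the type itself.
typesize = struct.unpack(_fparttypesize, wire[:1])[0]
decoded = wire[1:1 + typesize]

# An empty header (size == 0) doubles as the end-of-stream marker.
eos = struct.pack(_fpartheadersize, 0)
```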
def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug'):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)
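For example, with two parameters the generated format reads both (key size, value size) byte couples in a single `unpack` call (the helper is duplicated below so the example runs standalone; the sizes are made up):

```python
import struct

def _makefpartparamsizes(nbparams):
    # one 'BB' couple (key size, value size) per parameter
    return '>' + ('BB' * nbparams)

fmt = _makefpartparamsizes(2)              # two parameters
sizes = struct.unpack(fmt, bytes([3, 1, 9, 4]))
# sizes pairs up as (key1=3, value1=1, key2=9, value2=4)
```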

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

    __bool__ = __nonzero__
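To make the record semantics concrete, here is a short usage run against a trimmed standalone copy of the class (the body is duplicated so the example runs without the mercurial package; the categories and values are made up):

```python
class unbundlerecords(object):
    # trimmed copy of the class above, for illustration only
    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            # replies are recorded on a nested records object, keyed by part id
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
records.add('output', 'hello', inreplyto=7)
```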

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.reply = None
        self.captureoutput = captureoutput
        self.hookargs = {}
        self._gettransaction = transactiongetter
        # carries values that can modify part behavior
        self.modes = {}

    def gettransaction(self):
        transaction = self._gettransaction()

        if self.hookargs:
            # the ones added to the transaction supersede those added
            # to the operation.
            self.hookargs.update(transaction.hookargs)
            transaction.hookargs = self.hookargs

            # mark the hookargs as flushed. further attempts to add to
            # hookargs will result in an abort.
            self.hookargs = None

        return transaction

    def addhookargs(self, hookargs):
        if self.hookargs is None:
            raise error.ProgrammingError('attempted to add hookargs to '
                                         'operation after transaction started')
        self.hookargs.update(hookargs)

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    if isinstance(unbundler, unbundle20):
        tr.hookargs['bundle2'] = '1'
        if source is not None and 'source' not in tr.hookargs:
            tr.hookargs['source'] = source
        if url is not None and 'url' not in tr.hookargs:
            tr.hookargs['url'] = url
        return processbundle(repo, unbundler, lambda: tr)
    else:
        # the transactiongetter won't be used, but we might as well set it
        op = bundleoperation(repo, lambda: tr)
        _processchangegroup(op, unbundler, tr, source, url, **kwargs)
        return op

class partiterator(object):
    def __init__(self, repo, op, unbundler):
        self.repo = repo
        self.op = op
        self.unbundler = unbundler
        self.iterator = None
        self.count = 0
        self.current = None

    def __enter__(self):
        def func():
            itr = enumerate(self.unbundler.iterparts())
            for count, p in itr:
                self.count = count
                self.current = p
                yield p
                p.consume()
                self.current = None
        self.iterator = func()
        return self.iterator

    def __exit__(self, type, exc, tb):
        if not self.iterator:
            return

        # Only gracefully abort in a normal exception situation. User aborts
        # like Ctrl+C throw a KeyboardInterrupt which does not derive from
        # Exception, and should not be gracefully cleaned up.
        if isinstance(exc, Exception):
            # Any exceptions seeking to the end of the bundle at this point are
            # almost certainly related to the underlying stream being bad.
            # And, chances are that the exception we're handling is related to
            # getting in that bad state. So, we swallow the seeking error and
            # re-raise the original error.
            seekerror = False
            try:
                if self.current:
                    # consume the part content to not corrupt the stream.
                    self.current.consume()

                for part in self.iterator:
                    # consume the bundle content
                    part.consume()
            except Exception:
                seekerror = True

            # Small hack to let caller code distinguish exceptions from bundle2
            # processing from exceptions raised when processing the old format.
            # This is mostly needed to handle different return codes to
            # unbundle according to the type of bundle. We should probably
            # clean up or drop this return code craziness in a future version.
            exc.duringunbundle2 = True
            salvaged = []
            replycaps = None
            if self.op.reply is not None:
                salvaged = self.op.reply.salvageoutput()
                replycaps = self.op.reply.capabilities
            exc._replycaps = replycaps
            exc._bundle2salvagedoutput = salvaged

            # Re-raising from a variable loses the original stack. So only use
            # that form if we need to.
            if seekerror:
                raise exc

        self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
                           self.count)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op._gettransaction is None or op._gettransaction is _notransaction:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))

    processparts(repo, op, unbundler)

    return op

def processparts(repo, op, unbundler):
    with partiterator(repo, op, unbundler) as parts:
        for part in parts:
            _processpart(op, part)

def _processchangegroup(op, cg, tr, source, url, **kwargs):
    ret = cg.apply(op.repo, tr, source, url, **kwargs)
    op.records.add('changegroup', {
        'return': ret,
    })
    return ret

def _gethandler(op, part):
    status = 'unknown' # used by debug output
    try:
        handler = parthandlermapping.get(part.type)
        if handler is None:
            status = 'unsupported-type'
            raise error.BundleUnknownFeatureError(parttype=part.type)
        indebug(op.ui, 'found a handler for part %s' % part.type)
        unknownparams = part.mandatorykeys - handler.params
        if unknownparams:
            unknownparams = list(unknownparams)
            unknownparams.sort()
            status = 'unsupported-params (%s)' % ', '.join(unknownparams)
            raise error.BundleUnknownFeatureError(parttype=part.type,
                                                  params=unknownparams)
        status = 'supported'
    except error.BundleUnknownFeatureError as exc:
        if part.mandatory: # mandatory parts
            raise
        indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
        return # skip to part processing
    finally:
        if op.ui.debugflag:
            msg = ['bundle2-input-part: "%s"' % part.type]
            if not part.mandatory:
                msg.append(' (advisory)')
            nbmp = len(part.mandatorykeys)
            nbap = len(part.params) - nbmp
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            msg.append(' %s\n' % status)
            op.ui.debug(''.join(msg))

    return handler

511 def _processpart(op, part):
512 def _processpart(op, part):
512 """process a single part from a bundle
513 """process a single part from a bundle
513
514
514 The part is guaranteed to have been fully consumed when the function exits
515 The part is guaranteed to have been fully consumed when the function exits
515 (even if an exception is raised)."""
516 (even if an exception is raised)."""
516 handler = _gethandler(op, part)
517 handler = _gethandler(op, part)
517 if handler is None:
518 if handler is None:
518 return
519 return
519
520
520 # handler is called outside the above try block so that we don't
521 # handler is called outside the above try block so that we don't
521 # risk catching KeyErrors from anything other than the
522 # risk catching KeyErrors from anything other than the
522 # parthandlermapping lookup (any KeyError raised by handler()
523 # parthandlermapping lookup (any KeyError raised by handler()
523 # itself represents a defect of a different variety).
524 # itself represents a defect of a different variety).
524 output = None
525 output = None
525 if op.captureoutput and op.reply is not None:
526 if op.captureoutput and op.reply is not None:
526 op.ui.pushbuffer(error=True, subproc=True)
527 op.ui.pushbuffer(error=True, subproc=True)
527 output = ''
528 output = ''
528 try:
529 try:
529 handler(op, part)
530 handler(op, part)
530 finally:
531 finally:
531 if output is not None:
532 if output is not None:
532 output = op.ui.popbuffer()
533 output = op.ui.popbuffer()
533 if output:
534 if output:
534 outpart = op.reply.newpart('output', data=output,
535 outpart = op.reply.newpart('output', data=output,
535 mandatory=False)
536 mandatory=False)
536 outpart.addparam(
537 outpart.addparam(
537 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
538 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
538
539
539 def decodecaps(blob):
540 def decodecaps(blob):
540 """decode a bundle2 caps bytes blob into a dictionary
541 """decode a bundle2 caps bytes blob into a dictionary
541
542
542 The blob is a list of capabilities (one per line)
543 The blob is a list of capabilities (one per line)
543 Capabilities may have values using a line of the form::
544 Capabilities may have values using a line of the form::
544
545
545 capability=value1,value2,value3
546 capability=value1,value2,value3
546
547
547 The values are always a list."""
548 The values are always a list."""
548 caps = {}
549 caps = {}
549 for line in blob.splitlines():
550 for line in blob.splitlines():
550 if not line:
551 if not line:
551 continue
552 continue
552 if '=' not in line:
553 if '=' not in line:
553 key, vals = line, ()
554 key, vals = line, ()
554 else:
555 else:
555 key, vals = line.split('=', 1)
556 key, vals = line.split('=', 1)
556 vals = vals.split(',')
557 vals = vals.split(',')
557 key = urlreq.unquote(key)
558 key = urlreq.unquote(key)
558 vals = [urlreq.unquote(v) for v in vals]
559 vals = [urlreq.unquote(v) for v in vals]
559 caps[key] = vals
560 caps[key] = vals
560 return caps
561 return caps
561
562
562 def encodecaps(caps):
563 def encodecaps(caps):
563 """encode a bundle2 caps dictionary into a bytes blob"""
564 """encode a bundle2 caps dictionary into a bytes blob"""
564 chunks = []
565 chunks = []
565 for ca in sorted(caps):
566 for ca in sorted(caps):
566 vals = caps[ca]
567 vals = caps[ca]
567 ca = urlreq.quote(ca)
568 ca = urlreq.quote(ca)
568 vals = [urlreq.quote(v) for v in vals]
569 vals = [urlreq.quote(v) for v in vals]
569 if vals:
570 if vals:
570 ca = "%s=%s" % (ca, ','.join(vals))
571 ca = "%s=%s" % (ca, ','.join(vals))
571 chunks.append(ca)
572 chunks.append(ca)
572 return '\n'.join(chunks)
573 return '\n'.join(chunks)
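For illustration, the caps blob format handled by the two functions above round-trips like this (a standalone sketch using `urllib.parse` in place of Mercurial's `urlreq` wrapper, and str instead of bytes):

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    # one capability per line; values url-quoted and joined with ',' after '='
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    # inverse: split lines, split on the first '=', unquote both halves
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
# 'HG20\nchangegroup=01,02'
assert decodecaps(blob) == caps
```

Note that quoting makes the format safe for keys and values containing the separators themselves (spaces, commas, newlines).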

bundletypes = {
    "": ("", 'UN'), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None
        # If compression is being handled by a consumer of the raw
        # data (e.g. the wire protocol), unsetting this flag tells
        # consumers that the bundle is best left uncompressed.
        self.prefercompressed = True

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)
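The stream-parameter encoding produced by `_paramchunk` is a space-separated list of url-quoted `name=value` pairs (value optional). A standalone mirror of it, using `urllib.parse` instead of Mercurial's `urlreq` wrapper:

```python
from urllib.parse import quote

def paramchunk(params):
    # mirror of bundle20._paramchunk: url-quote each name and value,
    # join 'name=value' pairs (or bare names) with single spaces
    blocks = []
    for par, value in params:
        par = quote(par)
        if value is not None:
            par = '%s=%s' % (par, quote(value))
        blocks.append(par)
    return ' '.join(blocks)

print(paramchunk([('Compression', 'BZ'), ('ev', None)]))
# Compression=BZ ev
```

Quoting both halves keeps the space and `=` separators unambiguous whatever the parameter contents are.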

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)
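At the byte level, the stream produced by `getchunks` and `_getcorechunk` is: the magic string, a big-endian signed 32-bit parameter-block size, the parameter block, each part's chunks, and a zero part-header size as end-of-stream marker. A minimal sketch for an empty HG20 bundle, assuming the `>i` struct formats this module uses for `_fstreamparamsize` and `_fpartheadersize`:

```python
import struct

_fstreamparamsize = '>i'   # signed big-endian 32-bit, as in this module
_fpartheadersize = '>i'

def emptybundle(params=b''):
    # magic + param-block size + param block + end-of-parts marker
    chunks = [b'HG20',
              struct.pack(_fstreamparamsize, len(params)),
              params,
              struct.pack(_fpartheadersize, 0)]
    return b''.join(chunks)

raw = emptybundle()
assert raw[0:2] == b'HG'                       # magic checked by getunbundler
assert raw[2:4] == b'20'                       # version selects unbundle20
assert struct.unpack('>i', raw[4:8])[0] == 0   # no stream parameters
```

Using a signed size field is what makes the `negative bundle param size` checks in the readers below meaningful.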


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp

    def _unpack(self, format):
        """unpack this struct format from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        return changegroup.readexactly(self._fp, size)

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        ui.debug(
            "error: invalid magic: %r (version %r), should be 'HG'\n"
            % (magic, version))
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters contained in a parameter block"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params
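Decoding is the mirror image of the `_paramchunk` encoding: split the block on spaces, split each entry on the first `=`, and url-unquote both halves. A standalone sketch of `_processallparams` (using `urllib.parse`, with the per-parameter handler dispatch left out):

```python
from urllib.parse import unquote

def parseparams(paramsblock):
    # mirror of unbundle20._processallparams, minus handler dispatch;
    # a bare name (no '=') maps to None
    params = {}
    for p in paramsblock.split(' '):
        parts = [unquote(i) for i in p.split('=', 1)]
        if len(parts) < 2:
            parts.append(None)
        params[parts[0]] = parts[1]
    return params

assert parseparams('Compression=BZ ev') == {'Compression': 'BZ', 'ev': None}
```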


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError(r'empty parameter name')
        if name[0:1] not in pycompat.bytestr(string.ascii_letters):
            raise ValueError(r'non letter first character: %s' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0:1].islower():
                indebug(self.ui, "ignoring unknown parameter %s" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)
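The `emptycount` termination above relies on part-header sizes and payload-chunk sizes sharing one format: a part's payload ends with a zero size, and the bundle ends with a zero header size, so two consecutive zeros mark end of stream. A standalone sketch of that loop (interrupt-flag handling omitted):

```python
import io
import struct

def copychunks(read):
    # brainless copy of a bundle2 parts stream: every size and payload is
    # forwarded as-is; two consecutive zero sizes mark the end of stream
    emptycount = 0
    while emptycount < 2:
        size = struct.unpack('>i', read(4))[0]
        yield struct.pack('>i', size)
        if size:
            emptycount = 0
        else:
            emptycount += 1
            continue
        yield read(size)

# a fake stream: one part with a 2-byte header, one 3-byte payload chunk,
# a zero payload terminator, then the zero end-of-stream marker
stream = io.BytesIO(
    struct.pack('>i', 2) + b'hh' +
    struct.pack('>i', 3) + b'abc' +
    struct.pack('>i', 0) +
    struct.pack('>i', 0))
out = b''.join(copychunks(stream.read))
assert out == stream.getvalue()
```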


    def iterparts(self, seekable=False):
        """yield all parts contained in the stream"""
        cls = seekableunbundlepart if seekable else unbundlepart
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = cls(self.ui, headerblock, self._fp)
            yield part
            # Ensure part is fully consumed so we can start reading the next
            # part.
            part.consume()

            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator
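The registration decorator above is a standard Python registry pattern: the outer call captures the parameter name, the inner decorator files the function in the shared map and returns it unchanged. A minimal standalone version, with a hypothetical handler and state dict standing in for the unbundler:

```python
registry = {}

def streamparamhandler(name):
    """return a decorator that files func under `name` in the registry"""
    def decorator(func):
        assert name not in registry, 'duplicate handler: %s' % name
        registry[name] = func
        return func
    return decorator

@streamparamhandler('compression')
def processcompression(state, param, value):
    # hypothetical handler: record the negotiated compression
    state['compression'] = value

state = {}
registry['compression'](state, 'Compression', 'BZ')
assert state == {'compression': 'BZ'}
```

Looking handlers up by `name.lower()`, as `_processparam` does, is what lets a single registration serve both the advisory (`compression`) and mandatory (`Compression`) spellings of a parameter.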
909
910
910 @b2streamparamhandler('compression')
911 @b2streamparamhandler('compression')
911 def processcompression(unbundler, param, value):
912 def processcompression(unbundler, param, value):
912 """read compression parameter and install payload decompression"""
913 """read compression parameter and install payload decompression"""
913 if value not in util.compengines.supportedbundletypes:
914 if value not in util.compengines.supportedbundletypes:
914 raise error.BundleUnknownFeatureError(params=(param,),
915 raise error.BundleUnknownFeatureError(params=(param,),
915 values=(value,))
916 values=(value,))
916 unbundler._compengine = util.compengines.forbundletype(value)
917 unbundler._compengine = util.compengines.forbundletype(value)
917 if value is not None:
918 if value is not None:
918 unbundler._compressed = True
919 unbundler._compressed = True
919
920
920 class bundlepart(object):
921 class bundlepart(object):
921 """A bundle2 part contains application level payload
922 """A bundle2 part contains application level payload
922
923
923 The part `type` is used to route the part to the application level
924 The part `type` is used to route the part to the application level
924 handler.
925 handler.
925
926
926 The part payload is contained in ``part.data``. It could be raw bytes or a
927 The part payload is contained in ``part.data``. It could be raw bytes or a
927 generator of byte chunks.
928 generator of byte chunks.
928
929
929 You can add parameters to the part using the ``addparam`` method.
930 You can add parameters to the part using the ``addparam`` method.
930 Parameters can be either mandatory (default) or advisory. Remote side
931 Parameters can be either mandatory (default) or advisory. Remote side
931 should be able to safely ignore the advisory ones.
932 should be able to safely ignore the advisory ones.
932
933
933 Both data and parameters cannot be modified after the generation has begun.
934 Both data and parameters cannot be modified after the generation has begun.
934 """
935 """
935
936
936 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
937 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
937 data='', mandatory=True):
938 data='', mandatory=True):
938 validateparttype(parttype)
939 validateparttype(parttype)
939 self.id = None
940 self.id = None
940 self.type = parttype
941 self.type = parttype
941 self._data = data
942 self._data = data
942 self._mandatoryparams = list(mandatoryparams)
943 self._mandatoryparams = list(mandatoryparams)
943 self._advisoryparams = list(advisoryparams)
944 self._advisoryparams = list(advisoryparams)
944 # checking for duplicated entries
945 # checking for duplicated entries
945 self._seenparams = set()
946 self._seenparams = set()
946 for pname, __ in self._mandatoryparams + self._advisoryparams:
947 for pname, __ in self._mandatoryparams + self._advisoryparams:
947 if pname in self._seenparams:
948 if pname in self._seenparams:
948 raise error.ProgrammingError('duplicated params: %s' % pname)
949 raise error.ProgrammingError('duplicated params: %s' % pname)
949 self._seenparams.add(pname)
950 self._seenparams.add(pname)
950 # status of the part's generation:
951 # status of the part's generation:
951 # - None: not started,
952 # - None: not started,
952 # - False: currently generated,
953 # - False: currently generated,
953 # - True: generation done.
954 # - True: generation done.
954 self._generated = None
955 self._generated = None
955 self.mandatory = mandatory
956 self.mandatory = mandatory
956
957
957 def __repr__(self):
958 def __repr__(self):
958 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
959 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
959 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
960 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
960 % (cls, id(self), self.id, self.type, self.mandatory))
961 % (cls, id(self), self.id, self.type, self.mandatory))
961
962
962 def copy(self):
963 def copy(self):
963 """return a copy of the part
964 """return a copy of the part
964
965
965 The new part have the very same content but no partid assigned yet.
966 The new part have the very same content but no partid assigned yet.
966 Parts with generated data cannot be copied."""
967 Parts with generated data cannot be copied."""
967 assert not util.safehasattr(self.data, 'next')
968 assert not util.safehasattr(self.data, 'next')
968 return self.__class__(self.type, self._mandatoryparams,
969 return self.__class__(self.type, self._mandatoryparams,
969 self._advisoryparams, self._data, self.mandatory)
970 self._advisoryparams, self._data, self.mandatory)
970
971
    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif (util.safehasattr(self.data, 'next')
                  or util.safehasattr(self.data, '__next__')):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows to transmit an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and returns the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.consume()
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

def decodepayloadchunks(ui, fh):
    """Reads bundle2 part payload data into chunks.

    Part payload data consists of framed chunks. This function takes
    a file handle and emits those chunks.
    """
    dolog = ui.configbool('devel', 'bundle2.debug')
    debug = ui.debug

    headerstruct = struct.Struct(_fpayloadsize)
    headersize = headerstruct.size
    unpack = headerstruct.unpack

    readexactly = changegroup.readexactly
    read = fh.read

    chunksize = unpack(readexactly(fh, headersize))[0]
    indebug(ui, 'payload chunk size: %i' % chunksize)

    # changegroup.readexactly() is inlined below for performance.
    while chunksize:
        if chunksize >= 0:
            s = read(chunksize)
            if len(s) < chunksize:
                raise error.Abort(_('stream ended unexpectedly '
                                    ' (got %d bytes, expected %d)') %
                                  (len(s), chunksize))

            yield s
        elif chunksize == flaginterrupt:
            # Interrupt "signal" detected. The regular stream is interrupted
            # and a bundle2 part follows. Consume it.
            interrupthandler(ui, fh)()
        else:
            raise error.BundleValueError(
                'negative payload chunk size: %s' % chunksize)

        s = read(headersize)
        if len(s) < headersize:
            raise error.Abort(_('stream ended unexpectedly '
                                ' (got %d bytes, expected %d)') %
                              (len(s), headersize))

        chunksize = unpack(s)[0]

        # indebug() inlined for performance.
        if dolog:
            debug('bundle2-input: payload chunk size: %i\n' % chunksize)

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._readheader()
        self._mandatory = None
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def _payloadchunks(self):
        """Generator of decoded chunks in the payload."""
        return decodepayloadchunks(self.ui, self._fp)

    def consume(self):
        """Read the part payload until completion.

        By consuming the part data, the underlying stream read offset will
        be advanced to the next part (or end of stream).
        """
        if self.consumed:
            return

        chunk = self.read(32768)
        while chunk:
            self._pos += len(chunk)
            chunk = self.read(32768)

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

1365 class seekableunbundlepart(unbundlepart):
1366 class seekableunbundlepart(unbundlepart):
1366 """A bundle2 part in a bundle that is seekable.
1367 """A bundle2 part in a bundle that is seekable.
1367
1368
1368 Regular ``unbundlepart`` instances can only be read once. This class
1369 Regular ``unbundlepart`` instances can only be read once. This class
1369 extends ``unbundlepart`` to enable bi-directional seeking within the
1370 extends ``unbundlepart`` to enable bi-directional seeking within the
1370 part.
1371 part.
1371
1372
1372 Bundle2 part data consists of framed chunks. Offsets when seeking
1373 Bundle2 part data consists of framed chunks. Offsets when seeking
1373 refer to the decoded data, not the offsets in the underlying bundle2
1374 refer to the decoded data, not the offsets in the underlying bundle2
1374 stream.
1375 stream.
1375
1376
1376 To facilitate quickly seeking within the decoded data, instances of this
1377 To facilitate quickly seeking within the decoded data, instances of this
1377 class maintain a mapping between offsets in the underlying stream and
1378 class maintain a mapping between offsets in the underlying stream and
1378 the decoded payload. This mapping will consume memory in proportion
1379 the decoded payload. This mapping will consume memory in proportion
1379 to the number of chunks within the payload (which almost certainly
1380 to the number of chunks within the payload (which almost certainly
1380 increases in proportion with the size of the part).
1381 increases in proportion with the size of the part).
1381 """
1382 """
1382 def __init__(self, ui, header, fp):
1383 def __init__(self, ui, header, fp):
1383 # (payload, file) offsets for chunk starts.
1384 # (payload, file) offsets for chunk starts.
1384 self._chunkindex = []
1385 self._chunkindex = []
1385
1386
1386 super(seekableunbundlepart, self).__init__(ui, header, fp)
1387 super(seekableunbundlepart, self).__init__(ui, header, fp)
1387
1388
1388 def _payloadchunks(self, chunknum=0):
1389 def _payloadchunks(self, chunknum=0):
1389 '''seek to specified chunk and start yielding data'''
1390 '''seek to specified chunk and start yielding data'''
1390 if len(self._chunkindex) == 0:
1391 if len(self._chunkindex) == 0:
1391 assert chunknum == 0, 'Must start with chunk 0'
1392 assert chunknum == 0, 'Must start with chunk 0'
1392 self._chunkindex.append((0, self._tellfp()))
1393 self._chunkindex.append((0, self._tellfp()))
1393 else:
1394 else:
1394 assert chunknum < len(self._chunkindex), \
1395 assert chunknum < len(self._chunkindex), \
1395 'Unknown chunk %d' % chunknum
1396 'Unknown chunk %d' % chunknum
1396 self._seekfp(self._chunkindex[chunknum][1])
1397 self._seekfp(self._chunkindex[chunknum][1])
1397
1398
1398 pos = self._chunkindex[chunknum][0]
1399 pos = self._chunkindex[chunknum][0]
1399
1400
1400 for chunk in decodepayloadchunks(self.ui, self._fp):
1401 for chunk in decodepayloadchunks(self.ui, self._fp):
1401 chunknum += 1
1402 chunknum += 1
1402 pos += len(chunk)
1403 pos += len(chunk)
1403 if chunknum == len(self._chunkindex):
1404 if chunknum == len(self._chunkindex):
1404 self._chunkindex.append((pos, self._tellfp()))
1405 self._chunkindex.append((pos, self._tellfp()))
1405
1406
1406 yield chunk
1407 yield chunk
1407
1408
1408 def _findchunk(self, pos):
1409 def _findchunk(self, pos):
1409 '''for a given payload position, return a chunk number and offset'''
1410 '''for a given payload position, return a chunk number and offset'''
1410 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1411 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1411 if ppos == pos:
1412 if ppos == pos:
1412 return chunk, 0
1413 return chunk, 0
1413 elif ppos > pos:
1414 elif ppos > pos:
1414 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1415 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1415 raise ValueError('Unknown chunk')
1416 raise ValueError('Unknown chunk')
1416
1417
1417 def tell(self):
1418 def tell(self):
1418 return self._pos
1419 return self._pos
1419
1420
1420 def seek(self, offset, whence=os.SEEK_SET):
1421 def seek(self, offset, whence=os.SEEK_SET):
1421 if whence == os.SEEK_SET:
1422 if whence == os.SEEK_SET:
1422 newpos = offset
1423 newpos = offset
1423 elif whence == os.SEEK_CUR:
1424 elif whence == os.SEEK_CUR:
1424 newpos = self._pos + offset
1425 newpos = self._pos + offset
1425 elif whence == os.SEEK_END:
1426 elif whence == os.SEEK_END:
1426 if not self.consumed:
1427 if not self.consumed:
1427 # Can't use self.consume() here because it advances self._pos.
1428 # Can't use self.consume() here because it advances self._pos.
1428 chunk = self.read(32768)
1429 chunk = self.read(32768)
1429 while chunk:
1430 while chunk:
1430 chunk = self.read(32768)
1431 chunk = self.read(32768)
1431 newpos = self._chunkindex[-1][0] - offset
1432 newpos = self._chunkindex[-1][0] - offset
1432 else:
1433 else:
1433 raise ValueError('Unknown whence value: %r' % (whence,))
1434 raise ValueError('Unknown whence value: %r' % (whence,))
1434
1435
1435 if newpos > self._chunkindex[-1][0] and not self.consumed:
1436 if newpos > self._chunkindex[-1][0] and not self.consumed:
1436 # Can't use self.consume() here because it advances self._pos.
1437 # Can't use self.consume() here because it advances self._pos.
1437 chunk = self.read(32768)
1438 chunk = self.read(32768)
1438 while chunk:
1439 while chunk:
1439 chunk = self.read(32668)
1440 chunk = self.read(32668)
1440
1441
1441 if not 0 <= newpos <= self._chunkindex[-1][0]:
1442 if not 0 <= newpos <= self._chunkindex[-1][0]:
1442 raise ValueError('Offset out of range')
1443 raise ValueError('Offset out of range')
1443
1444
1444 if self._pos != newpos:
1445 if self._pos != newpos:
1445 chunk, internaloffset = self._findchunk(newpos)
1446 chunk, internaloffset = self._findchunk(newpos)
1446 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1447 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1447 adjust = self.read(internaloffset)
1448 adjust = self.read(internaloffset)
1448 if len(adjust) != internaloffset:
1449 if len(adjust) != internaloffset:
1449 raise error.Abort(_('Seek failed\n'))
1450 raise error.Abort(_('Seek failed\n'))
1450 self._pos = newpos
1451 self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'bookmarks': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                'phases': ('heads',),
                'stream': ('v2',),
               }

def getrepocaps(repo, allowpushback=False, role=None):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.

    The returned value is used for servers advertising their capabilities as
    well as clients advertising their capabilities to servers as part of
    bundle2 requests. The ``role`` argument specifies which is which.
    """
    if role not in ('client', 'server'):
        raise error.ProgrammingError('role argument must be client or server')

    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
        caps.pop('phases')

    # Don't advertise stream clone support in server mode if not configured.
    if role == 'server':
        streamsupported = repo.ui.configbool('server', 'uncompressed',
                                             untrusted=True)
        featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')

        if not streamsupported or not featuresupported:
            caps.pop('stream')
    # Else always advertise support on client, because payload support
    # should always be advertised.

    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = phases.binaryencode(headsbyphase)
        bundler.newpart('phase-heads', data=phasedata)

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changeset
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def addpartrevbranchcache(repo, bundler, outgoing):
    # we include the rev branch cache for the bundle changeset
    # (as an optional part)
    cache = repo.revbranchcache()
    cl = repo.unfiltered().changelog
    branchesdata = collections.defaultdict(lambda: (set(), set()))
    for node in outgoing.missing:
        branch, close = cache.branchinfo(cl.rev(node))
        branchesdata[branch][close].add(node)

    def generate():
        for branch, (nodes, closed) in sorted(branchesdata.items()):
            utf8branch = encoding.fromlocal(branch)
            yield rbcstruct.pack(len(utf8branch), len(nodes), len(closed))
            yield utf8branch
            for n in sorted(nodes):
                yield n
            for n in sorted(closed):
                yield n

    bundler.newpart('cache:rev-branch-cache', data=generate())

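The part payload built by `generate()` above is a simple length-prefixed framing: per branch, an `>III` header (UTF-8 branch-name length, open-node count, closed-node count), the branch name, then the raw 20-byte node hashes. A minimal standalone sketch of that framing, with hypothetical `encode`/`decode` helpers that are not part of bundle2.py and assume 20-byte binary node hashes:

```python
import struct

# Same layout as bundle2's rbcstruct: (branch-name length,
# open-node count, closed-node count), big-endian unsigned ints.
rbcstruct = struct.Struct('>III')

def encode(branchesdata):
    """Pack {branch: (open nodes, closed nodes)} into the part payload."""
    out = []
    for branch, (nodes, closed) in sorted(branchesdata.items()):
        utf8branch = branch.encode('utf-8')
        out.append(rbcstruct.pack(len(utf8branch), len(nodes), len(closed)))
        out.append(utf8branch)
        out.extend(sorted(nodes))
        out.extend(sorted(closed))
    return b''.join(out)

def decode(data):
    """Invert encode(); returns {branch: (open nodes, closed nodes)}."""
    result = {}
    pos = 0
    while pos < len(data):
        branchlen, nopen, nclosed = rbcstruct.unpack_from(data, pos)
        pos += rbcstruct.size
        branch = data[pos:pos + branchlen].decode('utf-8')
        pos += branchlen
        # Each node is a fixed-size 20-byte binary hash.
        nodes = [data[pos + 20 * i:pos + 20 * (i + 1)] for i in range(nopen)]
        pos += 20 * nopen
        closed = [data[pos + 20 * i:pos + 20 * (i + 1)]
                  for i in range(nclosed)]
        pos += 20 * nclosed
        result[branch] = (set(nodes), set(closed))
    return result
```

A receiver can therefore rebuild the rev-branch-cache without computing branch info from the changelog, which is the point of shipping this part in the bundle.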
def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs[r'targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server's knowledge about the
        bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:bookmarks')
def handlecheckbookmarks(op, inpart):
    """check location of bookmarks

    This part is used to detect push races on bookmarks. It contains
    binary encoded (bookmark, node) tuples. If the local state does not
    match the one in the part, a PushRaced exception is raised.
    """
    bookdata = bookmarks.binarydecode(inpart)

    msgstandard = ('repository changed while pushing - please try again '
                   '(bookmark "%s" move from %s to %s)')
    msgmissing = ('repository changed while pushing - please try again '
                  '(bookmark "%s" is missing, expected %s)')
    msgexist = ('repository changed while pushing - please try again '
                '(bookmark "%s" set on %s, expected missing)')
    for book, node in bookdata:
        currentnode = op.repo._bookmarks.get(book)
        if currentnode != node:
            if node is None:
                finalmsg = msgexist % (book, nodemod.short(currentnode))
            elif currentnode is None:
                finalmsg = msgmissing % (book, nodemod.short(node))
            else:
                finalmsg = msgstandard % (book, nodemod.short(node),
                                          nodemod.short(currentnode))
            raise error.PushRaced(finalmsg)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

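The handler above reads the part payload as a flat run of 20-byte binary nodes with no count prefix, stopping at end of payload. A minimal standalone sketch of that framing, using `io.BytesIO` as a stand-in for the bundle2 part object (the helper name is hypothetical, not part of Mercurial's API):

```python
import io

def read_node_stream(fh, nodelen=20):
    """Read a flat sequence of fixed-width binary nodes until EOF.

    Mirrors the loop in handlecheckheads above: a short trailing read
    means the payload was truncated mid-node.
    """
    nodes = []
    chunk = fh.read(nodelen)
    while len(chunk) == nodelen:
        nodes.append(chunk)
        chunk = fh.read(nodelen)
    if chunk:
        raise ValueError('truncated node stream: %d trailing bytes'
                         % len(chunk))
    return nodes

# two fake 20-byte nodes packed back to back, as a check:heads payload
heads = [b'\x01' * 20, b'\x02' * 20]
assert read_node_stream(io.BytesIO(b''.join(heads))) == heads
```

The same framing is reused unchanged by the 'check:updated-heads' handler below.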
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually
    updated during the push. Activity on unrelated heads is ignored.

    This allows servers with high traffic to avoid push contention as long
    as only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('check:phases')
def handlecheckphases(op, inpart):
    """check that phase boundaries of the repository did not change

    This is used to detect a push race.
    """
    phasetonodes = phases.binarydecode(inpart)
    unfi = op.repo.unfiltered()
    cl = unfi.changelog
    phasecache = unfi._phasecache
    msg = ('repository changed while pushing - please try again '
           '(%s is %s expected %s)')
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actualphase = phasecache.phase(unfi, cl.rev(n))
            if actualphase != expectedphase:
                finalmsg = msg % (nodemod.short(n),
                                  phases.phasenames[actualphase],
                                  phases.phasenames[expectedphase])
                raise error.PushRaced(finalmsg)

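The decoded 'check:phases' payload is a list indexed by phase number, each entry holding the nodes the sender believed to be in that phase. A hedged sketch of the comparison logic, with a plain dict standing in for the repo's phase cache (the helper and the dict are illustrative assumptions, not Mercurial API):

```python
# Stand-in phase names; in Mercurial these live in phases.phasenames.
phasenames = ['public', 'draft', 'secret']

def find_phase_races(actualphases, phasetonodes):
    """Return (node, actual, expected) for every node whose phase moved.

    phasetonodes is indexed by phase number, exactly like the decoded
    'check:phases' payload in the handler above. actualphases is a
    mapping of node -> current phase number.
    """
    races = []
    for expectedphase, nodes in enumerate(phasetonodes):
        for n in nodes:
            actual = actualphases[n]
            if actual != expectedphase:
                races.append((n, phasenames[actual],
                              phasenames[expectedphase]))
    return races

actual = {b'n1': 0, b'n2': 1}
# the sender believed n2 was public (phase 0); it is now draft -> race
assert find_phase_races(actual, [[b'n1', b'n2'], []]) == [
    (b'n2', 'draft', 'public')]
```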
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit the failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'],
                              **pycompat.strkwargs(kwargs))

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid='%d' % inpart.id,
                                  **pycompat.strkwargs(kwargs))

@parthandler('bookmarks')
def handlebookmark(op, inpart):
    """transmit bookmark information

    The part contains binary encoded bookmark information.

    The exact behavior of this part can be controlled by the 'bookmarks' mode
    on the bundle operation.

    When mode is 'apply' (the default) the bookmark information is applied
    as is to the unbundling repository. Make sure a 'check:bookmarks' part
    is issued earlier to check for push races in such an update. This
    behavior is suitable for pushing.

    When mode is 'records', the information is recorded into the 'bookmarks'
    records of the bundle operation. This behavior is suitable for pulling.
    """
    changes = bookmarks.binarydecode(inpart)

    pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
    bookmarksmode = op.modes.get('bookmarks', 'apply')

    if bookmarksmode == 'apply':
        tr = op.gettransaction()
        bookstore = op.repo._bookmarks
        if pushkeycompat:
            allhooks = []
            for book, node in changes:
                hookargs = tr.hookargs.copy()
                hookargs['pushkeycompat'] = '1'
                hookargs['namespace'] = 'bookmarks'
                hookargs['key'] = book
                hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
                hookargs['new'] = nodemod.hex(node if node is not None else '')
                allhooks.append(hookargs)

            for hookargs in allhooks:
                op.repo.hook('prepushkey', throw=True,
                             **pycompat.strkwargs(hookargs))

        bookstore.applychanges(op.repo, op.gettransaction(), changes)

        if pushkeycompat:
            def runhook():
                for hookargs in allhooks:
                    op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
            op.repo._afterlock(runhook)

    elif bookmarksmode == 'records':
        for book, node in changes:
            record = {'bookmark': book, 'node': node}
            op.records.add('bookmarks', record)
    else:
        raise error.ProgrammingError('unknown bookmark mode: %s'
                                     % bookmarksmode)

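In the pushkey-compat branch above, each bookmark change is turned into a pushkey-style hook argument dict, with a deleted bookmark (node is None) reported as an empty 'new' value. A hedged, self-contained sketch of that translation, where a plain dict stands in for the bookmark store and `binascii.hexlify` for `nodemod.hex` (the helper name is illustrative, not Mercurial API):

```python
from binascii import hexlify

def compat_hookargs(bookstore, changes):
    """Build pushkey-style hook arguments for bookmark changes.

    Follows the pushkeycompat branch of handlebookmark above: 'old' is
    the current node (empty if the bookmark is new), 'new' is the pushed
    node (empty if the bookmark is being deleted).
    """
    allhooks = []
    for book, node in changes:
        allhooks.append({
            'pushkeycompat': '1',
            'namespace': 'bookmarks',
            'key': book,
            'old': hexlify(bookstore.get(book, b'')).decode('ascii'),
            'new': hexlify(node if node is not None else b'')
                   .decode('ascii'),
        })
    return allhooks

# deleting bookmark '@' currently pointing at a fake node
args = compat_hookargs({b'@': b'\xaa' * 20}, [(b'@', None)])
assert args[0]['old'] == 'aa' * 20 and args[0]['new'] == ''
```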
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = phases.binarydecode(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

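The 'hgtagsfnodes' payload is just 40-byte records: a 20-byte changeset node followed by a 20-byte filenode, with any incomplete trailing record silently dropped. A minimal sketch of that parsing, with `io.BytesIO` standing in for the part (the helper name is illustrative):

```python
import io

def read_fnode_pairs(fh):
    """Collect (changeset node, .hgtags filenode) 20-byte pairs.

    Stops at end of payload, discarding a trailing incomplete pair the
    same way handlehgtagsfnodes above does.
    """
    pairs = []
    while True:
        node = fh.read(20)
        fnode = fh.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        pairs.append((node, fnode))
    return pairs

# one complete pair followed by a truncated tail that must be ignored
payload = b'\x01' * 20 + b'\x02' * 20 + b'\x03' * 5
assert read_fnode_pairs(io.BytesIO(payload)) == [
    (b'\x01' * 20, b'\x02' * 20)]
```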
rbcstruct = struct.Struct('>III')

@parthandler('cache:rev-branch-cache')
def handlerbc(op, inpart):
    """receive a rev-branch-cache payload and update the local cache

    The payload is a series of per-branch records, each containing:

    1) branch name length
    2) number of open heads
    3) number of closed heads
    4) open heads nodes
    5) closed heads nodes
    """
    total = 0
    rawheader = inpart.read(rbcstruct.size)
    cache = op.repo.revbranchcache()
    cl = op.repo.unfiltered().changelog
    while rawheader:
        header = rbcstruct.unpack(rawheader)
        total += header[1] + header[2]
        utf8branch = inpart.read(header[0])
        branch = encoding.tolocal(utf8branch)
        for x in xrange(header[1]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, False)
        for x in xrange(header[2]):
            node = inpart.read(20)
            rev = cl.rev(node)
            cache.setdata(branch, rev, node, True)
        rawheader = inpart.read(rbcstruct.size)
    cache.write()

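Each record therefore starts with a big-endian `>III` header (name length, open-head count, closed-head count) followed by the UTF-8 branch name and the raw 20-byte nodes. A self-contained round-trip sketch of that framing, with `io.BytesIO` standing in for the part (the `emit_branch`/`parse_payload` names are illustrative, not Mercurial API):

```python
import io
import struct

rbcstruct = struct.Struct('>III')  # name length, #open heads, #closed heads

def emit_branch(name_utf8, openheads, closedheads):
    """Serialize one rev-branch-cache record in the format handlerbc reads."""
    return (rbcstruct.pack(len(name_utf8), len(openheads), len(closedheads))
            + name_utf8 + b''.join(openheads) + b''.join(closedheads))

def parse_payload(fh):
    """Inverse of emit_branch, mirroring the loop in handlerbc above."""
    branches = {}
    rawheader = fh.read(rbcstruct.size)
    while rawheader:
        namelen, nbopen, nbclosed = rbcstruct.unpack(rawheader)
        name = fh.read(namelen)
        opens = [fh.read(20) for _ in range(nbopen)]
        closes = [fh.read(20) for _ in range(nbclosed)]
        branches[name] = (opens, closes)
        rawheader = fh.read(rbcstruct.size)
    return branches

payload = emit_branch(b'default', [b'\x01' * 20], [b'\x02' * 20])
assert parse_payload(io.BytesIO(payload)) == {
    b'default': ([b'\x01' * 20], [b'\x02' * 20])}
```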
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)

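The key transformation above is simple but worth pinning down: every advisory parameter is uppercased and namespaced with `USERVAR_` before it reaches hooks, so hooks can always tell client-supplied variables apart from transaction state. A small sketch (the helper name is illustrative):

```python
def pushvars_hookargs(advisoryparams):
    """Uppercase each pushed variable and prefix it with USERVAR_,
    as the pushvars handler above does before exposing it to hooks."""
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

assert pushvars_hookargs([('debug', '1')]) == {'USERVAR_DEBUG': '1'}
```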
@parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
def handlestreamv2bundle(op, part):

    requirements = urlreq.unquote(part.params['requirements']).split(',')
    filecount = int(part.params['filecount'])
    bytecount = int(part.params['bytecount'])

    repo = op.repo
    if len(repo):
        msg = _('cannot apply stream clone to non empty repository')
        raise error.Abort(msg)

    repo.ui.debug('applying stream bundle\n')
    streamclone.applybundlev2(repo, part, filecount, bytecount,
                              requirements)