##// END OF EJS Templates
streamclone: move requirement update into consumev2...
Boris Feld -
r35822:2d3e486d stable
parent child Browse files
Show More
@@ -1,2165 +1,2157 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import os
152 152 import re
153 153 import string
154 154 import struct
155 155 import sys
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 bookmarks,
160 160 changegroup,
161 161 error,
162 162 node as nodemod,
163 163 obsolete,
164 164 phases,
165 165 pushkey,
166 166 pycompat,
167 167 streamclone,
168 168 tags,
169 169 url,
170 170 util,
171 171 )
172 172
173 173 urlerr = util.urlerr
174 174 urlreq = util.urlreq
175 175
176 176 _pack = struct.pack
177 177 _unpack = struct.unpack
178 178
179 179 _fstreamparamsize = '>i'
180 180 _fpartheadersize = '>i'
181 181 _fparttypesize = '>B'
182 182 _fpartid = '>I'
183 183 _fpayloadsize = '>i'
184 184 _fpartparamcount = '>BB'
185 185
186 186 preferedchunksize = 32768
187 187
188 188 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
189 189
190 190 def outdebug(ui, message):
191 191 """debug regarding output stream (bundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug'):
193 193 ui.debug('bundle2-output: %s\n' % message)
194 194
195 195 def indebug(ui, message):
196 196 """debug on input stream (unbundling)"""
197 197 if ui.configbool('devel', 'bundle2.debug'):
198 198 ui.debug('bundle2-input: %s\n' % message)
199 199
200 200 def validateparttype(parttype):
201 201 """raise ValueError if a parttype contains invalid character"""
202 202 if _parttypeforbidden.search(parttype):
203 203 raise ValueError(parttype)
204 204
205 205 def _makefpartparamsizes(nbparams):
206 206 """return a struct format to read part parameter sizes
207 207
208 208 The number parameters is variable so we need to build that format
209 209 dynamically.
210 210 """
211 211 return '>'+('BB'*nbparams)
212 212
213 213 parthandlermapping = {}
214 214
215 215 def parthandler(parttype, params=()):
216 216 """decorator that register a function as a bundle2 part handler
217 217
218 218 eg::
219 219
220 220 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
221 221 def myparttypehandler(...):
222 222 '''process a part of type "my part".'''
223 223 ...
224 224 """
225 225 validateparttype(parttype)
226 226 def _decorator(func):
227 227 lparttype = parttype.lower() # enforce lower case matching.
228 228 assert lparttype not in parthandlermapping
229 229 parthandlermapping[lparttype] = func
230 230 func.params = frozenset(params)
231 231 return func
232 232 return _decorator
233 233
234 234 class unbundlerecords(object):
235 235 """keep record of what happens during and unbundle
236 236
237 237 New records are added using `records.add('cat', obj)`. Where 'cat' is a
238 238 category of record and obj is an arbitrary object.
239 239
240 240 `records['cat']` will return all entries of this category 'cat'.
241 241
242 242 Iterating on the object itself will yield `('category', obj)` tuples
243 243 for all entries.
244 244
245 245 All iterations happens in chronological order.
246 246 """
247 247
248 248 def __init__(self):
249 249 self._categories = {}
250 250 self._sequences = []
251 251 self._replies = {}
252 252
253 253 def add(self, category, entry, inreplyto=None):
254 254 """add a new record of a given category.
255 255
256 256 The entry can then be retrieved in the list returned by
257 257 self['category']."""
258 258 self._categories.setdefault(category, []).append(entry)
259 259 self._sequences.append((category, entry))
260 260 if inreplyto is not None:
261 261 self.getreplies(inreplyto).add(category, entry)
262 262
263 263 def getreplies(self, partid):
264 264 """get the records that are replies to a specific part"""
265 265 return self._replies.setdefault(partid, unbundlerecords())
266 266
267 267 def __getitem__(self, cat):
268 268 return tuple(self._categories.get(cat, ()))
269 269
270 270 def __iter__(self):
271 271 return iter(self._sequences)
272 272
273 273 def __len__(self):
274 274 return len(self._sequences)
275 275
276 276 def __nonzero__(self):
277 277 return bool(self._sequences)
278 278
279 279 __bool__ = __nonzero__
280 280
281 281 class bundleoperation(object):
282 282 """an object that represents a single bundling process
283 283
284 284 Its purpose is to carry unbundle-related objects and states.
285 285
286 286 A new object should be created at the beginning of each bundle processing.
287 287 The object is to be returned by the processing function.
288 288
289 289 The object has very little content now it will ultimately contain:
290 290 * an access to the repo the bundle is applied to,
291 291 * a ui object,
292 292 * a way to retrieve a transaction to add changes to the repo,
293 293 * a way to record the result of processing each part,
294 294 * a way to construct a bundle response when applicable.
295 295 """
296 296
297 297 def __init__(self, repo, transactiongetter, captureoutput=True):
298 298 self.repo = repo
299 299 self.ui = repo.ui
300 300 self.records = unbundlerecords()
301 301 self.reply = None
302 302 self.captureoutput = captureoutput
303 303 self.hookargs = {}
304 304 self._gettransaction = transactiongetter
305 305 # carries value that can modify part behavior
306 306 self.modes = {}
307 307
308 308 def gettransaction(self):
309 309 transaction = self._gettransaction()
310 310
311 311 if self.hookargs:
312 312 # the ones added to the transaction supercede those added
313 313 # to the operation.
314 314 self.hookargs.update(transaction.hookargs)
315 315 transaction.hookargs = self.hookargs
316 316
317 317 # mark the hookargs as flushed. further attempts to add to
318 318 # hookargs will result in an abort.
319 319 self.hookargs = None
320 320
321 321 return transaction
322 322
323 323 def addhookargs(self, hookargs):
324 324 if self.hookargs is None:
325 325 raise error.ProgrammingError('attempted to add hookargs to '
326 326 'operation after transaction started')
327 327 self.hookargs.update(hookargs)
328 328
329 329 class TransactionUnavailable(RuntimeError):
330 330 pass
331 331
332 332 def _notransaction():
333 333 """default method to get a transaction while processing a bundle
334 334
335 335 Raise an exception to highlight the fact that no transaction was expected
336 336 to be created"""
337 337 raise TransactionUnavailable()
338 338
339 339 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
340 340 # transform me into unbundler.apply() as soon as the freeze is lifted
341 341 if isinstance(unbundler, unbundle20):
342 342 tr.hookargs['bundle2'] = '1'
343 343 if source is not None and 'source' not in tr.hookargs:
344 344 tr.hookargs['source'] = source
345 345 if url is not None and 'url' not in tr.hookargs:
346 346 tr.hookargs['url'] = url
347 347 return processbundle(repo, unbundler, lambda: tr)
348 348 else:
349 349 # the transactiongetter won't be used, but we might as well set it
350 350 op = bundleoperation(repo, lambda: tr)
351 351 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
352 352 return op
353 353
354 354 class partiterator(object):
355 355 def __init__(self, repo, op, unbundler):
356 356 self.repo = repo
357 357 self.op = op
358 358 self.unbundler = unbundler
359 359 self.iterator = None
360 360 self.count = 0
361 361 self.current = None
362 362
363 363 def __enter__(self):
364 364 def func():
365 365 itr = enumerate(self.unbundler.iterparts())
366 366 for count, p in itr:
367 367 self.count = count
368 368 self.current = p
369 369 yield p
370 370 p.consume()
371 371 self.current = None
372 372 self.iterator = func()
373 373 return self.iterator
374 374
375 375 def __exit__(self, type, exc, tb):
376 376 if not self.iterator:
377 377 return
378 378
379 379 # Only gracefully abort in a normal exception situation. User aborts
380 380 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
381 381 # and should not gracefully cleanup.
382 382 if isinstance(exc, Exception):
383 383 # Any exceptions seeking to the end of the bundle at this point are
384 384 # almost certainly related to the underlying stream being bad.
385 385 # And, chances are that the exception we're handling is related to
386 386 # getting in that bad state. So, we swallow the seeking error and
387 387 # re-raise the original error.
388 388 seekerror = False
389 389 try:
390 390 if self.current:
391 391 # consume the part content to not corrupt the stream.
392 392 self.current.consume()
393 393
394 394 for part in self.iterator:
395 395 # consume the bundle content
396 396 part.consume()
397 397 except Exception:
398 398 seekerror = True
399 399
400 400 # Small hack to let caller code distinguish exceptions from bundle2
401 401 # processing from processing the old format. This is mostly needed
402 402 # to handle different return codes to unbundle according to the type
403 403 # of bundle. We should probably clean up or drop this return code
404 404 # craziness in a future version.
405 405 exc.duringunbundle2 = True
406 406 salvaged = []
407 407 replycaps = None
408 408 if self.op.reply is not None:
409 409 salvaged = self.op.reply.salvageoutput()
410 410 replycaps = self.op.reply.capabilities
411 411 exc._replycaps = replycaps
412 412 exc._bundle2salvagedoutput = salvaged
413 413
414 414 # Re-raising from a variable loses the original stack. So only use
415 415 # that form if we need to.
416 416 if seekerror:
417 417 raise exc
418 418
419 419 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
420 420 self.count)
421 421
422 422 def processbundle(repo, unbundler, transactiongetter=None, op=None):
423 423 """This function process a bundle, apply effect to/from a repo
424 424
425 425 It iterates over each part then searches for and uses the proper handling
426 426 code to process the part. Parts are processed in order.
427 427
428 428 Unknown Mandatory part will abort the process.
429 429
430 430 It is temporarily possible to provide a prebuilt bundleoperation to the
431 431 function. This is used to ensure output is properly propagated in case of
432 432 an error during the unbundling. This output capturing part will likely be
433 433 reworked and this ability will probably go away in the process.
434 434 """
435 435 if op is None:
436 436 if transactiongetter is None:
437 437 transactiongetter = _notransaction
438 438 op = bundleoperation(repo, transactiongetter)
439 439 # todo:
440 440 # - replace this is a init function soon.
441 441 # - exception catching
442 442 unbundler.params
443 443 if repo.ui.debugflag:
444 444 msg = ['bundle2-input-bundle:']
445 445 if unbundler.params:
446 446 msg.append(' %i params' % len(unbundler.params))
447 447 if op._gettransaction is None or op._gettransaction is _notransaction:
448 448 msg.append(' no-transaction')
449 449 else:
450 450 msg.append(' with-transaction')
451 451 msg.append('\n')
452 452 repo.ui.debug(''.join(msg))
453 453
454 454 processparts(repo, op, unbundler)
455 455
456 456 return op
457 457
458 458 def processparts(repo, op, unbundler):
459 459 with partiterator(repo, op, unbundler) as parts:
460 460 for part in parts:
461 461 _processpart(op, part)
462 462
463 463 def _processchangegroup(op, cg, tr, source, url, **kwargs):
464 464 ret = cg.apply(op.repo, tr, source, url, **kwargs)
465 465 op.records.add('changegroup', {
466 466 'return': ret,
467 467 })
468 468 return ret
469 469
470 470 def _gethandler(op, part):
471 471 status = 'unknown' # used by debug output
472 472 try:
473 473 handler = parthandlermapping.get(part.type)
474 474 if handler is None:
475 475 status = 'unsupported-type'
476 476 raise error.BundleUnknownFeatureError(parttype=part.type)
477 477 indebug(op.ui, 'found a handler for part %s' % part.type)
478 478 unknownparams = part.mandatorykeys - handler.params
479 479 if unknownparams:
480 480 unknownparams = list(unknownparams)
481 481 unknownparams.sort()
482 482 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
483 483 raise error.BundleUnknownFeatureError(parttype=part.type,
484 484 params=unknownparams)
485 485 status = 'supported'
486 486 except error.BundleUnknownFeatureError as exc:
487 487 if part.mandatory: # mandatory parts
488 488 raise
489 489 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
490 490 return # skip to part processing
491 491 finally:
492 492 if op.ui.debugflag:
493 493 msg = ['bundle2-input-part: "%s"' % part.type]
494 494 if not part.mandatory:
495 495 msg.append(' (advisory)')
496 496 nbmp = len(part.mandatorykeys)
497 497 nbap = len(part.params) - nbmp
498 498 if nbmp or nbap:
499 499 msg.append(' (params:')
500 500 if nbmp:
501 501 msg.append(' %i mandatory' % nbmp)
502 502 if nbap:
503 503 msg.append(' %i advisory' % nbmp)
504 504 msg.append(')')
505 505 msg.append(' %s\n' % status)
506 506 op.ui.debug(''.join(msg))
507 507
508 508 return handler
509 509
510 510 def _processpart(op, part):
511 511 """process a single part from a bundle
512 512
513 513 The part is guaranteed to have been fully consumed when the function exits
514 514 (even if an exception is raised)."""
515 515 handler = _gethandler(op, part)
516 516 if handler is None:
517 517 return
518 518
519 519 # handler is called outside the above try block so that we don't
520 520 # risk catching KeyErrors from anything other than the
521 521 # parthandlermapping lookup (any KeyError raised by handler()
522 522 # itself represents a defect of a different variety).
523 523 output = None
524 524 if op.captureoutput and op.reply is not None:
525 525 op.ui.pushbuffer(error=True, subproc=True)
526 526 output = ''
527 527 try:
528 528 handler(op, part)
529 529 finally:
530 530 if output is not None:
531 531 output = op.ui.popbuffer()
532 532 if output:
533 533 outpart = op.reply.newpart('output', data=output,
534 534 mandatory=False)
535 535 outpart.addparam(
536 536 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
537 537
538 538 def decodecaps(blob):
539 539 """decode a bundle2 caps bytes blob into a dictionary
540 540
541 541 The blob is a list of capabilities (one per line)
542 542 Capabilities may have values using a line of the form::
543 543
544 544 capability=value1,value2,value3
545 545
546 546 The values are always a list."""
547 547 caps = {}
548 548 for line in blob.splitlines():
549 549 if not line:
550 550 continue
551 551 if '=' not in line:
552 552 key, vals = line, ()
553 553 else:
554 554 key, vals = line.split('=', 1)
555 555 vals = vals.split(',')
556 556 key = urlreq.unquote(key)
557 557 vals = [urlreq.unquote(v) for v in vals]
558 558 caps[key] = vals
559 559 return caps
560 560
561 561 def encodecaps(caps):
562 562 """encode a bundle2 caps dictionary into a bytes blob"""
563 563 chunks = []
564 564 for ca in sorted(caps):
565 565 vals = caps[ca]
566 566 ca = urlreq.quote(ca)
567 567 vals = [urlreq.quote(v) for v in vals]
568 568 if vals:
569 569 ca = "%s=%s" % (ca, ','.join(vals))
570 570 chunks.append(ca)
571 571 return '\n'.join(chunks)
572 572
573 573 bundletypes = {
574 574 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
575 575 # since the unification ssh accepts a header but there
576 576 # is no capability signaling it.
577 577 "HG20": (), # special-cased below
578 578 "HG10UN": ("HG10UN", 'UN'),
579 579 "HG10BZ": ("HG10", 'BZ'),
580 580 "HG10GZ": ("HG10GZ", 'GZ'),
581 581 }
582 582
583 583 # hgweb uses this list to communicate its preferred type
584 584 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
585 585
586 586 class bundle20(object):
587 587 """represent an outgoing bundle2 container
588 588
589 589 Use the `addparam` method to add stream level parameter. and `newpart` to
590 590 populate it. Then call `getchunks` to retrieve all the binary chunks of
591 591 data that compose the bundle2 container."""
592 592
593 593 _magicstring = 'HG20'
594 594
595 595 def __init__(self, ui, capabilities=()):
596 596 self.ui = ui
597 597 self._params = []
598 598 self._parts = []
599 599 self.capabilities = dict(capabilities)
600 600 self._compengine = util.compengines.forbundletype('UN')
601 601 self._compopts = None
602 602 # If compression is being handled by a consumer of the raw
603 603 # data (e.g. the wire protocol), unsetting this flag tells
604 604 # consumers that the bundle is best left uncompressed.
605 605 self.prefercompressed = True
606 606
607 607 def setcompression(self, alg, compopts=None):
608 608 """setup core part compression to <alg>"""
609 609 if alg in (None, 'UN'):
610 610 return
611 611 assert not any(n.lower() == 'compression' for n, v in self._params)
612 612 self.addparam('Compression', alg)
613 613 self._compengine = util.compengines.forbundletype(alg)
614 614 self._compopts = compopts
615 615
616 616 @property
617 617 def nbparts(self):
618 618 """total number of parts added to the bundler"""
619 619 return len(self._parts)
620 620
621 621 # methods used to defines the bundle2 content
622 622 def addparam(self, name, value=None):
623 623 """add a stream level parameter"""
624 624 if not name:
625 625 raise ValueError(r'empty parameter name')
626 626 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
627 627 raise ValueError(r'non letter first character: %s' % name)
628 628 self._params.append((name, value))
629 629
630 630 def addpart(self, part):
631 631 """add a new part to the bundle2 container
632 632
633 633 Parts contains the actual applicative payload."""
634 634 assert part.id is None
635 635 part.id = len(self._parts) # very cheap counter
636 636 self._parts.append(part)
637 637
638 638 def newpart(self, typeid, *args, **kwargs):
639 639 """create a new part and add it to the containers
640 640
641 641 As the part is directly added to the containers. For now, this means
642 642 that any failure to properly initialize the part after calling
643 643 ``newpart`` should result in a failure of the whole bundling process.
644 644
645 645 You can still fall back to manually create and add if you need better
646 646 control."""
647 647 part = bundlepart(typeid, *args, **kwargs)
648 648 self.addpart(part)
649 649 return part
650 650
651 651 # methods used to generate the bundle2 stream
652 652 def getchunks(self):
653 653 if self.ui.debugflag:
654 654 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
655 655 if self._params:
656 656 msg.append(' (%i params)' % len(self._params))
657 657 msg.append(' %i parts total\n' % len(self._parts))
658 658 self.ui.debug(''.join(msg))
659 659 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
660 660 yield self._magicstring
661 661 param = self._paramchunk()
662 662 outdebug(self.ui, 'bundle parameter: %s' % param)
663 663 yield _pack(_fstreamparamsize, len(param))
664 664 if param:
665 665 yield param
666 666 for chunk in self._compengine.compressstream(self._getcorechunk(),
667 667 self._compopts):
668 668 yield chunk
669 669
670 670 def _paramchunk(self):
671 671 """return a encoded version of all stream parameters"""
672 672 blocks = []
673 673 for par, value in self._params:
674 674 par = urlreq.quote(par)
675 675 if value is not None:
676 676 value = urlreq.quote(value)
677 677 par = '%s=%s' % (par, value)
678 678 blocks.append(par)
679 679 return ' '.join(blocks)
680 680
681 681 def _getcorechunk(self):
682 682 """yield chunk for the core part of the bundle
683 683
684 684 (all but headers and parameters)"""
685 685 outdebug(self.ui, 'start of parts')
686 686 for part in self._parts:
687 687 outdebug(self.ui, 'bundle part: "%s"' % part.type)
688 688 for chunk in part.getchunks(ui=self.ui):
689 689 yield chunk
690 690 outdebug(self.ui, 'end of bundle')
691 691 yield _pack(_fpartheadersize, 0)
692 692
693 693
694 694 def salvageoutput(self):
695 695 """return a list with a copy of all output parts in the bundle
696 696
697 697 This is meant to be used during error handling to make sure we preserve
698 698 server output"""
699 699 salvaged = []
700 700 for part in self._parts:
701 701 if part.type.startswith('output'):
702 702 salvaged.append(part.copy())
703 703 return salvaged
704 704
705 705
706 706 class unpackermixin(object):
707 707 """A mixin to extract bytes and struct data from a stream"""
708 708
709 709 def __init__(self, fp):
710 710 self._fp = fp
711 711
712 712 def _unpack(self, format):
713 713 """unpack this struct format from the stream
714 714
715 715 This method is meant for internal usage by the bundle2 protocol only.
716 716 They directly manipulate the low level stream including bundle2 level
717 717 instruction.
718 718
719 719 Do not use it to implement higher-level logic or methods."""
720 720 data = self._readexact(struct.calcsize(format))
721 721 return _unpack(format, data)
722 722
723 723 def _readexact(self, size):
724 724 """read exactly <size> bytes from the stream
725 725
726 726 This method is meant for internal usage by the bundle2 protocol only.
727 727 They directly manipulate the low level stream including bundle2 level
728 728 instruction.
729 729
730 730 Do not use it to implement higher-level logic or methods."""
731 731 return changegroup.readexactly(self._fp, size)
732 732
733 733 def getunbundler(ui, fp, magicstring=None):
734 734 """return a valid unbundler object for a given magicstring"""
735 735 if magicstring is None:
736 736 magicstring = changegroup.readexactly(fp, 4)
737 737 magic, version = magicstring[0:2], magicstring[2:4]
738 738 if magic != 'HG':
739 739 ui.debug(
740 740 "error: invalid magic: %r (version %r), should be 'HG'\n"
741 741 % (magic, version))
742 742 raise error.Abort(_('not a Mercurial bundle'))
743 743 unbundlerclass = formatmap.get(version)
744 744 if unbundlerclass is None:
745 745 raise error.Abort(_('unknown bundle version %s') % version)
746 746 unbundler = unbundlerclass(ui, fp)
747 747 indebug(ui, 'start processing of %s stream' % magicstring)
748 748 return unbundler
749 749
750 750 class unbundle20(unpackermixin):
751 751 """interpret a bundle2 stream
752 752
753 753 This class is fed with a binary stream and yields parts through its
754 754 `iterparts` methods."""
755 755
756 756 _magicstring = 'HG20'
757 757
758 758 def __init__(self, ui, fp):
759 759 """If header is specified, we do not read it out of the stream."""
760 760 self.ui = ui
761 761 self._compengine = util.compengines.forbundletype('UN')
762 762 self._compressed = None
763 763 super(unbundle20, self).__init__(fp)
764 764
765 765 @util.propertycache
766 766 def params(self):
767 767 """dictionary of stream level parameters"""
768 768 indebug(self.ui, 'reading bundle2 stream parameters')
769 769 params = {}
770 770 paramssize = self._unpack(_fstreamparamsize)[0]
771 771 if paramssize < 0:
772 772 raise error.BundleValueError('negative bundle param size: %i'
773 773 % paramssize)
774 774 if paramssize:
775 775 params = self._readexact(paramssize)
776 776 params = self._processallparams(params)
777 777 return params
778 778
779 779 def _processallparams(self, paramsblock):
780 780 """"""
781 781 params = util.sortdict()
782 782 for p in paramsblock.split(' '):
783 783 p = p.split('=', 1)
784 784 p = [urlreq.unquote(i) for i in p]
785 785 if len(p) < 2:
786 786 p.append(None)
787 787 self._processparam(*p)
788 788 params[p[0]] = p[1]
789 789 return params
790 790
791 791
792 792 def _processparam(self, name, value):
793 793 """process a parameter, applying its effect if needed
794 794
795 795 Parameter starting with a lower case letter are advisory and will be
796 796 ignored when unknown. Those starting with an upper case letter are
797 797 mandatory and will this function will raise a KeyError when unknown.
798 798
799 799 Note: no option are currently supported. Any input will be either
800 800 ignored or failing.
801 801 """
802 802 if not name:
803 803 raise ValueError(r'empty parameter name')
804 804 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
805 805 raise ValueError(r'non letter first character: %s' % name)
806 806 try:
807 807 handler = b2streamparamsmap[name.lower()]
808 808 except KeyError:
809 809 if name[0:1].islower():
810 810 indebug(self.ui, "ignoring unknown parameter %s" % name)
811 811 else:
812 812 raise error.BundleUnknownFeatureError(params=(name,))
813 813 else:
814 814 handler(self, name, value)
815 815
816 816 def _forwardchunks(self):
817 817 """utility to transfer a bundle2 as binary
818 818
819 819 This is made necessary by the fact the 'getbundle' command over 'ssh'
820 820 have no way to know then the reply end, relying on the bundle to be
821 821 interpreted to know its end. This is terrible and we are sorry, but we
822 822 needed to move forward to get general delta enabled.
823 823 """
824 824 yield self._magicstring
825 825 assert 'params' not in vars(self)
826 826 paramssize = self._unpack(_fstreamparamsize)[0]
827 827 if paramssize < 0:
828 828 raise error.BundleValueError('negative bundle param size: %i'
829 829 % paramssize)
830 830 yield _pack(_fstreamparamsize, paramssize)
831 831 if paramssize:
832 832 params = self._readexact(paramssize)
833 833 self._processallparams(params)
834 834 yield params
835 835 assert self._compengine.bundletype == 'UN'
836 836 # From there, payload might need to be decompressed
837 837 self._fp = self._compengine.decompressorreader(self._fp)
838 838 emptycount = 0
839 839 while emptycount < 2:
840 840 # so we can brainlessly loop
841 841 assert _fpartheadersize == _fpayloadsize
842 842 size = self._unpack(_fpartheadersize)[0]
843 843 yield _pack(_fpartheadersize, size)
844 844 if size:
845 845 emptycount = 0
846 846 else:
847 847 emptycount += 1
848 848 continue
849 849 if size == flaginterrupt:
850 850 continue
851 851 elif size < 0:
852 852 raise error.BundleValueError('negative chunk size: %i')
853 853 yield self._readexact(size)
854 854
855 855
856 856 def iterparts(self, seekable=False):
857 857 """yield all parts contained in the stream"""
858 858 cls = seekableunbundlepart if seekable else unbundlepart
859 859 # make sure param have been loaded
860 860 self.params
861 861 # From there, payload need to be decompressed
862 862 self._fp = self._compengine.decompressorreader(self._fp)
863 863 indebug(self.ui, 'start extraction of bundle2 parts')
864 864 headerblock = self._readpartheader()
865 865 while headerblock is not None:
866 866 part = cls(self.ui, headerblock, self._fp)
867 867 yield part
868 868 # Ensure part is fully consumed so we can start reading the next
869 869 # part.
870 870 part.consume()
871 871
872 872 headerblock = self._readpartheader()
873 873 indebug(self.ui, 'end of bundle2 stream')
874 874
875 875 def _readpartheader(self):
876 876 """reads a part header size and return the bytes blob
877 877
878 878 returns None if empty"""
879 879 headersize = self._unpack(_fpartheadersize)[0]
880 880 if headersize < 0:
881 881 raise error.BundleValueError('negative part header size: %i'
882 882 % headersize)
883 883 indebug(self.ui, 'part header size: %i' % headersize)
884 884 if headersize:
885 885 return self._readexact(headersize)
886 886 return None
887 887
888 888 def compressed(self):
889 889 self.params # load params
890 890 return self._compressed
891 891
892 892 def close(self):
893 893 """close underlying file"""
894 894 if util.safehasattr(self._fp, 'close'):
895 895 return self._fp.close()
896 896
897 897 formatmap = {'20': unbundle20}
898 898
899 899 b2streamparamsmap = {}
900 900
901 901 def b2streamparamhandler(name):
902 902 """register a handler for a stream level parameter"""
903 903 def decorator(func):
904 904 assert name not in formatmap
905 905 b2streamparamsmap[name] = func
906 906 return func
907 907 return decorator
908 908
909 909 @b2streamparamhandler('compression')
910 910 def processcompression(unbundler, param, value):
911 911 """read compression parameter and install payload decompression"""
912 912 if value not in util.compengines.supportedbundletypes:
913 913 raise error.BundleUnknownFeatureError(params=(param,),
914 914 values=(value,))
915 915 unbundler._compengine = util.compengines.forbundletype(value)
916 916 if value is not None:
917 917 unbundler._compressed = True
918 918
919 919 class bundlepart(object):
920 920 """A bundle2 part contains application level payload
921 921
922 922 The part `type` is used to route the part to the application level
923 923 handler.
924 924
925 925 The part payload is contained in ``part.data``. It could be raw bytes or a
926 926 generator of byte chunks.
927 927
928 928 You can add parameters to the part using the ``addparam`` method.
929 929 Parameters can be either mandatory (default) or advisory. Remote side
930 930 should be able to safely ignore the advisory ones.
931 931
932 932 Both data and parameters cannot be modified after the generation has begun.
933 933 """
934 934
935 935 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
936 936 data='', mandatory=True):
937 937 validateparttype(parttype)
938 938 self.id = None
939 939 self.type = parttype
940 940 self._data = data
941 941 self._mandatoryparams = list(mandatoryparams)
942 942 self._advisoryparams = list(advisoryparams)
943 943 # checking for duplicated entries
944 944 self._seenparams = set()
945 945 for pname, __ in self._mandatoryparams + self._advisoryparams:
946 946 if pname in self._seenparams:
947 947 raise error.ProgrammingError('duplicated params: %s' % pname)
948 948 self._seenparams.add(pname)
949 949 # status of the part's generation:
950 950 # - None: not started,
951 951 # - False: currently generated,
952 952 # - True: generation done.
953 953 self._generated = None
954 954 self.mandatory = mandatory
955 955
956 956 def __repr__(self):
957 957 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
958 958 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
959 959 % (cls, id(self), self.id, self.type, self.mandatory))
960 960
961 961 def copy(self):
962 962 """return a copy of the part
963 963
964 964 The new part have the very same content but no partid assigned yet.
965 965 Parts with generated data cannot be copied."""
966 966 assert not util.safehasattr(self.data, 'next')
967 967 return self.__class__(self.type, self._mandatoryparams,
968 968 self._advisoryparams, self._data, self.mandatory)
969 969
970 970 # methods used to defines the part content
971 971 @property
972 972 def data(self):
973 973 return self._data
974 974
975 975 @data.setter
976 976 def data(self, data):
977 977 if self._generated is not None:
978 978 raise error.ReadOnlyPartError('part is being generated')
979 979 self._data = data
980 980
981 981 @property
982 982 def mandatoryparams(self):
983 983 # make it an immutable tuple to force people through ``addparam``
984 984 return tuple(self._mandatoryparams)
985 985
986 986 @property
987 987 def advisoryparams(self):
988 988 # make it an immutable tuple to force people through ``addparam``
989 989 return tuple(self._advisoryparams)
990 990
991 991 def addparam(self, name, value='', mandatory=True):
992 992 """add a parameter to the part
993 993
994 994 If 'mandatory' is set to True, the remote handler must claim support
995 995 for this parameter or the unbundling will be aborted.
996 996
997 997 The 'name' and 'value' cannot exceed 255 bytes each.
998 998 """
999 999 if self._generated is not None:
1000 1000 raise error.ReadOnlyPartError('part is being generated')
1001 1001 if name in self._seenparams:
1002 1002 raise ValueError('duplicated params: %s' % name)
1003 1003 self._seenparams.add(name)
1004 1004 params = self._advisoryparams
1005 1005 if mandatory:
1006 1006 params = self._mandatoryparams
1007 1007 params.append((name, value))
1008 1008
1009 1009 # methods used to generates the bundle2 stream
1010 1010 def getchunks(self, ui):
1011 1011 if self._generated is not None:
1012 1012 raise error.ProgrammingError('part can only be consumed once')
1013 1013 self._generated = False
1014 1014
1015 1015 if ui.debugflag:
1016 1016 msg = ['bundle2-output-part: "%s"' % self.type]
1017 1017 if not self.mandatory:
1018 1018 msg.append(' (advisory)')
1019 1019 nbmp = len(self.mandatoryparams)
1020 1020 nbap = len(self.advisoryparams)
1021 1021 if nbmp or nbap:
1022 1022 msg.append(' (params:')
1023 1023 if nbmp:
1024 1024 msg.append(' %i mandatory' % nbmp)
1025 1025 if nbap:
1026 1026 msg.append(' %i advisory' % nbmp)
1027 1027 msg.append(')')
1028 1028 if not self.data:
1029 1029 msg.append(' empty payload')
1030 1030 elif (util.safehasattr(self.data, 'next')
1031 1031 or util.safehasattr(self.data, '__next__')):
1032 1032 msg.append(' streamed payload')
1033 1033 else:
1034 1034 msg.append(' %i bytes payload' % len(self.data))
1035 1035 msg.append('\n')
1036 1036 ui.debug(''.join(msg))
1037 1037
1038 1038 #### header
1039 1039 if self.mandatory:
1040 1040 parttype = self.type.upper()
1041 1041 else:
1042 1042 parttype = self.type.lower()
1043 1043 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1044 1044 ## parttype
1045 1045 header = [_pack(_fparttypesize, len(parttype)),
1046 1046 parttype, _pack(_fpartid, self.id),
1047 1047 ]
1048 1048 ## parameters
1049 1049 # count
1050 1050 manpar = self.mandatoryparams
1051 1051 advpar = self.advisoryparams
1052 1052 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1053 1053 # size
1054 1054 parsizes = []
1055 1055 for key, value in manpar:
1056 1056 parsizes.append(len(key))
1057 1057 parsizes.append(len(value))
1058 1058 for key, value in advpar:
1059 1059 parsizes.append(len(key))
1060 1060 parsizes.append(len(value))
1061 1061 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1062 1062 header.append(paramsizes)
1063 1063 # key, value
1064 1064 for key, value in manpar:
1065 1065 header.append(key)
1066 1066 header.append(value)
1067 1067 for key, value in advpar:
1068 1068 header.append(key)
1069 1069 header.append(value)
1070 1070 ## finalize header
1071 1071 try:
1072 1072 headerchunk = ''.join(header)
1073 1073 except TypeError:
1074 1074 raise TypeError(r'Found a non-bytes trying to '
1075 1075 r'build bundle part header: %r' % header)
1076 1076 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1077 1077 yield _pack(_fpartheadersize, len(headerchunk))
1078 1078 yield headerchunk
1079 1079 ## payload
1080 1080 try:
1081 1081 for chunk in self._payloadchunks():
1082 1082 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1083 1083 yield _pack(_fpayloadsize, len(chunk))
1084 1084 yield chunk
1085 1085 except GeneratorExit:
1086 1086 # GeneratorExit means that nobody is listening for our
1087 1087 # results anyway, so just bail quickly rather than trying
1088 1088 # to produce an error part.
1089 1089 ui.debug('bundle2-generatorexit\n')
1090 1090 raise
1091 1091 except BaseException as exc:
1092 1092 bexc = util.forcebytestr(exc)
1093 1093 # backup exception data for later
1094 1094 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1095 1095 % bexc)
1096 1096 tb = sys.exc_info()[2]
1097 1097 msg = 'unexpected error: %s' % bexc
1098 1098 interpart = bundlepart('error:abort', [('message', msg)],
1099 1099 mandatory=False)
1100 1100 interpart.id = 0
1101 1101 yield _pack(_fpayloadsize, -1)
1102 1102 for chunk in interpart.getchunks(ui=ui):
1103 1103 yield chunk
1104 1104 outdebug(ui, 'closing payload chunk')
1105 1105 # abort current part payload
1106 1106 yield _pack(_fpayloadsize, 0)
1107 1107 pycompat.raisewithtb(exc, tb)
1108 1108 # end of payload
1109 1109 outdebug(ui, 'closing payload chunk')
1110 1110 yield _pack(_fpayloadsize, 0)
1111 1111 self._generated = True
1112 1112
1113 1113 def _payloadchunks(self):
1114 1114 """yield chunks of a the part payload
1115 1115
1116 1116 Exists to handle the different methods to provide data to a part."""
1117 1117 # we only support fixed size data now.
1118 1118 # This will be improved in the future.
1119 1119 if (util.safehasattr(self.data, 'next')
1120 1120 or util.safehasattr(self.data, '__next__')):
1121 1121 buff = util.chunkbuffer(self.data)
1122 1122 chunk = buff.read(preferedchunksize)
1123 1123 while chunk:
1124 1124 yield chunk
1125 1125 chunk = buff.read(preferedchunksize)
1126 1126 elif len(self.data):
1127 1127 yield self.data
1128 1128
1129 1129
1130 1130 flaginterrupt = -1
1131 1131
1132 1132 class interrupthandler(unpackermixin):
1133 1133 """read one part and process it with restricted capability
1134 1134
1135 1135 This allows to transmit exception raised on the producer size during part
1136 1136 iteration while the consumer is reading a part.
1137 1137
1138 1138 Part processed in this manner only have access to a ui object,"""
1139 1139
1140 1140 def __init__(self, ui, fp):
1141 1141 super(interrupthandler, self).__init__(fp)
1142 1142 self.ui = ui
1143 1143
1144 1144 def _readpartheader(self):
1145 1145 """reads a part header size and return the bytes blob
1146 1146
1147 1147 returns None if empty"""
1148 1148 headersize = self._unpack(_fpartheadersize)[0]
1149 1149 if headersize < 0:
1150 1150 raise error.BundleValueError('negative part header size: %i'
1151 1151 % headersize)
1152 1152 indebug(self.ui, 'part header size: %i\n' % headersize)
1153 1153 if headersize:
1154 1154 return self._readexact(headersize)
1155 1155 return None
1156 1156
1157 1157 def __call__(self):
1158 1158
1159 1159 self.ui.debug('bundle2-input-stream-interrupt:'
1160 1160 ' opening out of band context\n')
1161 1161 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1162 1162 headerblock = self._readpartheader()
1163 1163 if headerblock is None:
1164 1164 indebug(self.ui, 'no part found during interruption.')
1165 1165 return
1166 1166 part = unbundlepart(self.ui, headerblock, self._fp)
1167 1167 op = interruptoperation(self.ui)
1168 1168 hardabort = False
1169 1169 try:
1170 1170 _processpart(op, part)
1171 1171 except (SystemExit, KeyboardInterrupt):
1172 1172 hardabort = True
1173 1173 raise
1174 1174 finally:
1175 1175 if not hardabort:
1176 1176 part.consume()
1177 1177 self.ui.debug('bundle2-input-stream-interrupt:'
1178 1178 ' closing out of band context\n')
1179 1179
1180 1180 class interruptoperation(object):
1181 1181 """A limited operation to be use by part handler during interruption
1182 1182
1183 1183 It only have access to an ui object.
1184 1184 """
1185 1185
1186 1186 def __init__(self, ui):
1187 1187 self.ui = ui
1188 1188 self.reply = None
1189 1189 self.captureoutput = False
1190 1190
1191 1191 @property
1192 1192 def repo(self):
1193 1193 raise error.ProgrammingError('no repo access from stream interruption')
1194 1194
1195 1195 def gettransaction(self):
1196 1196 raise TransactionUnavailable('no repo access from stream interruption')
1197 1197
1198 1198 def decodepayloadchunks(ui, fh):
1199 1199 """Reads bundle2 part payload data into chunks.
1200 1200
1201 1201 Part payload data consists of framed chunks. This function takes
1202 1202 a file handle and emits those chunks.
1203 1203 """
1204 1204 dolog = ui.configbool('devel', 'bundle2.debug')
1205 1205 debug = ui.debug
1206 1206
1207 1207 headerstruct = struct.Struct(_fpayloadsize)
1208 1208 headersize = headerstruct.size
1209 1209 unpack = headerstruct.unpack
1210 1210
1211 1211 readexactly = changegroup.readexactly
1212 1212 read = fh.read
1213 1213
1214 1214 chunksize = unpack(readexactly(fh, headersize))[0]
1215 1215 indebug(ui, 'payload chunk size: %i' % chunksize)
1216 1216
1217 1217 # changegroup.readexactly() is inlined below for performance.
1218 1218 while chunksize:
1219 1219 if chunksize >= 0:
1220 1220 s = read(chunksize)
1221 1221 if len(s) < chunksize:
1222 1222 raise error.Abort(_('stream ended unexpectedly '
1223 1223 ' (got %d bytes, expected %d)') %
1224 1224 (len(s), chunksize))
1225 1225
1226 1226 yield s
1227 1227 elif chunksize == flaginterrupt:
1228 1228 # Interrupt "signal" detected. The regular stream is interrupted
1229 1229 # and a bundle2 part follows. Consume it.
1230 1230 interrupthandler(ui, fh)()
1231 1231 else:
1232 1232 raise error.BundleValueError(
1233 1233 'negative payload chunk size: %s' % chunksize)
1234 1234
1235 1235 s = read(headersize)
1236 1236 if len(s) < headersize:
1237 1237 raise error.Abort(_('stream ended unexpectedly '
1238 1238 ' (got %d bytes, expected %d)') %
1239 1239 (len(s), chunksize))
1240 1240
1241 1241 chunksize = unpack(s)[0]
1242 1242
1243 1243 # indebug() inlined for performance.
1244 1244 if dolog:
1245 1245 debug('bundle2-input: payload chunk size: %i\n' % chunksize)
1246 1246
1247 1247 class unbundlepart(unpackermixin):
1248 1248 """a bundle part read from a bundle"""
1249 1249
1250 1250 def __init__(self, ui, header, fp):
1251 1251 super(unbundlepart, self).__init__(fp)
1252 1252 self._seekable = (util.safehasattr(fp, 'seek') and
1253 1253 util.safehasattr(fp, 'tell'))
1254 1254 self.ui = ui
1255 1255 # unbundle state attr
1256 1256 self._headerdata = header
1257 1257 self._headeroffset = 0
1258 1258 self._initialized = False
1259 1259 self.consumed = False
1260 1260 # part data
1261 1261 self.id = None
1262 1262 self.type = None
1263 1263 self.mandatoryparams = None
1264 1264 self.advisoryparams = None
1265 1265 self.params = None
1266 1266 self.mandatorykeys = ()
1267 1267 self._readheader()
1268 1268 self._mandatory = None
1269 1269 self._pos = 0
1270 1270
1271 1271 def _fromheader(self, size):
1272 1272 """return the next <size> byte from the header"""
1273 1273 offset = self._headeroffset
1274 1274 data = self._headerdata[offset:(offset + size)]
1275 1275 self._headeroffset = offset + size
1276 1276 return data
1277 1277
1278 1278 def _unpackheader(self, format):
1279 1279 """read given format from header
1280 1280
1281 1281 This automatically compute the size of the format to read."""
1282 1282 data = self._fromheader(struct.calcsize(format))
1283 1283 return _unpack(format, data)
1284 1284
1285 1285 def _initparams(self, mandatoryparams, advisoryparams):
1286 1286 """internal function to setup all logic related parameters"""
1287 1287 # make it read only to prevent people touching it by mistake.
1288 1288 self.mandatoryparams = tuple(mandatoryparams)
1289 1289 self.advisoryparams = tuple(advisoryparams)
1290 1290 # user friendly UI
1291 1291 self.params = util.sortdict(self.mandatoryparams)
1292 1292 self.params.update(self.advisoryparams)
1293 1293 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1294 1294
1295 1295 def _readheader(self):
1296 1296 """read the header and setup the object"""
1297 1297 typesize = self._unpackheader(_fparttypesize)[0]
1298 1298 self.type = self._fromheader(typesize)
1299 1299 indebug(self.ui, 'part type: "%s"' % self.type)
1300 1300 self.id = self._unpackheader(_fpartid)[0]
1301 1301 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1302 1302 # extract mandatory bit from type
1303 1303 self.mandatory = (self.type != self.type.lower())
1304 1304 self.type = self.type.lower()
1305 1305 ## reading parameters
1306 1306 # param count
1307 1307 mancount, advcount = self._unpackheader(_fpartparamcount)
1308 1308 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1309 1309 # param size
1310 1310 fparamsizes = _makefpartparamsizes(mancount + advcount)
1311 1311 paramsizes = self._unpackheader(fparamsizes)
1312 1312 # make it a list of couple again
1313 1313 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1314 1314 # split mandatory from advisory
1315 1315 mansizes = paramsizes[:mancount]
1316 1316 advsizes = paramsizes[mancount:]
1317 1317 # retrieve param value
1318 1318 manparams = []
1319 1319 for key, value in mansizes:
1320 1320 manparams.append((self._fromheader(key), self._fromheader(value)))
1321 1321 advparams = []
1322 1322 for key, value in advsizes:
1323 1323 advparams.append((self._fromheader(key), self._fromheader(value)))
1324 1324 self._initparams(manparams, advparams)
1325 1325 ## part payload
1326 1326 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1327 1327 # we read the data, tell it
1328 1328 self._initialized = True
1329 1329
1330 1330 def _payloadchunks(self):
1331 1331 """Generator of decoded chunks in the payload."""
1332 1332 return decodepayloadchunks(self.ui, self._fp)
1333 1333
1334 1334 def consume(self):
1335 1335 """Read the part payload until completion.
1336 1336
1337 1337 By consuming the part data, the underlying stream read offset will
1338 1338 be advanced to the next part (or end of stream).
1339 1339 """
1340 1340 if self.consumed:
1341 1341 return
1342 1342
1343 1343 chunk = self.read(32768)
1344 1344 while chunk:
1345 1345 self._pos += len(chunk)
1346 1346 chunk = self.read(32768)
1347 1347
1348 1348 def read(self, size=None):
1349 1349 """read payload data"""
1350 1350 if not self._initialized:
1351 1351 self._readheader()
1352 1352 if size is None:
1353 1353 data = self._payloadstream.read()
1354 1354 else:
1355 1355 data = self._payloadstream.read(size)
1356 1356 self._pos += len(data)
1357 1357 if size is None or len(data) < size:
1358 1358 if not self.consumed and self._pos:
1359 1359 self.ui.debug('bundle2-input-part: total payload size %i\n'
1360 1360 % self._pos)
1361 1361 self.consumed = True
1362 1362 return data
1363 1363
1364 1364 class seekableunbundlepart(unbundlepart):
1365 1365 """A bundle2 part in a bundle that is seekable.
1366 1366
1367 1367 Regular ``unbundlepart`` instances can only be read once. This class
1368 1368 extends ``unbundlepart`` to enable bi-directional seeking within the
1369 1369 part.
1370 1370
1371 1371 Bundle2 part data consists of framed chunks. Offsets when seeking
1372 1372 refer to the decoded data, not the offsets in the underlying bundle2
1373 1373 stream.
1374 1374
1375 1375 To facilitate quickly seeking within the decoded data, instances of this
1376 1376 class maintain a mapping between offsets in the underlying stream and
1377 1377 the decoded payload. This mapping will consume memory in proportion
1378 1378 to the number of chunks within the payload (which almost certainly
1379 1379 increases in proportion with the size of the part).
1380 1380 """
1381 1381 def __init__(self, ui, header, fp):
1382 1382 # (payload, file) offsets for chunk starts.
1383 1383 self._chunkindex = []
1384 1384
1385 1385 super(seekableunbundlepart, self).__init__(ui, header, fp)
1386 1386
1387 1387 def _payloadchunks(self, chunknum=0):
1388 1388 '''seek to specified chunk and start yielding data'''
1389 1389 if len(self._chunkindex) == 0:
1390 1390 assert chunknum == 0, 'Must start with chunk 0'
1391 1391 self._chunkindex.append((0, self._tellfp()))
1392 1392 else:
1393 1393 assert chunknum < len(self._chunkindex), \
1394 1394 'Unknown chunk %d' % chunknum
1395 1395 self._seekfp(self._chunkindex[chunknum][1])
1396 1396
1397 1397 pos = self._chunkindex[chunknum][0]
1398 1398
1399 1399 for chunk in decodepayloadchunks(self.ui, self._fp):
1400 1400 chunknum += 1
1401 1401 pos += len(chunk)
1402 1402 if chunknum == len(self._chunkindex):
1403 1403 self._chunkindex.append((pos, self._tellfp()))
1404 1404
1405 1405 yield chunk
1406 1406
1407 1407 def _findchunk(self, pos):
1408 1408 '''for a given payload position, return a chunk number and offset'''
1409 1409 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1410 1410 if ppos == pos:
1411 1411 return chunk, 0
1412 1412 elif ppos > pos:
1413 1413 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1414 1414 raise ValueError('Unknown chunk')
1415 1415
1416 1416 def tell(self):
1417 1417 return self._pos
1418 1418
1419 1419 def seek(self, offset, whence=os.SEEK_SET):
1420 1420 if whence == os.SEEK_SET:
1421 1421 newpos = offset
1422 1422 elif whence == os.SEEK_CUR:
1423 1423 newpos = self._pos + offset
1424 1424 elif whence == os.SEEK_END:
1425 1425 if not self.consumed:
1426 1426 # Can't use self.consume() here because it advances self._pos.
1427 1427 chunk = self.read(32768)
1428 1428 while chunk:
1429 1429 chunk = self.read(32768)
1430 1430 newpos = self._chunkindex[-1][0] - offset
1431 1431 else:
1432 1432 raise ValueError('Unknown whence value: %r' % (whence,))
1433 1433
1434 1434 if newpos > self._chunkindex[-1][0] and not self.consumed:
1435 1435 # Can't use self.consume() here because it advances self._pos.
1436 1436 chunk = self.read(32768)
1437 1437 while chunk:
1438 1438 chunk = self.read(32668)
1439 1439
1440 1440 if not 0 <= newpos <= self._chunkindex[-1][0]:
1441 1441 raise ValueError('Offset out of range')
1442 1442
1443 1443 if self._pos != newpos:
1444 1444 chunk, internaloffset = self._findchunk(newpos)
1445 1445 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1446 1446 adjust = self.read(internaloffset)
1447 1447 if len(adjust) != internaloffset:
1448 1448 raise error.Abort(_('Seek failed\n'))
1449 1449 self._pos = newpos
1450 1450
1451 1451 def _seekfp(self, offset, whence=0):
1452 1452 """move the underlying file pointer
1453 1453
1454 1454 This method is meant for internal usage by the bundle2 protocol only.
1455 1455 They directly manipulate the low level stream including bundle2 level
1456 1456 instruction.
1457 1457
1458 1458 Do not use it to implement higher-level logic or methods."""
1459 1459 if self._seekable:
1460 1460 return self._fp.seek(offset, whence)
1461 1461 else:
1462 1462 raise NotImplementedError(_('File pointer is not seekable'))
1463 1463
1464 1464 def _tellfp(self):
1465 1465 """return the file offset, or None if file is not seekable
1466 1466
1467 1467 This method is meant for internal usage by the bundle2 protocol only.
1468 1468 They directly manipulate the low level stream including bundle2 level
1469 1469 instruction.
1470 1470
1471 1471 Do not use it to implement higher-level logic or methods."""
1472 1472 if self._seekable:
1473 1473 try:
1474 1474 return self._fp.tell()
1475 1475 except IOError as e:
1476 1476 if e.errno == errno.ESPIPE:
1477 1477 self._seekable = False
1478 1478 else:
1479 1479 raise
1480 1480 return None
1481 1481
1482 1482 # These are only the static capabilities.
1483 1483 # Check the 'getrepocaps' function for the rest.
1484 1484 capabilities = {'HG20': (),
1485 1485 'bookmarks': (),
1486 1486 'error': ('abort', 'unsupportedcontent', 'pushraced',
1487 1487 'pushkey'),
1488 1488 'listkeys': (),
1489 1489 'pushkey': (),
1490 1490 'digests': tuple(sorted(util.DIGESTS.keys())),
1491 1491 'remote-changegroup': ('http', 'https'),
1492 1492 'hgtagsfnodes': (),
1493 1493 'phases': ('heads',),
1494 1494 'stream': ('v2',),
1495 1495 }
1496 1496
1497 1497 def getrepocaps(repo, allowpushback=False, role=None):
1498 1498 """return the bundle2 capabilities for a given repo
1499 1499
1500 1500 Exists to allow extensions (like evolution) to mutate the capabilities.
1501 1501
1502 1502 The returned value is used for servers advertising their capabilities as
1503 1503 well as clients advertising their capabilities to servers as part of
1504 1504 bundle2 requests. The ``role`` argument specifies which is which.
1505 1505 """
1506 1506 if role not in ('client', 'server'):
1507 1507 raise error.ProgrammingError('role argument must be client or server')
1508 1508
1509 1509 caps = capabilities.copy()
1510 1510 caps['changegroup'] = tuple(sorted(
1511 1511 changegroup.supportedincomingversions(repo)))
1512 1512 if obsolete.isenabled(repo, obsolete.exchangeopt):
1513 1513 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1514 1514 caps['obsmarkers'] = supportedformat
1515 1515 if allowpushback:
1516 1516 caps['pushback'] = ()
1517 1517 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1518 1518 if cpmode == 'check-related':
1519 1519 caps['checkheads'] = ('related',)
1520 1520 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1521 1521 caps.pop('phases')
1522 1522
1523 1523 # Don't advertise stream clone support in server mode if not configured.
1524 1524 if role == 'server':
1525 1525 streamsupported = repo.ui.configbool('server', 'uncompressed',
1526 1526 untrusted=True)
1527 1527 featuresupported = repo.ui.configbool('experimental', 'bundle2.stream')
1528 1528
1529 1529 if not streamsupported or not featuresupported:
1530 1530 caps.pop('stream')
1531 1531 # Else always advertise support on client, because payload support
1532 1532 # should always be advertised.
1533 1533
1534 1534 return caps
1535 1535
1536 1536 def bundle2caps(remote):
1537 1537 """return the bundle capabilities of a peer as dict"""
1538 1538 raw = remote.capable('bundle2')
1539 1539 if not raw and raw != '':
1540 1540 return {}
1541 1541 capsblob = urlreq.unquote(remote.capable('bundle2'))
1542 1542 return decodecaps(capsblob)
1543 1543
1544 1544 def obsmarkersversion(caps):
1545 1545 """extract the list of supported obsmarkers versions from a bundle2caps dict
1546 1546 """
1547 1547 obscaps = caps.get('obsmarkers', ())
1548 1548 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1549 1549
1550 1550 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1551 1551 vfs=None, compression=None, compopts=None):
1552 1552 if bundletype.startswith('HG10'):
1553 1553 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1554 1554 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1555 1555 compression=compression, compopts=compopts)
1556 1556 elif not bundletype.startswith('HG20'):
1557 1557 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1558 1558
1559 1559 caps = {}
1560 1560 if 'obsolescence' in opts:
1561 1561 caps['obsmarkers'] = ('V1',)
1562 1562 bundle = bundle20(ui, caps)
1563 1563 bundle.setcompression(compression, compopts)
1564 1564 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1565 1565 chunkiter = bundle.getchunks()
1566 1566
1567 1567 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1568 1568
1569 1569 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1570 1570 # We should eventually reconcile this logic with the one behind
1571 1571 # 'exchange.getbundle2partsgenerator'.
1572 1572 #
1573 1573 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1574 1574 # different right now. So we keep them separated for now for the sake of
1575 1575 # simplicity.
1576 1576
1577 1577 # we always want a changegroup in such bundle
1578 1578 cgversion = opts.get('cg.version')
1579 1579 if cgversion is None:
1580 1580 cgversion = changegroup.safeversion(repo)
1581 1581 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1582 1582 part = bundler.newpart('changegroup', data=cg.getchunks())
1583 1583 part.addparam('version', cg.version)
1584 1584 if 'clcount' in cg.extras:
1585 1585 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1586 1586 mandatory=False)
1587 1587 if opts.get('phases') and repo.revs('%ln and secret()',
1588 1588 outgoing.missingheads):
1589 1589 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1590 1590
1591 1591 addparttagsfnodescache(repo, bundler, outgoing)
1592 1592
1593 1593 if opts.get('obsolescence', False):
1594 1594 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1595 1595 buildobsmarkerspart(bundler, obsmarkers)
1596 1596
1597 1597 if opts.get('phases', False):
1598 1598 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1599 1599 phasedata = phases.binaryencode(headsbyphase)
1600 1600 bundler.newpart('phase-heads', data=phasedata)
1601 1601
1602 1602 def addparttagsfnodescache(repo, bundler, outgoing):
1603 1603 # we include the tags fnode cache for the bundle changeset
1604 1604 # (as an optional parts)
1605 1605 cache = tags.hgtagsfnodescache(repo.unfiltered())
1606 1606 chunks = []
1607 1607
1608 1608 # .hgtags fnodes are only relevant for head changesets. While we could
1609 1609 # transfer values for all known nodes, there will likely be little to
1610 1610 # no benefit.
1611 1611 #
1612 1612 # We don't bother using a generator to produce output data because
1613 1613 # a) we only have 40 bytes per head and even esoteric numbers of heads
1614 1614 # consume little memory (1M heads is 40MB) b) we don't want to send the
1615 1615 # part if we don't have entries and knowing if we have entries requires
1616 1616 # cache lookups.
1617 1617 for node in outgoing.missingheads:
1618 1618 # Don't compute missing, as this may slow down serving.
1619 1619 fnode = cache.getfnode(node, computemissing=False)
1620 1620 if fnode is not None:
1621 1621 chunks.extend([node, fnode])
1622 1622
1623 1623 if chunks:
1624 1624 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1625 1625
1626 1626 def buildobsmarkerspart(bundler, markers):
1627 1627 """add an obsmarker part to the bundler with <markers>
1628 1628
1629 1629 No part is created if markers is empty.
1630 1630 Raises ValueError if the bundler doesn't support any known obsmarker format.
1631 1631 """
1632 1632 if not markers:
1633 1633 return None
1634 1634
1635 1635 remoteversions = obsmarkersversion(bundler.capabilities)
1636 1636 version = obsolete.commonversion(remoteversions)
1637 1637 if version is None:
1638 1638 raise ValueError('bundler does not support common obsmarker format')
1639 1639 stream = obsolete.encodemarkers(markers, True, version=version)
1640 1640 return bundler.newpart('obsmarkers', data=stream)
1641 1641
1642 1642 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1643 1643 compopts=None):
1644 1644 """Write a bundle file and return its filename.
1645 1645
1646 1646 Existing files will not be overwritten.
1647 1647 If no filename is specified, a temporary file is created.
1648 1648 bz2 compression can be turned off.
1649 1649 The bundle file will be deleted in case of errors.
1650 1650 """
1651 1651
1652 1652 if bundletype == "HG20":
1653 1653 bundle = bundle20(ui)
1654 1654 bundle.setcompression(compression, compopts)
1655 1655 part = bundle.newpart('changegroup', data=cg.getchunks())
1656 1656 part.addparam('version', cg.version)
1657 1657 if 'clcount' in cg.extras:
1658 1658 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1659 1659 mandatory=False)
1660 1660 chunkiter = bundle.getchunks()
1661 1661 else:
1662 1662 # compression argument is only for the bundle2 case
1663 1663 assert compression is None
1664 1664 if cg.version != '01':
1665 1665 raise error.Abort(_('old bundle types only supports v1 '
1666 1666 'changegroups'))
1667 1667 header, comp = bundletypes[bundletype]
1668 1668 if comp not in util.compengines.supportedbundletypes:
1669 1669 raise error.Abort(_('unknown stream compression type: %s')
1670 1670 % comp)
1671 1671 compengine = util.compengines.forbundletype(comp)
1672 1672 def chunkiter():
1673 1673 yield header
1674 1674 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1675 1675 yield chunk
1676 1676 chunkiter = chunkiter()
1677 1677
1678 1678 # parse the changegroup data, otherwise we will block
1679 1679 # in case of sshrepo because we don't know the end of the stream
1680 1680 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1681 1681
1682 1682 def combinechangegroupresults(op):
1683 1683 """logic to combine 0 or more addchangegroup results into one"""
1684 1684 results = [r.get('return', 0)
1685 1685 for r in op.records['changegroup']]
1686 1686 changedheads = 0
1687 1687 result = 1
1688 1688 for ret in results:
1689 1689 # If any changegroup result is 0, return 0
1690 1690 if ret == 0:
1691 1691 result = 0
1692 1692 break
1693 1693 if ret < -1:
1694 1694 changedheads += ret + 1
1695 1695 elif ret > 1:
1696 1696 changedheads += ret - 1
1697 1697 if changedheads > 0:
1698 1698 result = 1 + changedheads
1699 1699 elif changedheads < 0:
1700 1700 result = -1 + changedheads
1701 1701 return result
1702 1702
1703 1703 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1704 1704 'targetphase'))
1705 1705 def handlechangegroup(op, inpart):
1706 1706 """apply a changegroup part on the repo
1707 1707
1708 1708 This is a very early implementation that will massive rework before being
1709 1709 inflicted to any end-user.
1710 1710 """
1711 1711 tr = op.gettransaction()
1712 1712 unpackerversion = inpart.params.get('version', '01')
1713 1713 # We should raise an appropriate exception here
1714 1714 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1715 1715 # the source and url passed here are overwritten by the one contained in
1716 1716 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1717 1717 nbchangesets = None
1718 1718 if 'nbchanges' in inpart.params:
1719 1719 nbchangesets = int(inpart.params.get('nbchanges'))
1720 1720 if ('treemanifest' in inpart.params and
1721 1721 'treemanifest' not in op.repo.requirements):
1722 1722 if len(op.repo.changelog) != 0:
1723 1723 raise error.Abort(_(
1724 1724 "bundle contains tree manifests, but local repo is "
1725 1725 "non-empty and does not use tree manifests"))
1726 1726 op.repo.requirements.add('treemanifest')
1727 1727 op.repo._applyopenerreqs()
1728 1728 op.repo._writerequirements()
1729 1729 extrakwargs = {}
1730 1730 targetphase = inpart.params.get('targetphase')
1731 1731 if targetphase is not None:
1732 1732 extrakwargs['targetphase'] = int(targetphase)
1733 1733 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1734 1734 expectedtotal=nbchangesets, **extrakwargs)
1735 1735 if op.reply is not None:
1736 1736 # This is definitely not the final form of this
1737 1737 # return. But one need to start somewhere.
1738 1738 part = op.reply.newpart('reply:changegroup', mandatory=False)
1739 1739 part.addparam(
1740 1740 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1741 1741 part.addparam('return', '%i' % ret, mandatory=False)
1742 1742 assert not inpart.read()
1743 1743
1744 1744 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1745 1745 ['digest:%s' % k for k in util.DIGESTS.keys()])
1746 1746 @parthandler('remote-changegroup', _remotechangegroupparams)
1747 1747 def handleremotechangegroup(op, inpart):
1748 1748 """apply a bundle10 on the repo, given an url and validation information
1749 1749
1750 1750 All the information about the remote bundle to import are given as
1751 1751 parameters. The parameters include:
1752 1752 - url: the url to the bundle10.
1753 1753 - size: the bundle10 file size. It is used to validate what was
1754 1754 retrieved by the client matches the server knowledge about the bundle.
1755 1755 - digests: a space separated list of the digest types provided as
1756 1756 parameters.
1757 1757 - digest:<digest-type>: the hexadecimal representation of the digest with
1758 1758 that name. Like the size, it is used to validate what was retrieved by
1759 1759 the client matches what the server knows about the bundle.
1760 1760
1761 1761 When multiple digest types are given, all of them are checked.
1762 1762 """
1763 1763 try:
1764 1764 raw_url = inpart.params['url']
1765 1765 except KeyError:
1766 1766 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1767 1767 parsed_url = util.url(raw_url)
1768 1768 if parsed_url.scheme not in capabilities['remote-changegroup']:
1769 1769 raise error.Abort(_('remote-changegroup does not support %s urls') %
1770 1770 parsed_url.scheme)
1771 1771
1772 1772 try:
1773 1773 size = int(inpart.params['size'])
1774 1774 except ValueError:
1775 1775 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1776 1776 % 'size')
1777 1777 except KeyError:
1778 1778 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1779 1779
1780 1780 digests = {}
1781 1781 for typ in inpart.params.get('digests', '').split():
1782 1782 param = 'digest:%s' % typ
1783 1783 try:
1784 1784 value = inpart.params[param]
1785 1785 except KeyError:
1786 1786 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1787 1787 param)
1788 1788 digests[typ] = value
1789 1789
1790 1790 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1791 1791
1792 1792 tr = op.gettransaction()
1793 1793 from . import exchange
1794 1794 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1795 1795 if not isinstance(cg, changegroup.cg1unpacker):
1796 1796 raise error.Abort(_('%s: not a bundle version 1.0') %
1797 1797 util.hidepassword(raw_url))
1798 1798 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1799 1799 if op.reply is not None:
1800 1800 # This is definitely not the final form of this
1801 1801 # return. But one need to start somewhere.
1802 1802 part = op.reply.newpart('reply:changegroup')
1803 1803 part.addparam(
1804 1804 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1805 1805 part.addparam('return', '%i' % ret, mandatory=False)
1806 1806 try:
1807 1807 real_part.validate()
1808 1808 except error.Abort as e:
1809 1809 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1810 1810 (util.hidepassword(raw_url), str(e)))
1811 1811 assert not inpart.read()
1812 1812
1813 1813 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1814 1814 def handlereplychangegroup(op, inpart):
1815 1815 ret = int(inpart.params['return'])
1816 1816 replyto = int(inpart.params['in-reply-to'])
1817 1817 op.records.add('changegroup', {'return': ret}, replyto)
1818 1818
1819 1819 @parthandler('check:bookmarks')
1820 1820 def handlecheckbookmarks(op, inpart):
1821 1821 """check location of bookmarks
1822 1822
1823 1823 This part is to be used to detect push race regarding bookmark, it
1824 1824 contains binary encoded (bookmark, node) tuple. If the local state does
1825 1825 not marks the one in the part, a PushRaced exception is raised
1826 1826 """
1827 1827 bookdata = bookmarks.binarydecode(inpart)
1828 1828
1829 1829 msgstandard = ('repository changed while pushing - please try again '
1830 1830 '(bookmark "%s" move from %s to %s)')
1831 1831 msgmissing = ('repository changed while pushing - please try again '
1832 1832 '(bookmark "%s" is missing, expected %s)')
1833 1833 msgexist = ('repository changed while pushing - please try again '
1834 1834 '(bookmark "%s" set on %s, expected missing)')
1835 1835 for book, node in bookdata:
1836 1836 currentnode = op.repo._bookmarks.get(book)
1837 1837 if currentnode != node:
1838 1838 if node is None:
1839 1839 finalmsg = msgexist % (book, nodemod.short(currentnode))
1840 1840 elif currentnode is None:
1841 1841 finalmsg = msgmissing % (book, nodemod.short(node))
1842 1842 else:
1843 1843 finalmsg = msgstandard % (book, nodemod.short(node),
1844 1844 nodemod.short(currentnode))
1845 1845 raise error.PushRaced(finalmsg)
1846 1846
1847 1847 @parthandler('check:heads')
1848 1848 def handlecheckheads(op, inpart):
1849 1849 """check that head of the repo did not change
1850 1850
1851 1851 This is used to detect a push race when using unbundle.
1852 1852 This replaces the "heads" argument of unbundle."""
1853 1853 h = inpart.read(20)
1854 1854 heads = []
1855 1855 while len(h) == 20:
1856 1856 heads.append(h)
1857 1857 h = inpart.read(20)
1858 1858 assert not h
1859 1859 # Trigger a transaction so that we are guaranteed to have the lock now.
1860 1860 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1861 1861 op.gettransaction()
1862 1862 if sorted(heads) != sorted(op.repo.heads()):
1863 1863 raise error.PushRaced('repository changed while pushing - '
1864 1864 'please try again')
1865 1865
1866 1866 @parthandler('check:updated-heads')
1867 1867 def handlecheckupdatedheads(op, inpart):
1868 1868 """check for race on the heads touched by a push
1869 1869
1870 1870 This is similar to 'check:heads' but focus on the heads actually updated
1871 1871 during the push. If other activities happen on unrelated heads, it is
1872 1872 ignored.
1873 1873
1874 1874 This allow server with high traffic to avoid push contention as long as
1875 1875 unrelated parts of the graph are involved."""
1876 1876 h = inpart.read(20)
1877 1877 heads = []
1878 1878 while len(h) == 20:
1879 1879 heads.append(h)
1880 1880 h = inpart.read(20)
1881 1881 assert not h
1882 1882 # trigger a transaction so that we are guaranteed to have the lock now.
1883 1883 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1884 1884 op.gettransaction()
1885 1885
1886 1886 currentheads = set()
1887 1887 for ls in op.repo.branchmap().itervalues():
1888 1888 currentheads.update(ls)
1889 1889
1890 1890 for h in heads:
1891 1891 if h not in currentheads:
1892 1892 raise error.PushRaced('repository changed while pushing - '
1893 1893 'please try again')
1894 1894
1895 1895 @parthandler('check:phases')
1896 1896 def handlecheckphases(op, inpart):
1897 1897 """check that phase boundaries of the repository did not change
1898 1898
1899 1899 This is used to detect a push race.
1900 1900 """
1901 1901 phasetonodes = phases.binarydecode(inpart)
1902 1902 unfi = op.repo.unfiltered()
1903 1903 cl = unfi.changelog
1904 1904 phasecache = unfi._phasecache
1905 1905 msg = ('repository changed while pushing - please try again '
1906 1906 '(%s is %s expected %s)')
1907 1907 for expectedphase, nodes in enumerate(phasetonodes):
1908 1908 for n in nodes:
1909 1909 actualphase = phasecache.phase(unfi, cl.rev(n))
1910 1910 if actualphase != expectedphase:
1911 1911 finalmsg = msg % (nodemod.short(n),
1912 1912 phases.phasenames[actualphase],
1913 1913 phases.phasenames[expectedphase])
1914 1914 raise error.PushRaced(finalmsg)
1915 1915
1916 1916 @parthandler('output')
1917 1917 def handleoutput(op, inpart):
1918 1918 """forward output captured on the server to the client"""
1919 1919 for line in inpart.read().splitlines():
1920 1920 op.ui.status(_('remote: %s\n') % line)
1921 1921
1922 1922 @parthandler('replycaps')
1923 1923 def handlereplycaps(op, inpart):
1924 1924 """Notify that a reply bundle should be created
1925 1925
1926 1926 The payload contains the capabilities information for the reply"""
1927 1927 caps = decodecaps(inpart.read())
1928 1928 if op.reply is None:
1929 1929 op.reply = bundle20(op.ui, caps)
1930 1930
1931 1931 class AbortFromPart(error.Abort):
1932 1932 """Sub-class of Abort that denotes an error from a bundle2 part."""
1933 1933
1934 1934 @parthandler('error:abort', ('message', 'hint'))
1935 1935 def handleerrorabort(op, inpart):
1936 1936 """Used to transmit abort error over the wire"""
1937 1937 raise AbortFromPart(inpart.params['message'],
1938 1938 hint=inpart.params.get('hint'))
1939 1939
1940 1940 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1941 1941 'in-reply-to'))
1942 1942 def handleerrorpushkey(op, inpart):
1943 1943 """Used to transmit failure of a mandatory pushkey over the wire"""
1944 1944 kwargs = {}
1945 1945 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1946 1946 value = inpart.params.get(name)
1947 1947 if value is not None:
1948 1948 kwargs[name] = value
1949 1949 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1950 1950
1951 1951 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1952 1952 def handleerrorunsupportedcontent(op, inpart):
1953 1953 """Used to transmit unknown content error over the wire"""
1954 1954 kwargs = {}
1955 1955 parttype = inpart.params.get('parttype')
1956 1956 if parttype is not None:
1957 1957 kwargs['parttype'] = parttype
1958 1958 params = inpart.params.get('params')
1959 1959 if params is not None:
1960 1960 kwargs['params'] = params.split('\0')
1961 1961
1962 1962 raise error.BundleUnknownFeatureError(**kwargs)
1963 1963
1964 1964 @parthandler('error:pushraced', ('message',))
1965 1965 def handleerrorpushraced(op, inpart):
1966 1966 """Used to transmit push race error over the wire"""
1967 1967 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1968 1968
1969 1969 @parthandler('listkeys', ('namespace',))
1970 1970 def handlelistkeys(op, inpart):
1971 1971 """retrieve pushkey namespace content stored in a bundle2"""
1972 1972 namespace = inpart.params['namespace']
1973 1973 r = pushkey.decodekeys(inpart.read())
1974 1974 op.records.add('listkeys', (namespace, r))
1975 1975
1976 1976 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1977 1977 def handlepushkey(op, inpart):
1978 1978 """process a pushkey request"""
1979 1979 dec = pushkey.decode
1980 1980 namespace = dec(inpart.params['namespace'])
1981 1981 key = dec(inpart.params['key'])
1982 1982 old = dec(inpart.params['old'])
1983 1983 new = dec(inpart.params['new'])
1984 1984 # Grab the transaction to ensure that we have the lock before performing the
1985 1985 # pushkey.
1986 1986 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1987 1987 op.gettransaction()
1988 1988 ret = op.repo.pushkey(namespace, key, old, new)
1989 1989 record = {'namespace': namespace,
1990 1990 'key': key,
1991 1991 'old': old,
1992 1992 'new': new}
1993 1993 op.records.add('pushkey', record)
1994 1994 if op.reply is not None:
1995 1995 rpart = op.reply.newpart('reply:pushkey')
1996 1996 rpart.addparam(
1997 1997 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1998 1998 rpart.addparam('return', '%i' % ret, mandatory=False)
1999 1999 if inpart.mandatory and not ret:
2000 2000 kwargs = {}
2001 2001 for key in ('namespace', 'key', 'new', 'old', 'ret'):
2002 2002 if key in inpart.params:
2003 2003 kwargs[key] = inpart.params[key]
2004 2004 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
2005 2005
2006 2006 @parthandler('bookmarks')
2007 2007 def handlebookmark(op, inpart):
2008 2008 """transmit bookmark information
2009 2009
2010 2010 The part contains binary encoded bookmark information.
2011 2011
2012 2012 The exact behavior of this part can be controlled by the 'bookmarks' mode
2013 2013 on the bundle operation.
2014 2014
2015 2015 When mode is 'apply' (the default) the bookmark information is applied as
2016 2016 is to the unbundling repository. Make sure a 'check:bookmarks' part is
2017 2017 issued earlier to check for push races in such update. This behavior is
2018 2018 suitable for pushing.
2019 2019
2020 2020 When mode is 'records', the information is recorded into the 'bookmarks'
2021 2021 records of the bundle operation. This behavior is suitable for pulling.
2022 2022 """
2023 2023 changes = bookmarks.binarydecode(inpart)
2024 2024
2025 2025 pushkeycompat = op.repo.ui.configbool('server', 'bookmarks-pushkey-compat')
2026 2026 bookmarksmode = op.modes.get('bookmarks', 'apply')
2027 2027
2028 2028 if bookmarksmode == 'apply':
2029 2029 tr = op.gettransaction()
2030 2030 bookstore = op.repo._bookmarks
2031 2031 if pushkeycompat:
2032 2032 allhooks = []
2033 2033 for book, node in changes:
2034 2034 hookargs = tr.hookargs.copy()
2035 2035 hookargs['pushkeycompat'] = '1'
2036 2036 hookargs['namespace'] = 'bookmark'
2037 2037 hookargs['key'] = book
2038 2038 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
2039 2039 hookargs['new'] = nodemod.hex(node if node is not None else '')
2040 2040 allhooks.append(hookargs)
2041 2041
2042 2042 for hookargs in allhooks:
2043 2043 op.repo.hook('prepushkey', throw=True, **hookargs)
2044 2044
2045 2045 bookstore.applychanges(op.repo, op.gettransaction(), changes)
2046 2046
2047 2047 if pushkeycompat:
2048 2048 def runhook():
2049 2049 for hookargs in allhooks:
2050 2050 op.repo.hook('pushkey', **hookargs)
2051 2051 op.repo._afterlock(runhook)
2052 2052
2053 2053 elif bookmarksmode == 'records':
2054 2054 for book, node in changes:
2055 2055 record = {'bookmark': book, 'node': node}
2056 2056 op.records.add('bookmarks', record)
2057 2057 else:
2058 2058 raise error.ProgrammingError('unkown bookmark mode: %s' % bookmarksmode)
2059 2059
2060 2060 @parthandler('phase-heads')
2061 2061 def handlephases(op, inpart):
2062 2062 """apply phases from bundle part to repo"""
2063 2063 headsbyphase = phases.binarydecode(inpart)
2064 2064 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
2065 2065
2066 2066 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
2067 2067 def handlepushkeyreply(op, inpart):
2068 2068 """retrieve the result of a pushkey request"""
2069 2069 ret = int(inpart.params['return'])
2070 2070 partid = int(inpart.params['in-reply-to'])
2071 2071 op.records.add('pushkey', {'return': ret}, partid)
2072 2072
2073 2073 @parthandler('obsmarkers')
2074 2074 def handleobsmarker(op, inpart):
2075 2075 """add a stream of obsmarkers to the repo"""
2076 2076 tr = op.gettransaction()
2077 2077 markerdata = inpart.read()
2078 2078 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
2079 2079 op.ui.write(('obsmarker-exchange: %i bytes received\n')
2080 2080 % len(markerdata))
2081 2081 # The mergemarkers call will crash if marker creation is not enabled.
2082 2082 # we want to avoid this if the part is advisory.
2083 2083 if not inpart.mandatory and op.repo.obsstore.readonly:
2084 2084 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
2085 2085 return
2086 2086 new = op.repo.obsstore.mergemarkers(tr, markerdata)
2087 2087 op.repo.invalidatevolatilesets()
2088 2088 if new:
2089 2089 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
2090 2090 op.records.add('obsmarkers', {'new': new})
2091 2091 if op.reply is not None:
2092 2092 rpart = op.reply.newpart('reply:obsmarkers')
2093 2093 rpart.addparam(
2094 2094 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
2095 2095 rpart.addparam('new', '%i' % new, mandatory=False)
2096 2096
2097 2097
2098 2098 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
2099 2099 def handleobsmarkerreply(op, inpart):
2100 2100 """retrieve the result of a pushkey request"""
2101 2101 ret = int(inpart.params['new'])
2102 2102 partid = int(inpart.params['in-reply-to'])
2103 2103 op.records.add('obsmarkers', {'new': ret}, partid)
2104 2104
2105 2105 @parthandler('hgtagsfnodes')
2106 2106 def handlehgtagsfnodes(op, inpart):
2107 2107 """Applies .hgtags fnodes cache entries to the local repo.
2108 2108
2109 2109 Payload is pairs of 20 byte changeset nodes and filenodes.
2110 2110 """
2111 2111 # Grab the transaction so we ensure that we have the lock at this point.
2112 2112 if op.ui.configbool('experimental', 'bundle2lazylocking'):
2113 2113 op.gettransaction()
2114 2114 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
2115 2115
2116 2116 count = 0
2117 2117 while True:
2118 2118 node = inpart.read(20)
2119 2119 fnode = inpart.read(20)
2120 2120 if len(node) < 20 or len(fnode) < 20:
2121 2121 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
2122 2122 break
2123 2123 cache.setfnode(node, fnode)
2124 2124 count += 1
2125 2125
2126 2126 cache.write()
2127 2127 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
2128 2128
2129 2129 @parthandler('pushvars')
2130 2130 def bundle2getvars(op, part):
2131 2131 '''unbundle a bundle2 containing shellvars on the server'''
2132 2132 # An option to disable unbundling on server-side for security reasons
2133 2133 if op.ui.configbool('push', 'pushvars.server'):
2134 2134 hookargs = {}
2135 2135 for key, value in part.advisoryparams:
2136 2136 key = key.upper()
2137 2137 # We want pushed variables to have USERVAR_ prepended so we know
2138 2138 # they came from the --pushvar flag.
2139 2139 key = "USERVAR_" + key
2140 2140 hookargs[key] = value
2141 2141 op.addhookargs(hookargs)
2142 2142
2143 2143 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
2144 2144 def handlestreamv2bundle(op, part):
2145 2145
2146 2146 requirements = part.params['requirements'].split()
2147 2147 filecount = int(part.params['filecount'])
2148 2148 bytecount = int(part.params['bytecount'])
2149 2149
2150 2150 repo = op.repo
2151 2151 if len(repo):
2152 2152 msg = _('cannot apply stream clone to non empty repository')
2153 2153 raise error.Abort(msg)
2154 2154
2155 2155 repo.ui.debug('applying stream bundle\n')
2156 2156 streamclone.applybundlev2(repo, part, filecount, bytecount,
2157 2157 requirements)
2158
2159 # new requirements = old non-format requirements +
2160 # new format-related remote requirements
2161 # requirements from the streamed-in repository
2162 repo.requirements = set(requirements) | (
2163 repo.requirements - repo.supportedformats)
2164 repo._applyopenerreqs()
2165 repo._writerequirements()
@@ -1,634 +1,642 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import contextlib
11 11 import os
12 12 import struct
13 13 import tempfile
14 14 import warnings
15 15
16 16 from .i18n import _
17 17 from . import (
18 18 branchmap,
19 19 cacheutil,
20 20 error,
21 21 phases,
22 22 store,
23 23 util,
24 24 )
25 25
26 26 def canperformstreamclone(pullop, bundle2=False):
27 27 """Whether it is possible to perform a streaming clone as part of pull.
28 28
29 29 ``bundle2`` will cause the function to consider stream clone through
30 30 bundle2 and only through bundle2.
31 31
32 32 Returns a tuple of (supported, requirements). ``supported`` is True if
33 33 streaming clone is supported and False otherwise. ``requirements`` is
34 34 a set of repo requirements from the remote, or ``None`` if stream clone
35 35 isn't supported.
36 36 """
37 37 repo = pullop.repo
38 38 remote = pullop.remote
39 39
40 40 bundle2supported = False
41 41 if pullop.canusebundle2:
42 42 if 'v2' in pullop.remotebundle2caps.get('stream', []):
43 43 bundle2supported = True
44 44 # else
45 45 # Server doesn't support bundle2 stream clone or doesn't support
46 46 # the versions we support. Fall back and possibly allow legacy.
47 47
48 48 # Ensures legacy code path uses available bundle2.
49 49 if bundle2supported and not bundle2:
50 50 return False, None
51 51 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
52 52 elif bundle2 and not bundle2supported:
53 53 return False, None
54 54
55 55 # Streaming clone only works on empty repositories.
56 56 if len(repo):
57 57 return False, None
58 58
59 59 # Streaming clone only works if all data is being requested.
60 60 if pullop.heads:
61 61 return False, None
62 62
63 63 streamrequested = pullop.streamclonerequested
64 64
65 65 # If we don't have a preference, let the server decide for us. This
66 66 # likely only comes into play in LANs.
67 67 if streamrequested is None:
68 68 # The server can advertise whether to prefer streaming clone.
69 69 streamrequested = remote.capable('stream-preferred')
70 70
71 71 if not streamrequested:
72 72 return False, None
73 73
74 74 # In order for stream clone to work, the client has to support all the
75 75 # requirements advertised by the server.
76 76 #
77 77 # The server advertises its requirements via the "stream" and "streamreqs"
78 78 # capability. "stream" (a value-less capability) is advertised if and only
79 79 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
80 80 # is advertised and contains a comma-delimited list of requirements.
81 81 requirements = set()
82 82 if remote.capable('stream'):
83 83 requirements.add('revlogv1')
84 84 else:
85 85 streamreqs = remote.capable('streamreqs')
86 86 # This is weird and shouldn't happen with modern servers.
87 87 if not streamreqs:
88 88 pullop.repo.ui.warn(_(
89 89 'warning: stream clone requested but server has them '
90 90 'disabled\n'))
91 91 return False, None
92 92
93 93 streamreqs = set(streamreqs.split(','))
94 94 # Server requires something we don't support. Bail.
95 95 missingreqs = streamreqs - repo.supportedformats
96 96 if missingreqs:
97 97 pullop.repo.ui.warn(_(
98 98 'warning: stream clone requested but client is missing '
99 99 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
100 100 pullop.repo.ui.warn(
101 101 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
102 102 'for more information)\n'))
103 103 return False, None
104 104 requirements = streamreqs
105 105
106 106 return True, requirements
107 107
108 108 def maybeperformlegacystreamclone(pullop):
109 109 """Possibly perform a legacy stream clone operation.
110 110
111 111 Legacy stream clones are performed as part of pull but before all other
112 112 operations.
113 113
114 114 A legacy stream clone will not be performed if a bundle2 stream clone is
115 115 supported.
116 116 """
117 117 supported, requirements = canperformstreamclone(pullop)
118 118
119 119 if not supported:
120 120 return
121 121
122 122 repo = pullop.repo
123 123 remote = pullop.remote
124 124
125 125 # Save remote branchmap. We will use it later to speed up branchcache
126 126 # creation.
127 127 rbranchmap = None
128 128 if remote.capable('branchmap'):
129 129 rbranchmap = remote.branchmap()
130 130
131 131 repo.ui.status(_('streaming all changes\n'))
132 132
133 133 fp = remote.stream_out()
134 134 l = fp.readline()
135 135 try:
136 136 resp = int(l)
137 137 except ValueError:
138 138 raise error.ResponseError(
139 139 _('unexpected response from remote server:'), l)
140 140 if resp == 1:
141 141 raise error.Abort(_('operation forbidden by server'))
142 142 elif resp == 2:
143 143 raise error.Abort(_('locking the remote repository failed'))
144 144 elif resp != 0:
145 145 raise error.Abort(_('the server sent an unknown error code'))
146 146
147 147 l = fp.readline()
148 148 try:
149 149 filecount, bytecount = map(int, l.split(' ', 1))
150 150 except (ValueError, TypeError):
151 151 raise error.ResponseError(
152 152 _('unexpected response from remote server:'), l)
153 153
154 154 with repo.lock():
155 155 consumev1(repo, fp, filecount, bytecount)
156 156
157 157 # new requirements = old non-format requirements +
158 158 # new format-related remote requirements
159 159 # requirements from the streamed-in repository
160 160 repo.requirements = requirements | (
161 161 repo.requirements - repo.supportedformats)
162 162 repo._applyopenerreqs()
163 163 repo._writerequirements()
164 164
165 165 if rbranchmap:
166 166 branchmap.replacecache(repo, rbranchmap)
167 167
168 168 repo.invalidate()
169 169
170 170 def allowservergeneration(repo):
171 171 """Whether streaming clones are allowed from the server."""
172 172 if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
173 173 return False
174 174
175 175 # The way stream clone works makes it impossible to hide secret changesets.
176 176 # So don't allow this by default.
177 177 secret = phases.hassecret(repo)
178 178 if secret:
179 179 return repo.ui.configbool('server', 'uncompressedallowsecret')
180 180
181 181 return True
182 182
183 183 # This is it's own function so extensions can override it.
184 184 def _walkstreamfiles(repo):
185 185 return repo.store.walk()
186 186
187 187 def generatev1(repo):
188 188 """Emit content for version 1 of a streaming clone.
189 189
190 190 This returns a 3-tuple of (file count, byte size, data iterator).
191 191
192 192 The data iterator consists of N entries for each file being transferred.
193 193 Each file entry starts as a line with the file name and integer size
194 194 delimited by a null byte.
195 195
196 196 The raw file data follows. Following the raw file data is the next file
197 197 entry, or EOF.
198 198
199 199 When used on the wire protocol, an additional line indicating protocol
200 200 success will be prepended to the stream. This function is not responsible
201 201 for adding it.
202 202
203 203 This function will obtain a repository lock to ensure a consistent view of
204 204 the store is captured. It therefore may raise LockError.
205 205 """
206 206 entries = []
207 207 total_bytes = 0
208 208 # Get consistent snapshot of repo, lock during scan.
209 209 with repo.lock():
210 210 repo.ui.debug('scanning\n')
211 211 for name, ename, size in _walkstreamfiles(repo):
212 212 if size:
213 213 entries.append((name, size))
214 214 total_bytes += size
215 215
216 216 repo.ui.debug('%d files, %d bytes to transfer\n' %
217 217 (len(entries), total_bytes))
218 218
219 219 svfs = repo.svfs
220 220 debugflag = repo.ui.debugflag
221 221
222 222 def emitrevlogdata():
223 223 for name, size in entries:
224 224 if debugflag:
225 225 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
226 226 # partially encode name over the wire for backwards compat
227 227 yield '%s\0%d\n' % (store.encodedir(name), size)
228 228 # auditing at this stage is both pointless (paths are already
229 229 # trusted by the local repo) and expensive
230 230 with svfs(name, 'rb', auditpath=False) as fp:
231 231 if size <= 65536:
232 232 yield fp.read(size)
233 233 else:
234 234 for chunk in util.filechunkiter(fp, limit=size):
235 235 yield chunk
236 236
237 237 return len(entries), total_bytes, emitrevlogdata()
238 238
239 239 def generatev1wireproto(repo):
240 240 """Emit content for version 1 of streaming clone suitable for the wire.
241 241
242 242 This is the data output from ``generatev1()`` with 2 header lines. The
243 243 first line indicates overall success. The 2nd contains the file count and
244 244 byte size of payload.
245 245
246 246 The success line contains "0" for success, "1" for stream generation not
247 247 allowed, and "2" for error locking the repository (possibly indicating
248 248 a permissions error for the server process).
249 249 """
250 250 if not allowservergeneration(repo):
251 251 yield '1\n'
252 252 return
253 253
254 254 try:
255 255 filecount, bytecount, it = generatev1(repo)
256 256 except error.LockError:
257 257 yield '2\n'
258 258 return
259 259
260 260 # Indicates successful response.
261 261 yield '0\n'
262 262 yield '%d %d\n' % (filecount, bytecount)
263 263 for chunk in it:
264 264 yield chunk
265 265
266 266 def generatebundlev1(repo, compression='UN'):
267 267 """Emit content for version 1 of a stream clone bundle.
268 268
269 269 The first 4 bytes of the output ("HGS1") denote this as stream clone
270 270 bundle version 1.
271 271
272 272 The next 2 bytes indicate the compression type. Only "UN" is currently
273 273 supported.
274 274
275 275 The next 16 bytes are two 64-bit big endian unsigned integers indicating
276 276 file count and byte count, respectively.
277 277
278 278 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
279 279 of the requirements string, including a trailing \0. The following N bytes
280 280 are the requirements string, which is ASCII containing a comma-delimited
281 281 list of repo requirements that are needed to support the data.
282 282
283 283 The remaining content is the output of ``generatev1()`` (which may be
284 284 compressed in the future).
285 285
286 286 Returns a tuple of (requirements, data generator).
287 287 """
288 288 if compression != 'UN':
289 289 raise ValueError('we do not support the compression argument yet')
290 290
291 291 requirements = repo.requirements & repo.supportedformats
292 292 requires = ','.join(sorted(requirements))
293 293
294 294 def gen():
295 295 yield 'HGS1'
296 296 yield compression
297 297
298 298 filecount, bytecount, it = generatev1(repo)
299 299 repo.ui.status(_('writing %d bytes for %d files\n') %
300 300 (bytecount, filecount))
301 301
302 302 yield struct.pack('>QQ', filecount, bytecount)
303 303 yield struct.pack('>H', len(requires) + 1)
304 304 yield requires + '\0'
305 305
306 306 # This is where we'll add compression in the future.
307 307 assert compression == 'UN'
308 308
309 309 seen = 0
310 310 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
311 311
312 312 for chunk in it:
313 313 seen += len(chunk)
314 314 repo.ui.progress(_('bundle'), seen, total=bytecount,
315 315 unit=_('bytes'))
316 316 yield chunk
317 317
318 318 repo.ui.progress(_('bundle'), None)
319 319
320 320 return requirements, gen()
321 321
322 322 def consumev1(repo, fp, filecount, bytecount):
323 323 """Apply the contents from version 1 of a streaming clone file handle.
324 324
325 325 This takes the output from "stream_out" and applies it to the specified
326 326 repository.
327 327
328 328 Like "stream_out," the status line added by the wire protocol is not
329 329 handled by this function.
330 330 """
331 331 with repo.lock():
332 332 repo.ui.status(_('%d files to transfer, %s of data\n') %
333 333 (filecount, util.bytecount(bytecount)))
334 334 handled_bytes = 0
335 335 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
336 336 start = util.timer()
337 337
338 338 # TODO: get rid of (potential) inconsistency
339 339 #
340 340 # If transaction is started and any @filecache property is
341 341 # changed at this point, it causes inconsistency between
342 342 # in-memory cached property and streamclone-ed file on the
343 343 # disk. Nested transaction prevents transaction scope "clone"
344 344 # below from writing in-memory changes out at the end of it,
345 345 # even though in-memory changes are discarded at the end of it
346 346 # regardless of transaction nesting.
347 347 #
348 348 # But transaction nesting can't be simply prohibited, because
349 349 # nesting occurs also in ordinary case (e.g. enabling
350 350 # clonebundles).
351 351
352 352 with repo.transaction('clone'):
353 353 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
354 354 for i in xrange(filecount):
355 355 # XXX doesn't support '\n' or '\r' in filenames
356 356 l = fp.readline()
357 357 try:
358 358 name, size = l.split('\0', 1)
359 359 size = int(size)
360 360 except (ValueError, TypeError):
361 361 raise error.ResponseError(
362 362 _('unexpected response from remote server:'), l)
363 363 if repo.ui.debugflag:
364 364 repo.ui.debug('adding %s (%s)\n' %
365 365 (name, util.bytecount(size)))
366 366 # for backwards compat, name was partially encoded
367 367 path = store.decodedir(name)
368 368 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
369 369 for chunk in util.filechunkiter(fp, limit=size):
370 370 handled_bytes += len(chunk)
371 371 repo.ui.progress(_('clone'), handled_bytes,
372 372 total=bytecount, unit=_('bytes'))
373 373 ofp.write(chunk)
374 374
375 375 # force @filecache properties to be reloaded from
376 376 # streamclone-ed file at next access
377 377 repo.invalidate(clearfilecache=True)
378 378
379 379 elapsed = util.timer() - start
380 380 if elapsed <= 0:
381 381 elapsed = 0.001
382 382 repo.ui.progress(_('clone'), None)
383 383 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
384 384 (util.bytecount(bytecount), elapsed,
385 385 util.bytecount(bytecount / elapsed)))
386 386
387 387 def readbundle1header(fp):
388 388 compression = fp.read(2)
389 389 if compression != 'UN':
390 390 raise error.Abort(_('only uncompressed stream clone bundles are '
391 391 'supported; got %s') % compression)
392 392
393 393 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
394 394 requireslen = struct.unpack('>H', fp.read(2))[0]
395 395 requires = fp.read(requireslen)
396 396
397 397 if not requires.endswith('\0'):
398 398 raise error.Abort(_('malformed stream clone bundle: '
399 399 'requirements not properly encoded'))
400 400
401 401 requirements = set(requires.rstrip('\0').split(','))
402 402
403 403 return filecount, bytecount, requirements
404 404
405 405 def applybundlev1(repo, fp):
406 406 """Apply the content from a stream clone bundle version 1.
407 407
408 408 We assume the 4 byte header has been read and validated and the file handle
409 409 is at the 2 byte compression identifier.
410 410 """
411 411 if len(repo):
412 412 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
413 413 'repo'))
414 414
415 415 filecount, bytecount, requirements = readbundle1header(fp)
416 416 missingreqs = requirements - repo.supportedformats
417 417 if missingreqs:
418 418 raise error.Abort(_('unable to apply stream clone: '
419 419 'unsupported format: %s') %
420 420 ', '.join(sorted(missingreqs)))
421 421
422 422 consumev1(repo, fp, filecount, bytecount)
423 423
424 424 class streamcloneapplier(object):
425 425 """Class to manage applying streaming clone bundles.
426 426
427 427 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
428 428 readers to perform bundle type-specific functionality.
429 429 """
430 430 def __init__(self, fh):
431 431 self._fh = fh
432 432
433 433 def apply(self, repo):
434 434 return applybundlev1(repo, self._fh)
435 435
436 436 # type of file to stream
437 437 _fileappend = 0 # append only file
438 438 _filefull = 1 # full snapshot file
439 439
440 440 # Source of the file
441 441 _srcstore = 's' # store (svfs)
442 442 _srccache = 'c' # cache (cache)
443 443
444 444 # This is it's own function so extensions can override it.
445 445 def _walkstreamfullstorefiles(repo):
446 446 """list snapshot file from the store"""
447 447 fnames = []
448 448 if not repo.publishing():
449 449 fnames.append('phaseroots')
450 450 return fnames
451 451
452 452 def _filterfull(entry, copy, vfsmap):
453 453 """actually copy the snapshot files"""
454 454 src, name, ftype, data = entry
455 455 if ftype != _filefull:
456 456 return entry
457 457 return (src, name, ftype, copy(vfsmap[src].join(name)))
458 458
459 459 @contextlib.contextmanager
460 460 def maketempcopies():
461 461 """return a function to temporary copy file"""
462 462 files = []
463 463 try:
464 464 def copy(src):
465 465 fd, dst = tempfile.mkstemp()
466 466 os.close(fd)
467 467 files.append(dst)
468 468 util.copyfiles(src, dst, hardlink=True)
469 469 return dst
470 470 yield copy
471 471 finally:
472 472 for tmp in files:
473 473 util.tryunlink(tmp)
474 474
475 475 def _makemap(repo):
476 476 """make a (src -> vfs) map for the repo"""
477 477 vfsmap = {
478 478 _srcstore: repo.svfs,
479 479 _srccache: repo.cachevfs,
480 480 }
481 481 # we keep repo.vfs out of the on purpose, ther are too many danger there
482 482 # (eg: .hg/hgrc)
483 483 assert repo.vfs not in vfsmap.values()
484 484
485 485 return vfsmap
486 486
487 487 def _emit2(repo, entries, totalfilesize):
488 488 """actually emit the stream bundle"""
489 489 vfsmap = _makemap(repo)
490 490 progress = repo.ui.progress
491 491 progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
492 492 with maketempcopies() as copy:
493 493 try:
494 494 # copy is delayed until we are in the try
495 495 entries = [_filterfull(e, copy, vfsmap) for e in entries]
496 496 yield None # this release the lock on the repository
497 497 seen = 0
498 498
499 499 for src, name, ftype, data in entries:
500 500 vfs = vfsmap[src]
501 501 yield src
502 502 yield util.uvarintencode(len(name))
503 503 if ftype == _fileappend:
504 504 fp = vfs(name)
505 505 size = data
506 506 elif ftype == _filefull:
507 507 fp = open(data, 'rb')
508 508 size = util.fstat(fp).st_size
509 509 try:
510 510 yield util.uvarintencode(size)
511 511 yield name
512 512 if size <= 65536:
513 513 chunks = (fp.read(size),)
514 514 else:
515 515 chunks = util.filechunkiter(fp, limit=size)
516 516 for chunk in chunks:
517 517 seen += len(chunk)
518 518 progress(_('bundle'), seen, total=totalfilesize,
519 519 unit=_('bytes'))
520 520 yield chunk
521 521 finally:
522 522 fp.close()
523 523 finally:
524 524 progress(_('bundle'), None)
525 525
526 526 def generatev2(repo):
527 527 """Emit content for version 2 of a streaming clone.
528 528
529 529 the data stream consists the following entries:
530 530 1) A char representing the file destination (eg: store or cache)
531 531 2) A varint containing the length of the filename
532 532 3) A varint containing the length of file data
533 533 4) N bytes containing the filename (the internal, store-agnostic form)
534 534 5) N bytes containing the file data
535 535
536 536 Returns a 3-tuple of (file count, file size, data iterator).
537 537 """
538 538
539 539 with repo.lock():
540 540
541 541 entries = []
542 542 totalfilesize = 0
543 543
544 544 repo.ui.debug('scanning\n')
545 545 for name, ename, size in _walkstreamfiles(repo):
546 546 if size:
547 547 entries.append((_srcstore, name, _fileappend, size))
548 548 totalfilesize += size
549 549 for name in _walkstreamfullstorefiles(repo):
550 550 if repo.svfs.exists(name):
551 551 totalfilesize += repo.svfs.lstat(name).st_size
552 552 entries.append((_srcstore, name, _filefull, None))
553 553 for name in cacheutil.cachetocopy(repo):
554 554 if repo.cachevfs.exists(name):
555 555 totalfilesize += repo.cachevfs.lstat(name).st_size
556 556 entries.append((_srccache, name, _filefull, None))
557 557
558 558 chunks = _emit2(repo, entries, totalfilesize)
559 559 first = next(chunks)
560 560 assert first is None
561 561
562 562 return len(entries), totalfilesize, chunks
563 563
564 564 @contextlib.contextmanager
565 565 def nested(*ctxs):
566 566 with warnings.catch_warnings():
567 567 # For some reason, Python decided 'nested' was deprecated without
568 568 # replacement. They officially advertised for filtering the deprecation
569 569 # warning for people who actually need the feature.
570 570 warnings.filterwarnings("ignore",category=DeprecationWarning)
571 571 with contextlib.nested(*ctxs):
572 572 yield
573 573
574 574 def consumev2(repo, fp, filecount, filesize):
575 575 """Apply the contents from a version 2 streaming clone.
576 576
577 577 Data is read from an object that only needs to provide a ``read(size)``
578 578 method.
579 579 """
580 580 with repo.lock():
581 581 repo.ui.status(_('%d files to transfer, %s of data\n') %
582 582 (filecount, util.bytecount(filesize)))
583 583
584 584 start = util.timer()
585 585 handledbytes = 0
586 586 progress = repo.ui.progress
587 587
588 588 progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
589 589
590 590 vfsmap = _makemap(repo)
591 591
592 592 with repo.transaction('clone'):
593 593 ctxs = (vfs.backgroundclosing(repo.ui)
594 594 for vfs in vfsmap.values())
595 595 with nested(*ctxs):
596 596 for i in range(filecount):
597 597 src = util.readexactly(fp, 1)
598 598 vfs = vfsmap[src]
599 599 namelen = util.uvarintdecodestream(fp)
600 600 datalen = util.uvarintdecodestream(fp)
601 601
602 602 name = util.readexactly(fp, namelen)
603 603
604 604 if repo.ui.debugflag:
605 605 repo.ui.debug('adding [%s] %s (%s)\n' %
606 606 (src, name, util.bytecount(datalen)))
607 607
608 608 with vfs(name, 'w') as ofp:
609 609 for chunk in util.filechunkiter(fp, limit=datalen):
610 610 handledbytes += len(chunk)
611 611 progress(_('clone'), handledbytes, total=filesize,
612 612 unit=_('bytes'))
613 613 ofp.write(chunk)
614 614
615 615 # force @filecache properties to be reloaded from
616 616 # streamclone-ed file at next access
617 617 repo.invalidate(clearfilecache=True)
618 618
619 619 elapsed = util.timer() - start
620 620 if elapsed <= 0:
621 621 elapsed = 0.001
622 622 progress(_('clone'), None)
623 623 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
624 624 (util.bytecount(handledbytes), elapsed,
625 625 util.bytecount(handledbytes / elapsed)))
626 626
627 627 def applybundlev2(repo, fp, filecount, filesize, requirements):
628 628 missingreqs = [r for r in requirements if r not in repo.supported]
629 629 if missingreqs:
630 630 raise error.Abort(_('unable to apply stream clone: '
631 631 'unsupported format: %s') %
632 632 ', '.join(sorted(missingreqs)))
633 633
634 634 consumev2(repo, fp, filecount, filesize)
635
636 # new requirements = old non-format requirements +
637 # new format-related remote requirements
638 # requirements from the streamed-in repository
639 repo.requirements = set(requirements) | (
640 repo.requirements - repo.supportedformats)
641 repo._applyopenerreqs()
642 repo._writerequirements()
General Comments 0
You need to be logged in to leave comments. Login now