bundle2: move exception handling into part iterator...
Durham Goode -
r34151:21c2df59 default
@@ -1,1919 +1,1917
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
18 18 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
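The stream-parameter blob described above can be decoded in a few lines of standalone Python. This is an illustrative sketch of the wire format only (hypothetical helper name), not Mercurial's own implementation, which lives in `unbundle20._processallparams` further down:

```python
from urllib.parse import unquote

def decodestreamparams(blob):
    """decode a space-separated, urlquoted stream-parameter blob"""
    params = {}
    for item in blob.split(' '):
        if not item:
            continue
        if '=' in item:
            name, value = item.split('=', 1)
            value = unquote(value)
        else:
            name, value = item, None
        name = unquote(name)
        if not name:
            raise ValueError('empty parameter name')
        if not name[0].isalpha():
            raise ValueError('non letter first character: %r' % name)
        params[name] = value
    return params

# 'Compression' is mandatory (capitalized first letter), 'evolution' advisory:
print(decodestreamparams('Compression=BZ evolution'))
# {'Compression': 'BZ', 'evolution': None}
```

Parameters without a value decode to `None`, matching the `<name>` vs `<name>=<value>` distinction above.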
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application-level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
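As a standalone sketch of the `<mandatory-count><advisory-count><param-sizes><param-data>` layout above (hypothetical helper name, not the parser used by `unbundlepart`):

```python
import struct

def unpackpartparams(data):
    """parse <mandatory-count><advisory-count><param-sizes><param-data>"""
    mancount, advcount = struct.unpack_from('>BB', data, 0)
    total = mancount + advcount
    # one (key-size, value-size) byte pair per parameter
    sizes = struct.unpack_from('>' + 'BB' * total, data, 2)
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        params.append((key, value, i < mancount))  # True => mandatory
    return params

# one mandatory parameter ('version' -> 'v2'), one advisory ('force' -> ''):
blob = bytes([1, 1, 7, 2, 5, 0]) + b'versionv2force'
print(unpackpartparams(blob))
```

The mandatory flag falls out of the ordering rule: the first `mandatory-count` parameters are the mandatory ones.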
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
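The chunked payload framing can be read with a small standalone loop. This sketches the format only; since no negative-`chunksize` special case is defined yet, the sketch simply rejects such sizes:

```python
import io
import struct

def iterchunks(fp):
    """yield payload chunks until the zero-size terminating chunk"""
    while True:
        chunksize = struct.unpack('>i', fp.read(4))[0]
        if chunksize == 0:
            return  # end-of-payload marker
        if chunksize < 0:
            raise ValueError('special chunk sizes are not supported here')
        yield fp.read(chunksize)

stream = io.BytesIO(struct.pack('>i', 5) + b'hello' +
                    struct.pack('>i', 6) + b' world' +
                    struct.pack('>i', 0))
print(b''.join(iterchunks(stream)))  # b'hello world'
```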
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable, so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
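For illustration, with three parameters this helper builds the format `'>BBBB BB'`-style string `'>BBBBBB'`: six unsigned size bytes preceding the parameter data (a standalone check, independent of Mercurial):

```python
import struct

fmt = '>' + ('BB' * 3)  # what _makefpartparamsizes(3) returns
assert fmt == '>BBBBBB'
assert struct.calcsize(fmt) == 6
print(struct.unpack(fmt, bytes([7, 2, 3, 0, 4, 1])))  # (7, 2, 3, 0, 4, 1)
```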
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 def __init__(self, repo, unbundler):
351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 self.op = op
353 354 self.unbundler = unbundler
354 355 self.iterator = None
355 356 self.count = 0
356 357
357 358 def __enter__(self):
358 359 def func():
359 360 itr = enumerate(self.unbundler.iterparts())
360 361 for count, p in itr:
361 362 self.count = count
362 363 yield p
363 364 self.iterator = func()
364 365 return self.iterator
365 366
366 def __exit__(self, type, value, tb):
367 def __exit__(self, type, exc, tb):
367 368 if not self.iterator:
368 369 return
369 370
371 if exc:
372 # Any exceptions seeking to the end of the bundle at this point are
373 # almost certainly related to the underlying stream being bad.
374 # And, chances are that the exception we're handling is related to
375 # getting in that bad state. So, we swallow the seeking error and
376 # re-raise the original error.
377 seekerror = False
378 try:
379 for part in self.iterator:
380 # consume the bundle content
381 part.seek(0, 2)
382 except Exception:
383 seekerror = True
384
385 # Small hack to let caller code distinguish exceptions from bundle2
386 # processing from processing the old format. This is mostly needed
387 # to handle different return codes to unbundle according to the type
388 # of bundle. We should probably clean up or drop this return code
389 # craziness in a future version.
390 exc.duringunbundle2 = True
391 salvaged = []
392 replycaps = None
393 if self.op.reply is not None:
394 salvaged = self.op.reply.salvageoutput()
395 replycaps = self.op.reply.capabilities
396 exc._replycaps = replycaps
397 exc._bundle2salvagedoutput = salvaged
398
399 # Re-raising from a variable loses the original stack. So only use
400 # that form if we need to.
401 if seekerror:
402 raise exc
403
370 404 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
371 405 self.count)
372 406
373 407 def processbundle(repo, unbundler, transactiongetter=None, op=None):
374 408 """This function process a bundle, apply effect to/from a repo
375 409
376 410 It iterates over each part then searches for and uses the proper handling
377 411 code to process the part. Parts are processed in order.
378 412
379 413 An unknown mandatory part will abort the process.
380 414
381 415 It is temporarily possible to provide a prebuilt bundleoperation to the
382 416 function. This is used to ensure output is properly propagated in case of
383 417 an error during the unbundling. This output capturing part will likely be
384 418 reworked and this ability will probably go away in the process.
385 419 """
386 420 if op is None:
387 421 if transactiongetter is None:
388 422 transactiongetter = _notransaction
389 423 op = bundleoperation(repo, transactiongetter)
390 424 # todo:
391 425 # - replace this with an init function soon.
392 426 # - exception catching
393 427 unbundler.params
394 428 if repo.ui.debugflag:
395 429 msg = ['bundle2-input-bundle:']
396 430 if unbundler.params:
397 431 msg.append(' %i params' % len(unbundler.params))
398 432 if op._gettransaction is None or op._gettransaction is _notransaction:
399 433 msg.append(' no-transaction')
400 434 else:
401 435 msg.append(' with-transaction')
402 436 msg.append('\n')
403 437 repo.ui.debug(''.join(msg))
404 438
405 with partiterator(repo, unbundler) as parts:
406 part = None
407 try:
439 with partiterator(repo, op, unbundler) as parts:
408 440 for part in parts:
409 441 _processpart(op, part)
410 except Exception as exc:
411 # Any exceptions seeking to the end of the bundle at this point are
412 # almost certainly related to the underlying stream being bad.
413 # And, chances are that the exception we're handling is related to
414 # getting in that bad state. So, we swallow the seeking error and
415 # re-raise the original error.
416 seekerror = False
417 try:
418 for part in parts:
419 # consume the bundle content
420 part.seek(0, 2)
421 except Exception:
422 seekerror = True
423
424 # Small hack to let caller code distinguish exceptions from bundle2
425 # processing from processing the old format. This is mostly needed
426 # to handle different return codes to unbundle according to the type
427 # of bundle. We should probably clean up or drop this return code
428 # craziness in a future version.
429 exc.duringunbundle2 = True
430 salvaged = []
431 replycaps = None
432 if op.reply is not None:
433 salvaged = op.reply.salvageoutput()
434 replycaps = op.reply.capabilities
435 exc._replycaps = replycaps
436 exc._bundle2salvagedoutput = salvaged
437
438 # Re-raising from a variable loses the original stack. So only use
439 # that form if we need to.
440 if seekerror:
441 raise exc
442 else:
443 raise
444 442
445 443 return op
446 444
447 445 def _processchangegroup(op, cg, tr, source, url, **kwargs):
448 446 ret = cg.apply(op.repo, tr, source, url, **kwargs)
449 447 op.records.add('changegroup', {
450 448 'return': ret,
451 449 })
452 450 return ret
453 451
454 452 def _processpart(op, part):
455 453 """process a single part from a bundle
456 454
457 455 The part is guaranteed to have been fully consumed when the function exits
458 456 (even if an exception is raised)."""
459 457 status = 'unknown' # used by debug output
460 458 hardabort = False
461 459 try:
462 460 try:
463 461 handler = parthandlermapping.get(part.type)
464 462 if handler is None:
465 463 status = 'unsupported-type'
466 464 raise error.BundleUnknownFeatureError(parttype=part.type)
467 465 indebug(op.ui, 'found a handler for part %r' % part.type)
468 466 unknownparams = part.mandatorykeys - handler.params
469 467 if unknownparams:
470 468 unknownparams = list(unknownparams)
471 469 unknownparams.sort()
472 470 status = 'unsupported-params (%s)' % unknownparams
473 471 raise error.BundleUnknownFeatureError(parttype=part.type,
474 472 params=unknownparams)
475 473 status = 'supported'
476 474 except error.BundleUnknownFeatureError as exc:
477 475 if part.mandatory: # mandatory parts
478 476 raise
479 477 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
480 478 return # skip to part processing
481 479 finally:
482 480 if op.ui.debugflag:
483 481 msg = ['bundle2-input-part: "%s"' % part.type]
484 482 if not part.mandatory:
485 483 msg.append(' (advisory)')
486 484 nbmp = len(part.mandatorykeys)
487 485 nbap = len(part.params) - nbmp
488 486 if nbmp or nbap:
489 487 msg.append(' (params:')
490 488 if nbmp:
491 489 msg.append(' %i mandatory' % nbmp)
492 490 if nbap:
493 491 msg.append(' %i advisory' % nbap)
494 492 msg.append(')')
495 493 msg.append(' %s\n' % status)
496 494 op.ui.debug(''.join(msg))
497 495
498 496 # handler is called outside the above try block so that we don't
499 497 # risk catching KeyErrors from anything other than the
500 498 # parthandlermapping lookup (any KeyError raised by handler()
501 499 # itself represents a defect of a different variety).
502 500 output = None
503 501 if op.captureoutput and op.reply is not None:
504 502 op.ui.pushbuffer(error=True, subproc=True)
505 503 output = ''
506 504 try:
507 505 handler(op, part)
508 506 finally:
509 507 if output is not None:
510 508 output = op.ui.popbuffer()
511 509 if output:
512 510 outpart = op.reply.newpart('output', data=output,
513 511 mandatory=False)
514 512 outpart.addparam(
515 513 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
516 514 # If exiting or interrupted, do not attempt to seek the stream in the
517 515 # finally block below. This makes abort faster.
518 516 except (SystemExit, KeyboardInterrupt):
519 517 hardabort = True
520 518 raise
521 519 finally:
522 520 # consume the part content to not corrupt the stream.
523 521 if not hardabort:
524 522 part.seek(0, 2)
525 523
526 524
527 525 def decodecaps(blob):
528 526 """decode a bundle2 caps bytes blob into a dictionary
529 527
530 528 The blob is a list of capabilities (one per line)
531 529 Capabilities may have values using a line of the form::
532 530
533 531 capability=value1,value2,value3
534 532
535 533 The values are always a list."""
536 534 caps = {}
537 535 for line in blob.splitlines():
538 536 if not line:
539 537 continue
540 538 if '=' not in line:
541 539 key, vals = line, ()
542 540 else:
543 541 key, vals = line.split('=', 1)
544 542 vals = vals.split(',')
545 543 key = urlreq.unquote(key)
546 544 vals = [urlreq.unquote(v) for v in vals]
547 545 caps[key] = vals
548 546 return caps
549 547
550 548 def encodecaps(caps):
551 549 """encode a bundle2 caps dictionary into a bytes blob"""
552 550 chunks = []
553 551 for ca in sorted(caps):
554 552 vals = caps[ca]
555 553 ca = urlreq.quote(ca)
556 554 vals = [urlreq.quote(v) for v in vals]
557 555 if vals:
558 556 ca = "%s=%s" % (ca, ','.join(vals))
559 557 chunks.append(ca)
560 558 return '\n'.join(chunks)
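The caps blob format round-trips as described in the `decodecaps` docstring above. A standalone restatement of the encoding side (logic mirrored from the function above, using the stdlib quoting helpers directly so it runs outside Mercurial):

```python
from urllib.parse import quote

def encodecaps(caps):
    """encode a caps dict into a newline-separated, urlquoted blob"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

print(encodecaps({'HG20': [], 'changegroup': ['01', '02']}))
# HG20
# changegroup=01,02
```

Capabilities without values are emitted as a bare name, one capability per line.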
561 559
562 560 bundletypes = {
563 561 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
564 562 # since the unification ssh accepts a header but there
565 563 # is no capability signaling it.
566 564 "HG20": (), # special-cased below
567 565 "HG10UN": ("HG10UN", 'UN'),
568 566 "HG10BZ": ("HG10", 'BZ'),
569 567 "HG10GZ": ("HG10GZ", 'GZ'),
570 568 }
571 569
572 570 # hgweb uses this list to communicate its preferred type
573 571 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
574 572
575 573 class bundle20(object):
576 574 """represent an outgoing bundle2 container
577 575
578 576 Use the `addparam` method to add a stream-level parameter, and `newpart` to
579 577 populate it. Then call `getchunks` to retrieve all the binary chunks of
580 578 data that compose the bundle2 container."""
581 579
582 580 _magicstring = 'HG20'
583 581
584 582 def __init__(self, ui, capabilities=()):
585 583 self.ui = ui
586 584 self._params = []
587 585 self._parts = []
588 586 self.capabilities = dict(capabilities)
589 587 self._compengine = util.compengines.forbundletype('UN')
590 588 self._compopts = None
591 589
592 590 def setcompression(self, alg, compopts=None):
593 591 """setup core part compression to <alg>"""
594 592 if alg in (None, 'UN'):
595 593 return
596 594 assert not any(n.lower() == 'compression' for n, v in self._params)
597 595 self.addparam('Compression', alg)
598 596 self._compengine = util.compengines.forbundletype(alg)
599 597 self._compopts = compopts
600 598
601 599 @property
602 600 def nbparts(self):
603 601 """total number of parts added to the bundler"""
604 602 return len(self._parts)
605 603
606 604 # methods used to define the bundle2 content
607 605 def addparam(self, name, value=None):
608 606 """add a stream level parameter"""
609 607 if not name:
610 608 raise ValueError('empty parameter name')
611 609 if name[0] not in pycompat.bytestr(string.ascii_letters):
612 610 raise ValueError('non letter first character: %r' % name)
613 611 self._params.append((name, value))
614 612
615 613 def addpart(self, part):
616 614 """add a new part to the bundle2 container
617 615
618 616 Parts contains the actual applicative payload."""
619 617 assert part.id is None
620 618 part.id = len(self._parts) # very cheap counter
621 619 self._parts.append(part)
622 620
623 621 def newpart(self, typeid, *args, **kwargs):
624 622 """create a new part and add it to the containers
625 623
626 624 The part is directly added to the container. For now, this means
627 625 that any failure to properly initialize the part after calling
628 626 ``newpart`` should result in a failure of the whole bundling process.
629 627
630 628 You can still fall back to manually create and add if you need better
631 629 control."""
632 630 part = bundlepart(typeid, *args, **kwargs)
633 631 self.addpart(part)
634 632 return part
635 633
636 634 # methods used to generate the bundle2 stream
637 635 def getchunks(self):
638 636 if self.ui.debugflag:
639 637 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
640 638 if self._params:
641 639 msg.append(' (%i params)' % len(self._params))
642 640 msg.append(' %i parts total\n' % len(self._parts))
643 641 self.ui.debug(''.join(msg))
644 642 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
645 643 yield self._magicstring
646 644 param = self._paramchunk()
647 645 outdebug(self.ui, 'bundle parameter: %s' % param)
648 646 yield _pack(_fstreamparamsize, len(param))
649 647 if param:
650 648 yield param
651 649 for chunk in self._compengine.compressstream(self._getcorechunk(),
652 650 self._compopts):
653 651 yield chunk
654 652
655 653 def _paramchunk(self):
656 654 """return a encoded version of all stream parameters"""
657 655 blocks = []
658 656 for par, value in self._params:
659 657 par = urlreq.quote(par)
660 658 if value is not None:
661 659 value = urlreq.quote(value)
662 660 par = '%s=%s' % (par, value)
663 661 blocks.append(par)
664 662 return ' '.join(blocks)
665 663
666 664 def _getcorechunk(self):
667 665 """yield chunk for the core part of the bundle
668 666
669 667 (all but headers and parameters)"""
670 668 outdebug(self.ui, 'start of parts')
671 669 for part in self._parts:
672 670 outdebug(self.ui, 'bundle part: "%s"' % part.type)
673 671 for chunk in part.getchunks(ui=self.ui):
674 672 yield chunk
675 673 outdebug(self.ui, 'end of bundle')
676 674 yield _pack(_fpartheadersize, 0)
677 675
678 676
679 677 def salvageoutput(self):
680 678 """return a list with a copy of all output parts in the bundle
681 679
682 680 This is meant to be used during error handling to make sure we preserve
683 681 server output"""
684 682 salvaged = []
685 683 for part in self._parts:
686 684 if part.type.startswith('output'):
687 685 salvaged.append(part.copy())
688 686 return salvaged
689 687
690 688
691 689 class unpackermixin(object):
692 690 """A mixin to extract bytes and struct data from a stream"""
693 691
694 692 def __init__(self, fp):
695 693 self._fp = fp
696 694
697 695 def _unpack(self, format):
698 696 """unpack this struct format from the stream
699 697
700 698 This method is meant for internal usage by the bundle2 protocol only.
701 699 It directly manipulates the low-level stream, including bundle2-level
702 700 instructions.
703 701
704 702 Do not use it to implement higher-level logic or methods."""
705 703 data = self._readexact(struct.calcsize(format))
706 704 return _unpack(format, data)
707 705
708 706 def _readexact(self, size):
709 707 """read exactly <size> bytes from the stream
710 708
711 709 This method is meant for internal usage by the bundle2 protocol only.
712 710 It directly manipulates the low-level stream, including bundle2-level
713 711 instructions.
714 712
715 713 Do not use it to implement higher-level logic or methods."""
716 714 return changegroup.readexactly(self._fp, size)
717 715
718 716 def getunbundler(ui, fp, magicstring=None):
719 717 """return a valid unbundler object for a given magicstring"""
720 718 if magicstring is None:
721 719 magicstring = changegroup.readexactly(fp, 4)
722 720 magic, version = magicstring[0:2], magicstring[2:4]
723 721 if magic != 'HG':
724 722 ui.debug(
725 723 "error: invalid magic: %r (version %r), should be 'HG'\n"
726 724 % (magic, version))
727 725 raise error.Abort(_('not a Mercurial bundle'))
728 726 unbundlerclass = formatmap.get(version)
729 727 if unbundlerclass is None:
730 728 raise error.Abort(_('unknown bundle version %s') % version)
731 729 unbundler = unbundlerclass(ui, fp)
732 730 indebug(ui, 'start processing of %s stream' % magicstring)
733 731 return unbundler
734 732
735 733 class unbundle20(unpackermixin):
736 734 """interpret a bundle2 stream
737 735
738 736 This class is fed with a binary stream and yields parts through its
739 737 `iterparts` method."""
740 738
741 739 _magicstring = 'HG20'
742 740
743 741 def __init__(self, ui, fp):
744 742 """If header is specified, we do not read it out of the stream."""
745 743 self.ui = ui
746 744 self._compengine = util.compengines.forbundletype('UN')
747 745 self._compressed = None
748 746 super(unbundle20, self).__init__(fp)
749 747
750 748 @util.propertycache
751 749 def params(self):
752 750 """dictionary of stream level parameters"""
753 751 indebug(self.ui, 'reading bundle2 stream parameters')
754 752 params = {}
755 753 paramssize = self._unpack(_fstreamparamsize)[0]
756 754 if paramssize < 0:
757 755 raise error.BundleValueError('negative bundle param size: %i'
758 756 % paramssize)
759 757 if paramssize:
760 758 params = self._readexact(paramssize)
761 759 params = self._processallparams(params)
762 760 return params
763 761
764 762 def _processallparams(self, paramsblock):
765 763 """"""
766 764 params = util.sortdict()
767 765 for p in paramsblock.split(' '):
768 766 p = p.split('=', 1)
769 767 p = [urlreq.unquote(i) for i in p]
770 768 if len(p) < 2:
771 769 p.append(None)
772 770 self._processparam(*p)
773 771 params[p[0]] = p[1]
774 772 return params
775 773
776 774
777 775 def _processparam(self, name, value):
778 776 """process a parameter, applying its effect if needed
779 777
780 778 Parameters starting with a lower case letter are advisory and will be
781 779 ignored when unknown. Those starting with an upper case letter are
782 780 mandatory, and this function will raise a KeyError when they are unknown.
783 781
784 782 Note: no options are currently supported. Any input will either be
785 783 ignored or fail.
786 784 """
787 785 if not name:
788 786 raise ValueError('empty parameter name')
789 787 if name[0] not in pycompat.bytestr(string.ascii_letters):
790 788 raise ValueError('non letter first character: %r' % name)
791 789 try:
792 790 handler = b2streamparamsmap[name.lower()]
793 791 except KeyError:
794 792 if name[0].islower():
795 793 indebug(self.ui, "ignoring unknown parameter %r" % name)
796 794 else:
797 795 raise error.BundleUnknownFeatureError(params=(name,))
798 796 else:
799 797 handler(self, name, value)
800 798
801 799 def _forwardchunks(self):
802 800 """utility to transfer a bundle2 as binary
803 801
804 802 This is made necessary by the fact that the 'getbundle' command over 'ssh'
805 803 has no way to know when the reply ends, relying on the bundle being
806 804 interpreted to know its end. This is terrible and we are sorry, but we
807 805 needed to move forward to get general delta enabled.
808 806 """
809 807 yield self._magicstring
810 808 assert 'params' not in vars(self)
811 809 paramssize = self._unpack(_fstreamparamsize)[0]
812 810 if paramssize < 0:
813 811 raise error.BundleValueError('negative bundle param size: %i'
814 812 % paramssize)
815 813 yield _pack(_fstreamparamsize, paramssize)
816 814 if paramssize:
817 815 params = self._readexact(paramssize)
818 816 self._processallparams(params)
819 817 yield params
820 818 assert self._compengine.bundletype == 'UN'
821 819 # From there, payload might need to be decompressed
822 820 self._fp = self._compengine.decompressorreader(self._fp)
823 821 emptycount = 0
824 822 while emptycount < 2:
825 823 # so we can brainlessly loop
826 824 assert _fpartheadersize == _fpayloadsize
827 825 size = self._unpack(_fpartheadersize)[0]
828 826 yield _pack(_fpartheadersize, size)
829 827 if size:
830 828 emptycount = 0
831 829 else:
832 830 emptycount += 1
833 831 continue
834 832 if size == flaginterrupt:
835 833 continue
836 834 elif size < 0:
837 835 raise error.BundleValueError('negative chunk size: %i' % size)
838 836 yield self._readexact(size)
839 837
840 838
841 839 def iterparts(self):
842 840 """yield all parts contained in the stream"""
843 841 # make sure params have been loaded
844 842 self.params
845 843 # From there, payload needs to be decompressed
846 844 self._fp = self._compengine.decompressorreader(self._fp)
847 845 indebug(self.ui, 'start extraction of bundle2 parts')
848 846 headerblock = self._readpartheader()
849 847 while headerblock is not None:
850 848 part = unbundlepart(self.ui, headerblock, self._fp)
851 849 yield part
852 850 # Seek to the end of the part to force it's consumption so the next
853 851 # part can be read. But then seek back to the beginning so the
854 852 # code consuming this generator has a part that starts at 0.
855 853 part.seek(0, 2)
856 854 part.seek(0)
857 855 headerblock = self._readpartheader()
858 856 indebug(self.ui, 'end of bundle2 stream')
859 857
860 858 def _readpartheader(self):
861 859 """reads a part header size and return the bytes blob
862 860
863 861 returns None if empty"""
864 862 headersize = self._unpack(_fpartheadersize)[0]
865 863 if headersize < 0:
866 864 raise error.BundleValueError('negative part header size: %i'
867 865 % headersize)
868 866 indebug(self.ui, 'part header size: %i' % headersize)
869 867 if headersize:
870 868 return self._readexact(headersize)
871 869 return None
872 870
873 871 def compressed(self):
874 872 self.params # load params
875 873 return self._compressed
876 874
877 875 def close(self):
878 876 """close underlying file"""
879 877 if util.safehasattr(self._fp, 'close'):
880 878 return self._fp.close()
881 879
882 880 formatmap = {'20': unbundle20}
883 881
884 882 b2streamparamsmap = {}
885 883
886 884 def b2streamparamhandler(name):
887 885 """register a handler for a stream level parameter"""
888 886 def decorator(func):
889 887 assert name not in formatmap
890 888 b2streamparamsmap[name] = func
891 889 return func
892 890 return decorator
893 891
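`b2streamparamhandler` is a registration decorator: it stores the decorated function in a dict keyed by parameter name and returns the function unchanged, so handler lookup at unbundling time is a plain dict access. A stripped-down sketch of the same pattern, with illustrative names:

```python
handlers = {}

def streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        # refuse double registration of the same parameter name
        assert name not in handlers, 'duplicate handler: %s' % name
        handlers[name] = func
        return func
    return decorator

@streamparamhandler('compression')
def processcompression(unbundler, param, value):
    # a real handler would install a payload decompressor on the unbundler;
    # here we just echo the arguments to show the dispatch
    return (param, value)
```

Dispatch then becomes `handlers[name.lower()](unbundler, name, value)`, which is exactly the shape used in `_processparam` above.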
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = []  # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

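`_findchunk` maps a payload offset back to a `(chunk number, offset within chunk)` pair using the `(payload position, file position)` index that `_payloadchunks` builds as it reads. The lookup logic in isolation, as a hypothetical standalone function over a plain index list:

```python
def findchunk(chunkindex, pos):
    """for a given payload position, return (chunk number, offset in chunk)

    chunkindex is a list of (payload position, file position) tuples, one
    entry per chunk start, e.g. [(0, f0), (10, f1), (25, f2)] for chunks of
    10 and 15 bytes.
    """
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # pos falls exactly on a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # pos is inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```

Because the index always ends with an entry at the end-of-payload position, any in-range offset is found before the loop falls through.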
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] + offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low level stream, including bundle2 level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
                }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

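`obsmarkersversion` just filters capability strings of the form `V<n>` and extracts the integers. The same logic sketched standalone, under a hypothetical name, to show the expected input and output shapes:

```python
def markerversions(caps):
    """extract supported obsmarkers versions from a capabilities dict

    caps maps capability names to tuples of strings, as produced by
    decodecaps; version entries look like 'V0', 'V1', ...
    """
    obscaps = caps.get('obsmarkers', ())
    # keep only 'V<n>' entries and strip the leading 'V'
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
```

A missing 'obsmarkers' key simply yields an empty list, which callers like `buildobsmarkerspart` treat as "no common format".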
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The types of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such a bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', str(cg.extras['clcount']),
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

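`combinechangegroupresults` folds several addchangegroup return values into one, using the changegroup convention: 0 means error, 1 means success with no head change, `1 + n` means n heads were added, and `-1 - n` means n heads were removed. The core arithmetic, sketched as a standalone function over a plain list (the real code pulls the values out of `op.records`):

```python
def combineresults(results):
    """combine 0 or more addchangegroup return values into one

    Convention: 0 = error, 1 = no head change, 1 + n = n heads added,
    -1 - n = n heads removed.
    """
    changedheads = 0
    result = 1
    for ret in results:
        # any error short-circuits the whole combination to an error
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1   # accumulate removed heads (negative)
        elif ret > 1:
            changedheads += ret - 1   # accumulate added heads (positive)
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
```

Note that additions and removals across results can cancel out, in which case the combined value collapses back to 1.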
1570 1568 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1571 1569 'targetphase'))
1572 1570 def handlechangegroup(op, inpart):
1573 1571 """apply a changegroup part on the repo
1574 1572
1575 1573 This is a very early implementation that will massive rework before being
1576 1574 inflicted to any end-user.
1577 1575 """
1578 1576 tr = op.gettransaction()
1579 1577 unpackerversion = inpart.params.get('version', '01')
1580 1578 # We should raise an appropriate exception here
1581 1579 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1582 1580 # the source and url passed here are overwritten by the one contained in
1583 1581 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1584 1582 nbchangesets = None
1585 1583 if 'nbchanges' in inpart.params:
1586 1584 nbchangesets = int(inpart.params.get('nbchanges'))
1587 1585 if ('treemanifest' in inpart.params and
1588 1586 'treemanifest' not in op.repo.requirements):
1589 1587 if len(op.repo.changelog) != 0:
1590 1588 raise error.Abort(_(
1591 1589 "bundle contains tree manifests, but local repo is "
1592 1590 "non-empty and does not use tree manifests"))
1593 1591 op.repo.requirements.add('treemanifest')
1594 1592 op.repo._applyopenerreqs()
1595 1593 op.repo._writerequirements()
1596 1594 extrakwargs = {}
1597 1595 targetphase = inpart.params.get('targetphase')
1598 1596 if targetphase is not None:
1599 1597 extrakwargs['targetphase'] = int(targetphase)
1600 1598 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1601 1599 expectedtotal=nbchangesets, **extrakwargs)
1602 1600 if op.reply is not None:
1603 1601 # This is definitely not the final form of this
1604 1602 # return. But one need to start somewhere.
1605 1603 part = op.reply.newpart('reply:changegroup', mandatory=False)
1606 1604 part.addparam(
1607 1605 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1608 1606 part.addparam('return', '%i' % ret, mandatory=False)
1609 1607 assert not inpart.read()
1610 1608
_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was
        retrieved by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

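The fixed-width read loop used by both head-check handlers can be sketched on its own. This is a minimal illustration over `io.BytesIO`, not the real bundle2 part API; `read_nodes` is a hypothetical helper name:

```python
import io

def read_nodes(stream, width=20):
    """Read fixed-width binary node records until the stream is exhausted."""
    nodes = []
    chunk = stream.read(width)
    while len(chunk) == width:
        nodes.append(chunk)
        chunk = stream.read(width)
    # A trailing partial record would indicate a corrupt part.
    assert not chunk
    return nodes

# Two fake 20-byte nodes, back to back:
payload = io.BytesIO(b'a' * 20 + b'b' * 20)
heads = read_nodes(payload)
```

The sentinel-free framing works because every node is exactly 20 bytes; a short read of zero bytes signals end-of-payload, and any other short read is rejected.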
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activity happens on unrelated heads, it is
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

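The entry decoding in `_readphaseheads` can be illustrated standalone. This sketch assumes `_fphasesentry` pairs a big-endian 32-bit phase with a 20-byte node (`'>i20s'`); the actual format string is defined elsewhere in this module, and `pack_entry`/`unpack_entries` are hypothetical helpers, not part of the bundle2 API:

```python
import struct

# Assumed entry layout: big-endian int32 phase followed by a 20-byte node.
fphasesentry = '>i20s'
entrysize = struct.calcsize(fphasesentry)  # 4 + 20 = 24 bytes

def pack_entry(phase, node):
    """Serialize one phase-heads entry."""
    return struct.pack(fphasesentry, phase, node)

def unpack_entries(data):
    """Group the nodes of a phase-heads blob by phase number."""
    if len(data) % entrysize:
        raise ValueError('bad phase-heads payload')
    headsbyphase = {}
    for off in range(0, len(data), entrysize):
        phase, node = struct.unpack(fphasesentry, data[off:off + entrysize])
        headsbyphase.setdefault(phase, []).append(node)
    return headsbyphase

blob = pack_entry(0, b'\x11' * 20) + pack_entry(2, b'\x22' * 20)
parsed = unpack_entries(blob)
```

The length check mirrors the "bad phase-heads bundle part" abort above: a payload that is not a whole number of entries is rejected outright.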
@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers push"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on server-side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
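The key normalization applied to pushed variables above can be sketched as a standalone transform. `normalize_pushvars` is a hypothetical helper written for illustration; the real handler works directly over `part.advisoryparams`:

```python
def normalize_pushvars(params):
    """Uppercase each key and prefix USERVAR_ so hooks can tell that a
    variable arrived via --pushvar rather than from server config."""
    hookargs = {}
    for key, value in params:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

args = normalize_pushvars([('debug', '1'), ('DRY_RUN', 'yes')])
```

The prefix acts as a namespace: hooks reading their environment can trust that anything under `USERVAR_` was supplied by the pushing client.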