bundle2: move handler validation out of processpart...
Durham Goode -
r34260:07e4170f default
@@ -1,1932 +1,1939 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
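The stream-parameter encoding described above can be reproduced in a small standalone sketch (the helper names `encodeparams`/`decodeparams` are hypothetical, and Python 3's `urllib` stands in for Mercurial's `util.urlreq`):

```python
from urllib.parse import quote, unquote

def encodeparams(params):
    """Encode (name, value) pairs as a bundle2 stream parameter blob.

    Names and values are urlquoted; parameters are space separated and a
    value-less parameter is emitted as a bare name.
    """
    chunks = []
    for name, value in params:
        if not name:
            raise ValueError('empty parameter name')
        if not name[0].isalpha():
            raise ValueError('non letter first character: %r' % name)
        chunk = quote(name)
        if value is not None:
            chunk = '%s=%s' % (chunk, quote(value))
        chunks.append(chunk)
    return ' '.join(chunks)

def decodeparams(blob):
    """Decode a stream parameter blob back into (name, value) pairs."""
    params = []
    for entry in blob.split(' '):
        if '=' in entry:
            name, value = entry.split('=', 1)
            params.append((unquote(name), unquote(value)))
        else:
            params.append((unquote(entry), None))
    return params
```

Here `Compression=BZ flag` would decode to a mandatory `Compression` parameter with value `BZ` and a bare advisory `flag` parameter.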
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76 
77 77 The part type is used to route the part to an application-level handler
78 78 that can interpret its payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
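Putting the header and payload rules together, a minimal packer for a single part might look like the sketch below. It follows the byte layout described above but is illustrative only, not Mercurial's own implementation (which lives in the `bundlepart` class later in this file):

```python
import struct

def packpart(parttype, partid, mandatoryparams, advisoryparams, payload):
    """Pack one bundle2 part following the byte layout described above.

    Returns <header size><header><payload chunk(s)><zero-size chunk>.
    All string arguments are expected to be bytes.
    """
    params = list(mandatoryparams) + list(advisoryparams)
    # header: typesize, parttype, partid, parameter counts
    header = struct.pack('>B', len(parttype)) + parttype
    header += struct.pack('>I', partid)
    header += struct.pack('>BB', len(mandatoryparams), len(advisoryparams))
    # param-sizes: one (key size, value size) couple per parameter
    for key, value in params:
        header += struct.pack('>BB', len(key), len(value))
    # param-data: all keys and values, mandatory parameters first
    for key, value in params:
        header += key + value
    out = struct.pack('>i', len(header)) + header
    # payload: <chunksize><chunkdata>, concluded by a zero-size chunk
    if payload:
        out += struct.pack('>i', len(payload)) + payload
    out += struct.pack('>i', 0)
    return out
```

For a part of type `output` with one one-byte parameter and a 5-byte payload, this yields a 17-byte header preceded by its int32 size, one payload chunk, and the closing zero-size chunk.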
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part
139 139 type contains any uppercase char it is considered mandatory. When no handler
140 140 is known for a mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is
142 142 ignored. When the process is aborted, the full bundle is still read from
143 143 the stream to keep the channel usable. But none of the parts read after an
144 144 abort are processed. In the future, dropping the stream may become an
145 145 option for channels we do not care to preserve.
146 146 """
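The mandatory/advisory dispatch rule from the docstring can be sketched as follows; the handler table and return values are illustrative only (real handlers are registered with the `@parthandler` decorator defined below):

```python
# illustrative handler table; keys are lower-cased part types
handlers = {'changegroup': lambda parttype: 'applied'}

def dispatch(parttype):
    """Decide how to treat a part, per the rules above.

    Matching is case-insensitive; any uppercase character in the part
    type marks the part as mandatory.
    """
    mandatory = parttype != parttype.lower()
    handler = handlers.get(parttype.lower())
    if handler is None:
        if mandatory:
            # unknown mandatory part: abort processing
            raise KeyError('unsupported mandatory part: %s' % parttype)
        return 'ignored'  # unknown advisory part: skipped
    return handler(parttype)
```

So `ChangeGroup` and `changegroup` reach the same handler, while an unknown `ObsMarkers` aborts and an unknown `obsmarkers` is silently skipped.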
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 tags,
165 165 url,
166 166 util,
167 167 )
168 168
169 169 urlerr = util.urlerr
170 170 urlreq = util.urlreq
171 171
172 172 _pack = struct.pack
173 173 _unpack = struct.unpack
174 174
175 175 _fstreamparamsize = '>i'
176 176 _fpartheadersize = '>i'
177 177 _fparttypesize = '>B'
178 178 _fpartid = '>I'
179 179 _fpayloadsize = '>i'
180 180 _fpartparamcount = '>BB'
181 181
182 182 _fphasesentry = '>i20s'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable, so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
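For instance, three parameters yield the format `'>BBBBBB'`, which `struct` can then use to read the six size bytes in one call (a standalone mirror of the helper above):

```python
import struct

def makefpartparamsizes(nbparams):
    # standalone mirror of _makefpartparamsizes above
    return '>' + ('BB' * nbparams)

# three parameters -> six unsigned size bytes to read at once
fmt = makefpartparamsizes(3)
sizes = struct.unpack(fmt, bytes([1, 2, 3, 4, 5, 6]))
```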
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that registers a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during an unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`, where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
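A condensed, standalone copy of the class above shows the recording API in action (the category names and entries are illustrative):

```python
class unbundlerecords(object):
    # condensed standalone copy of the class above, for demonstration
    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

records = unbundlerecords()
records.add('changegroup', {'return': 1})
# record some output and attach it as a reply to part 0
records.add('output', 'remote: ok', inreplyto=0)
```

Lookups by category return a tuple, unknown categories return an empty tuple, and `getreplies(0)` exposes a nested record set for the replies to part 0.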
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357 self.current = None
358 358
359 359 def __enter__(self):
360 360 def func():
361 361 itr = enumerate(self.unbundler.iterparts())
362 362 for count, p in itr:
363 363 self.count = count
364 364 self.current = p
365 365 yield p
366 366 p.seek(0, 2)
367 367 self.current = None
368 368 self.iterator = func()
369 369 return self.iterator
370 370
371 371 def __exit__(self, type, exc, tb):
372 372 if not self.iterator:
373 373 return
374 374
375 375 if exc:
376 376 # If exiting or interrupted, do not attempt to seek the stream in
377 377 # the finally block below. This makes abort faster.
378 378 if (self.current and
379 379 not isinstance(exc, (SystemExit, KeyboardInterrupt))):
380 380 # consume the part content to not corrupt the stream.
381 381 self.current.seek(0, 2)
382 382
383 383 # Any exceptions seeking to the end of the bundle at this point are
384 384 # almost certainly related to the underlying stream being bad.
385 385 # And, chances are that the exception we're handling is related to
386 386 # getting in that bad state. So, we swallow the seeking error and
387 387 # re-raise the original error.
388 388 seekerror = False
389 389 try:
390 390 for part in self.iterator:
391 391 # consume the bundle content
392 392 part.seek(0, 2)
393 393 except Exception:
394 394 seekerror = True
395 395
396 396 # Small hack to let caller code distinguish exceptions from bundle2
397 397 # processing from processing the old format. This is mostly needed
398 398 # to handle different return codes to unbundle according to the type
399 399 # of bundle. We should probably clean up or drop this return code
400 400 # craziness in a future version.
401 401 exc.duringunbundle2 = True
402 402 salvaged = []
403 403 replycaps = None
404 404 if self.op.reply is not None:
405 405 salvaged = self.op.reply.salvageoutput()
406 406 replycaps = self.op.reply.capabilities
407 407 exc._replycaps = replycaps
408 408 exc._bundle2salvagedoutput = salvaged
409 409
410 410 # Re-raising from a variable loses the original stack. So only use
411 411 # that form if we need to.
412 412 if seekerror:
413 413 raise exc
414 414
415 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 416 self.count)
417 417
418 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 419 """This function processes a bundle, applying its effects to/from a repo
420 420
421 421 It iterates over each part then searches for and uses the proper handling
422 422 code to process the part. Parts are processed in order.
423 423
424 424 An unknown mandatory part will abort the process.
425 425
426 426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 427 function. This is used to ensure output is properly propagated in case of
428 428 an error during the unbundling. This output capturing part will likely be
429 429 reworked and this ability will probably go away in the process.
430 430 """
431 431 if op is None:
432 432 if transactiongetter is None:
433 433 transactiongetter = _notransaction
434 434 op = bundleoperation(repo, transactiongetter)
435 435 # todo:
436 436 # - replace this with an init function soon.
437 437 # - exception catching
438 438 unbundler.params
439 439 if repo.ui.debugflag:
440 440 msg = ['bundle2-input-bundle:']
441 441 if unbundler.params:
442 442 msg.append(' %i params' % len(unbundler.params))
443 443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 444 msg.append(' no-transaction')
445 445 else:
446 446 msg.append(' with-transaction')
447 447 msg.append('\n')
448 448 repo.ui.debug(''.join(msg))
449 449
450 450 with partiterator(repo, op, unbundler) as parts:
451 451 for part in parts:
452 452 _processpart(op, part)
453 453
454 454 return op
455 455
456 456 def _processchangegroup(op, cg, tr, source, url, **kwargs):
457 457 ret = cg.apply(op.repo, tr, source, url, **kwargs)
458 458 op.records.add('changegroup', {
459 459 'return': ret,
460 460 })
461 461 return ret
462 462
463 def _gethandler(op, part):
464 status = 'unknown' # used by debug output
465 try:
466 handler = parthandlermapping.get(part.type)
467 if handler is None:
468 status = 'unsupported-type'
469 raise error.BundleUnknownFeatureError(parttype=part.type)
470 indebug(op.ui, 'found a handler for part %r' % part.type)
471 unknownparams = part.mandatorykeys - handler.params
472 if unknownparams:
473 unknownparams = list(unknownparams)
474 unknownparams.sort()
475 status = 'unsupported-params (%s)' % unknownparams
476 raise error.BundleUnknownFeatureError(parttype=part.type,
477 params=unknownparams)
478 status = 'supported'
479 except error.BundleUnknownFeatureError as exc:
480 if part.mandatory: # mandatory parts
481 raise
482 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
483 return # skip to part processing
484 finally:
485 if op.ui.debugflag:
486 msg = ['bundle2-input-part: "%s"' % part.type]
487 if not part.mandatory:
488 msg.append(' (advisory)')
489 nbmp = len(part.mandatorykeys)
490 nbap = len(part.params) - nbmp
491 if nbmp or nbap:
492 msg.append(' (params:')
493 if nbmp:
494 msg.append(' %i mandatory' % nbmp)
495 if nbap:
496 msg.append(' %i advisory' % nbap)
497 msg.append(')')
498 msg.append(' %s\n' % status)
499 op.ui.debug(''.join(msg))
500
501 return handler
502
463 503 def _processpart(op, part):
464 504 """process a single part from a bundle
465 505
466 506 The part is guaranteed to have been fully consumed when the function exits
467 507 (even if an exception is raised)."""
468 status = 'unknown' # used by debug output
469 508 try:
470 try:
471 handler = parthandlermapping.get(part.type)
472 if handler is None:
473 status = 'unsupported-type'
474 raise error.BundleUnknownFeatureError(parttype=part.type)
475 indebug(op.ui, 'found a handler for part %r' % part.type)
476 unknownparams = part.mandatorykeys - handler.params
477 if unknownparams:
478 unknownparams = list(unknownparams)
479 unknownparams.sort()
480 status = 'unsupported-params (%s)' % unknownparams
481 raise error.BundleUnknownFeatureError(parttype=part.type,
482 params=unknownparams)
483 status = 'supported'
484 except error.BundleUnknownFeatureError as exc:
485 if part.mandatory: # mandatory parts
486 raise
487 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
488 return # skip to part processing
489 finally:
490 if op.ui.debugflag:
491 msg = ['bundle2-input-part: "%s"' % part.type]
492 if not part.mandatory:
493 msg.append(' (advisory)')
494 nbmp = len(part.mandatorykeys)
495 nbap = len(part.params) - nbmp
496 if nbmp or nbap:
497 msg.append(' (params:')
498 if nbmp:
499 msg.append(' %i mandatory' % nbmp)
500 if nbap:
501 msg.append(' %i advisory' % nbmp)
502 msg.append(')')
503 msg.append(' %s\n' % status)
504 op.ui.debug(''.join(msg))
509 handler = _gethandler(op, part)
510 if handler is None:
511 return
505 512
506 513 # handler is called outside the above try block so that we don't
507 514 # risk catching KeyErrors from anything other than the
508 515 # parthandlermapping lookup (any KeyError raised by handler()
509 516 # itself represents a defect of a different variety).
510 517 output = None
511 518 if op.captureoutput and op.reply is not None:
512 519 op.ui.pushbuffer(error=True, subproc=True)
513 520 output = ''
514 521 try:
515 522 handler(op, part)
516 523 finally:
517 524 if output is not None:
518 525 output = op.ui.popbuffer()
519 526 if output:
520 527 outpart = op.reply.newpart('output', data=output,
521 528 mandatory=False)
522 529 outpart.addparam(
523 530 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
524 531 finally:
525 532 pass
526 533
527 534
528 535 def decodecaps(blob):
529 536 """decode a bundle2 caps bytes blob into a dictionary
530 537
531 538 The blob is a list of capabilities (one per line)
532 539 Capabilities may have values using a line of the form::
533 540
534 541 capability=value1,value2,value3
535 542
536 543 The values are always a list."""
537 544 caps = {}
538 545 for line in blob.splitlines():
539 546 if not line:
540 547 continue
541 548 if '=' not in line:
542 549 key, vals = line, ()
543 550 else:
544 551 key, vals = line.split('=', 1)
545 552 vals = vals.split(',')
546 553 key = urlreq.unquote(key)
547 554 vals = [urlreq.unquote(v) for v in vals]
548 555 caps[key] = vals
549 556 return caps
550 557
551 558 def encodecaps(caps):
552 559 """encode a bundle2 caps dictionary into a bytes blob"""
553 560 chunks = []
554 561 for ca in sorted(caps):
555 562 vals = caps[ca]
556 563 ca = urlreq.quote(ca)
557 564 vals = [urlreq.quote(v) for v in vals]
558 565 if vals:
559 566 ca = "%s=%s" % (ca, ','.join(vals))
560 567 chunks.append(ca)
561 568 return '\n'.join(chunks)
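The two helpers round-trip; a condensed, standalone sketch (with Python 3's `urllib` standing in for `util.urlreq`) demonstrates:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    # condensed copy of the helper above
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    # condensed copy of the helper above; values are always a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

A value-less capability such as `HG20` is emitted as a bare name and decodes back to an empty list.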
562 569
563 570 bundletypes = {
564 571 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
565 572 # since the unification ssh accepts a header but there
566 573 # is no capability signaling it.
567 574 "HG20": (), # special-cased below
568 575 "HG10UN": ("HG10UN", 'UN'),
569 576 "HG10BZ": ("HG10", 'BZ'),
570 577 "HG10GZ": ("HG10GZ", 'GZ'),
571 578 }
572 579
573 580 # hgweb uses this list to communicate its preferred type
574 581 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
575 582
576 583 class bundle20(object):
577 584 """represent an outgoing bundle2 container
578 585
579 586 Use the `addparam` method to add a stream level parameter and `newpart` to
580 587 populate it. Then call `getchunks` to retrieve all the binary chunks of
581 588 data that compose the bundle2 container."""
582 589
583 590 _magicstring = 'HG20'
584 591
585 592 def __init__(self, ui, capabilities=()):
586 593 self.ui = ui
587 594 self._params = []
588 595 self._parts = []
589 596 self.capabilities = dict(capabilities)
590 597 self._compengine = util.compengines.forbundletype('UN')
591 598 self._compopts = None
592 599
593 600 def setcompression(self, alg, compopts=None):
594 601 """setup core part compression to <alg>"""
595 602 if alg in (None, 'UN'):
596 603 return
597 604 assert not any(n.lower() == 'compression' for n, v in self._params)
598 605 self.addparam('Compression', alg)
599 606 self._compengine = util.compengines.forbundletype(alg)
600 607 self._compopts = compopts
601 608
602 609 @property
603 610 def nbparts(self):
604 611 """total number of parts added to the bundler"""
605 612 return len(self._parts)
606 613
607 614 # methods used to define the bundle2 content
608 615 def addparam(self, name, value=None):
609 616 """add a stream level parameter"""
610 617 if not name:
611 618 raise ValueError('empty parameter name')
612 619 if name[0] not in pycompat.bytestr(string.ascii_letters):
613 620 raise ValueError('non letter first character: %r' % name)
614 621 self._params.append((name, value))
615 622
616 623 def addpart(self, part):
617 624 """add a new part to the bundle2 container
618 625
619 626 Parts contains the actual applicative payload."""
620 627 assert part.id is None
621 628 part.id = len(self._parts) # very cheap counter
622 629 self._parts.append(part)
623 630
624 631 def newpart(self, typeid, *args, **kwargs):
625 632 """create a new part and add it to the container
626 633 
627 634 The part is directly added to the container. For now, this means
628 635 that any failure to properly initialize the part after calling
629 636 ``newpart`` should result in a failure of the whole bundling process.
630 637 
631 638 You can still fall back to manually creating and adding one if you
632 639 need better control."""
633 640 part = bundlepart(typeid, *args, **kwargs)
634 641 self.addpart(part)
635 642 return part
636 643
637 644 # methods used to generate the bundle2 stream
638 645 def getchunks(self):
639 646 if self.ui.debugflag:
640 647 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
641 648 if self._params:
642 649 msg.append(' (%i params)' % len(self._params))
643 650 msg.append(' %i parts total\n' % len(self._parts))
644 651 self.ui.debug(''.join(msg))
645 652 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
646 653 yield self._magicstring
647 654 param = self._paramchunk()
648 655 outdebug(self.ui, 'bundle parameter: %s' % param)
649 656 yield _pack(_fstreamparamsize, len(param))
650 657 if param:
651 658 yield param
652 659 for chunk in self._compengine.compressstream(self._getcorechunk(),
653 660 self._compopts):
654 661 yield chunk
655 662
656 663 def _paramchunk(self):
657 664 """return an encoded version of all stream parameters"""
658 665 blocks = []
659 666 for par, value in self._params:
660 667 par = urlreq.quote(par)
661 668 if value is not None:
662 669 value = urlreq.quote(value)
663 670 par = '%s=%s' % (par, value)
664 671 blocks.append(par)
665 672 return ' '.join(blocks)
666 673
667 674 def _getcorechunk(self):
668 675 """yield chunk for the core part of the bundle
669 676
670 677 (all but headers and parameters)"""
671 678 outdebug(self.ui, 'start of parts')
672 679 for part in self._parts:
673 680 outdebug(self.ui, 'bundle part: "%s"' % part.type)
674 681 for chunk in part.getchunks(ui=self.ui):
675 682 yield chunk
676 683 outdebug(self.ui, 'end of bundle')
677 684 yield _pack(_fpartheadersize, 0)
678 685
679 686
680 687 def salvageoutput(self):
681 688 """return a list with a copy of all output parts in the bundle
682 689
683 690 This is meant to be used during error handling to make sure we preserve
684 691 server output"""
685 692 salvaged = []
686 693 for part in self._parts:
687 694 if part.type.startswith('output'):
688 695 salvaged.append(part.copy())
689 696 return salvaged
690 697
691 698
692 699 class unpackermixin(object):
693 700 """A mixin to extract bytes and struct data from a stream"""
694 701
695 702 def __init__(self, fp):
696 703 self._fp = fp
697 704
698 705 def _unpack(self, format):
699 706 """unpack this struct format from the stream
700 707
701 708 This method is meant for internal usage by the bundle2 protocol only.
702 709 It directly manipulates the low-level stream, including bundle2-level
703 710 instructions.
704 711
705 712 Do not use it to implement higher-level logic or methods."""
706 713 data = self._readexact(struct.calcsize(format))
707 714 return _unpack(format, data)
708 715
709 716 def _readexact(self, size):
710 717 """read exactly <size> bytes from the stream
711 718
712 719 This method is meant for internal usage by the bundle2 protocol only.
713 720 It directly manipulates the low-level stream, including bundle2-level
714 721 instructions.
715 722
716 723 Do not use it to implement higher-level logic or methods."""
717 724 return changegroup.readexactly(self._fp, size)
718 725
719 726 def getunbundler(ui, fp, magicstring=None):
720 727 """return a valid unbundler object for a given magicstring"""
721 728 if magicstring is None:
722 729 magicstring = changegroup.readexactly(fp, 4)
723 730 magic, version = magicstring[0:2], magicstring[2:4]
724 731 if magic != 'HG':
725 732 ui.debug(
726 733 "error: invalid magic: %r (version %r), should be 'HG'\n"
727 734 % (magic, version))
728 735 raise error.Abort(_('not a Mercurial bundle'))
729 736 unbundlerclass = formatmap.get(version)
730 737 if unbundlerclass is None:
731 738 raise error.Abort(_('unknown bundle version %s') % version)
732 739 unbundler = unbundlerclass(ui, fp)
733 740 indebug(ui, 'start processing of %s stream' % magicstring)
734 741 return unbundler
735 742
736 743 class unbundle20(unpackermixin):
737 744 """interpret a bundle2 stream
738 745
739 746 This class is fed with a binary stream and yields parts through its
740 747 `iterparts` methods."""
741 748
742 749 _magicstring = 'HG20'
743 750
744 751 def __init__(self, ui, fp):
745 752 """If header is specified, we do not read it out of the stream."""
746 753 self.ui = ui
747 754 self._compengine = util.compengines.forbundletype('UN')
748 755 self._compressed = None
749 756 super(unbundle20, self).__init__(fp)
750 757
751 758 @util.propertycache
752 759 def params(self):
753 760 """dictionary of stream level parameters"""
754 761 indebug(self.ui, 'reading bundle2 stream parameters')
755 762 params = {}
756 763 paramssize = self._unpack(_fstreamparamsize)[0]
757 764 if paramssize < 0:
758 765 raise error.BundleValueError('negative bundle param size: %i'
759 766 % paramssize)
760 767 if paramssize:
761 768 params = self._readexact(paramssize)
762 769 params = self._processallparams(params)
763 770 return params
764 771
765 772 def _processallparams(self, paramsblock):
766 773 """process a params block and return the resulting parameter dict"""
767 774 params = util.sortdict()
768 775 for p in paramsblock.split(' '):
769 776 p = p.split('=', 1)
770 777 p = [urlreq.unquote(i) for i in p]
771 778 if len(p) < 2:
772 779 p.append(None)
773 780 self._processparam(*p)
774 781 params[p[0]] = p[1]
775 782 return params
776 783
777 784
778 785 def _processparam(self, name, value):
779 786 """process a parameter, applying its effect if needed
780 787
781 788 Parameters starting with a lower case letter are advisory and will be
782 789 ignored when unknown. Those starting with an upper case letter are
783 790 mandatory; this function will raise a KeyError when they are unknown.
784 791 
785 792 Note: no options are currently supported. Any input will either be
786 793 ignored or cause a failure.
787 794 """
788 795 if not name:
789 796 raise ValueError('empty parameter name')
790 797 if name[0] not in pycompat.bytestr(string.ascii_letters):
791 798 raise ValueError('non letter first character: %r' % name)
792 799 try:
793 800 handler = b2streamparamsmap[name.lower()]
794 801 except KeyError:
795 802 if name[0].islower():
796 803 indebug(self.ui, "ignoring unknown parameter %r" % name)
797 804 else:
798 805 raise error.BundleUnknownFeatureError(params=(name,))
799 806 else:
800 807 handler(self, name, value)
801 808
802 809 def _forwardchunks(self):
803 810 """utility to transfer a bundle2 as binary
804 811
805 812 This is made necessary by the fact that the 'getbundle' command over
806 813 'ssh' has no way to know when the reply ends, relying on the bundle being
807 814 interpreted to know its end. This is terrible and we are sorry, but we
808 815 needed to move forward to get general delta enabled.
809 816 """
810 817 yield self._magicstring
811 818 assert 'params' not in vars(self)
812 819 paramssize = self._unpack(_fstreamparamsize)[0]
813 820 if paramssize < 0:
814 821 raise error.BundleValueError('negative bundle param size: %i'
815 822 % paramssize)
816 823 yield _pack(_fstreamparamsize, paramssize)
817 824 if paramssize:
818 825 params = self._readexact(paramssize)
819 826 self._processallparams(params)
820 827 yield params
821 828 assert self._compengine.bundletype == 'UN'
822 829 # From there, payload might need to be decompressed
823 830 self._fp = self._compengine.decompressorreader(self._fp)
824 831 emptycount = 0
825 832 while emptycount < 2:
826 833 # so we can brainlessly loop
827 834 assert _fpartheadersize == _fpayloadsize
828 835 size = self._unpack(_fpartheadersize)[0]
829 836 yield _pack(_fpartheadersize, size)
830 837 if size:
831 838 emptycount = 0
832 839 else:
833 840 emptycount += 1
834 841 continue
835 842 if size == flaginterrupt:
836 843 continue
837 844 elif size < 0:
838 845 raise error.BundleValueError('negative chunk size: %i' % size)
839 846 yield self._readexact(size)
840 847
841 848
842 849 def iterparts(self):
843 850 """yield all parts contained in the stream"""
844 851 # make sure params have been loaded
845 852 self.params
846 853 # From there, payload need to be decompressed
847 854 self._fp = self._compengine.decompressorreader(self._fp)
848 855 indebug(self.ui, 'start extraction of bundle2 parts')
849 856 headerblock = self._readpartheader()
850 857 while headerblock is not None:
851 858 part = unbundlepart(self.ui, headerblock, self._fp)
852 859 yield part
853 860 # Seek to the end of the part to force it's consumption so the next
854 861 # part can be read. But then seek back to the beginning so the
855 862 # code consuming this generator has a part that starts at 0.
856 863 part.seek(0, 2)
857 864 part.seek(0)
858 865 headerblock = self._readpartheader()
859 866 indebug(self.ui, 'end of bundle2 stream')
860 867
861 868 def _readpartheader(self):
862 869 """reads a part header size and return the bytes blob
863 870
864 871 returns None if empty"""
865 872 headersize = self._unpack(_fpartheadersize)[0]
866 873 if headersize < 0:
867 874 raise error.BundleValueError('negative part header size: %i'
868 875 % headersize)
869 876 indebug(self.ui, 'part header size: %i' % headersize)
870 877 if headersize:
871 878 return self._readexact(headersize)
872 879 return None
873 880
874 881 def compressed(self):
875 882 self.params # load params
876 883 return self._compressed
877 884
878 885 def close(self):
879 886 """close underlying file"""
880 887 if util.safehasattr(self._fp, 'close'):
881 888 return self._fp.close()
882 889
883 890 formatmap = {'20': unbundle20}
884 891
885 892 b2streamparamsmap = {}
886 893
887 894 def b2streamparamhandler(name):
888 895 """register a handler for a stream level parameter"""
889 896 def decorator(func):
890 897 assert name not in formatmap
891 898 b2streamparamsmap[name] = func
892 899 return func
893 900 return decorator
894 901
895 902 @b2streamparamhandler('compression')
896 903 def processcompression(unbundler, param, value):
897 904 """read compression parameter and install payload decompression"""
898 905 if value not in util.compengines.supportedbundletypes:
899 906 raise error.BundleUnknownFeatureError(params=(param,),
900 907 values=(value,))
901 908 unbundler._compengine = util.compengines.forbundletype(value)
902 909 if value is not None:
903 910 unbundler._compressed = True
904 911
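The decorator-based registry pattern used by `b2streamparamhandler` can be shown in a self-contained form. This is a minimal sketch, not the real registration code; `paramhandlers`, `paramhandler`, and the `'example'` parameter are hypothetical names invented for the illustration:

```python
# Registry mapping parameter names to handler functions, populated at
# import time by the decorator below.
paramhandlers = {}

def paramhandler(name):
    """register a handler for a (hypothetical) stream level parameter"""
    def decorator(func):
        # refuse duplicate registrations up front
        assert name not in paramhandlers, 'duplicate handler: %s' % name
        paramhandlers[name] = func
        return func
    return decorator

@paramhandler('example')
def processexample(state, param, value):
    """record the parameter value on some mutable state object"""
    state[param] = value
    return state
```

Dispatch then becomes a dict lookup, exactly as in `_processparam` above: unknown lowercase (advisory) names are ignored, unknown uppercase (mandatory) names abort.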
class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters can be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise error.ProgrammingError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

    def __repr__(self):
        cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
                % (cls, id(self), self.id, self.type, self.mandatory))

    def copy(self):
        """return a copy of the part

        The new part has the very same content but no partid assigned yet.
        Parts with generated data cannot be copied."""
        assert not util.safehasattr(self.data, 'next')
        return self.__class__(self.type, self._mandatoryparams,
                              self._advisoryparams, self._data, self.mandatory)

    # methods used to define the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        """add a parameter to the part

        If 'mandatory' is set to True, the remote handler must claim support
        for this parameter or the unbundling will be aborted.

        The 'name' and 'value' cannot exceed 255 bytes each.
        """
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

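The header layout that `getchunks` builds below can be sketched for the parameter-less case. This is a simplified standalone model, assuming the field formats match bundle2's (one byte for the type length, an unsigned int32 part id, two bytes for the parameter counts); `packpartheader` is a hypothetical helper, not a mercurial API:

```python
import struct

# Format strings mirroring bundle2's header fields.
_fparttypesize = '>B'    # one byte: length of the part type name
_fpartid = '>I'          # unsigned int32: part id
_fpartparamcount = '>BB' # two bytes: mandatory and advisory param counts

def packpartheader(parttype, partid, mandatory=True):
    """sketch of how a parameter-less part header is serialized

    The mandatory bit travels in the capitalization of the type name:
    uppercase means mandatory, lowercase means advisory.
    """
    parttype = parttype.upper() if mandatory else parttype.lower()
    header = [struct.pack(_fparttypesize, len(parttype)),
              parttype,
              struct.pack(_fpartid, partid),
              struct.pack(_fpartparamcount, 0, 0)]
    return b''.join(header)
```

A real header also carries the parameter size table and the key/value blobs, but the prefix (type size, type, id, counts) is laid out exactly in this order.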
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise error.ProgrammingError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        try:
            headerchunk = ''.join(header)
        except TypeError:
            raise TypeError(r'Found a non-bytes trying to '
                            r'build bundle part header: %r' % header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            bexc = util.forcebytestr(exc)
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % bexc)
            tb = sys.exc_info()[2]
            msg = 'unexpected error: %s' % bexc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            pycompat.raisewithtb(exc, tb)
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if (util.safehasattr(self.data, 'next')
            or util.safehasattr(self.data, '__next__')):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

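The rechunking that `util.chunkbuffer` performs for `_payloadchunks` can be modeled without mercurial. This is a minimal sketch under the assumption that only regrouping matters; `chunkify` is a hypothetical stand-in, not the real `chunkbuffer` API:

```python
def chunkify(source, chunksize):
    """regroup an iterable of byte strings into fixed-size chunks

    Whatever the sizes of the incoming pieces, the output chunks are
    `chunksize` bytes each, except possibly the last one.
    """
    buf = b''
    for piece in source:
        buf += piece
        while len(buf) >= chunksize:
            yield buf[:chunksize]
            buf = buf[chunksize:]
    if buf:
        yield buf
```

This is why a part built from a generator still goes out on the wire in `preferedchunksize` frames: the buffer absorbs arbitrarily sized producer pieces and re-emits them at the preferred granularity.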

flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting an exception raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        hardabort = False
        try:
            _processpart(op, part)
        except (SystemExit, KeyboardInterrupt):
            hardabort = True
            raise
        finally:
            if not hardabort:
                part.seek(0, 2)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise error.ProgrammingError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, self._tellfp()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            self._seekfp(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos, self._tellfp()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')

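The offset-to-chunk mapping behind `_findchunk` is worth seeing on its own. Below is a minimal sketch that keeps only the payload offsets (the file offsets stored alongside them in `_chunkindex` are omitted); `findchunk` here is a hypothetical standalone function:

```python
def findchunk(chunkindex, pos):
    """map a payload offset to (chunk number, offset within that chunk)

    `chunkindex` holds the starting payload offset of each chunk, in
    increasing order, mirroring the first element of the tuples in
    unbundlepart._chunkindex.
    """
    for chunk, ppos in enumerate(chunkindex):
        if ppos == pos:
            # pos falls exactly on a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # pos lies inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1]
    raise ValueError('Unknown chunk')
```

`seek` uses exactly this mapping: it finds the chunk containing the target offset, restarts the payload stream at that chunk, then reads the remaining in-chunk offset to land on the byte it wants.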
    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couple again
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

    def _seekfp(self, offset, whence=0):
        """move the underlying file pointer

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def _tellfp(self):
        """return the file offset, or None if file is not seekable

        This method is meant for internal usage by the bundle2 protocol only.
        It directly manipulates the low-level stream, including bundle2-level
        instructions.

        Do not use it to implement higher-level logic or methods."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None


# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    cpmode = repo.ui.config('server', 'concurrent-push-mode')
    if cpmode == 'check-related':
        caps['checkheads'] = ('related',)
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

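The capability parsing in `obsmarkersversion` reduces to a small string-filtering step. A minimal standalone sketch (here returning the versions sorted, which the original does not guarantee; `obsversions` is a hypothetical name):

```python
def obsversions(caps):
    """extract sorted obsmarker versions from a caps dict (sketch)

    Entries look like 'V1', 'V2'; anything not starting with 'V' is
    ignored rather than rejected.
    """
    obscaps = caps.get('obsmarkers', ())
    return sorted(int(c[1:]) for c in obscaps if c.startswith('V'))
```

Both sides compute this list and the common highest version is then chosen (see `obsolete.commonversion` in `buildobsmarkerspart` below for the real negotiation).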
def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
                   vfs=None, compression=None, compopts=None):
    if bundletype.startswith('HG10'):
        cg = changegroup.makechangegroup(repo, outgoing, '01', source)
        return writebundle(ui, cg, filename, bundletype, vfs=vfs,
                           compression=compression, compopts=compopts)
    elif not bundletype.startswith('HG20'):
        raise error.ProgrammingError('unknown bundle type: %s' % bundletype)

    caps = {}
    if 'obsolescence' in opts:
        caps['obsmarkers'] = ('V1',)
    bundle = bundle20(ui, caps)
    bundle.setcompression(compression, compopts)
    _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
    chunkiter = bundle.getchunks()

    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
    # We should eventually reconcile this logic with the one behind
    # 'exchange.getbundle2partsgenerator'.
    #
    # The type of input from 'getbundle' and 'writenewbundle' are a bit
    # different right now. So we keep them separated for now for the sake of
    # simplicity.

    # we always want a changegroup in such bundle
    cgversion = opts.get('cg.version')
    if cgversion is None:
        cgversion = changegroup.safeversion(repo)
    cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
    part = bundler.newpart('changegroup', data=cg.getchunks())
    part.addparam('version', cg.version)
    if 'clcount' in cg.extras:
        part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                      mandatory=False)
    if opts.get('phases') and repo.revs('%ln and secret()',
                                        outgoing.missingheads):
        part.addparam('targetphase', '%d' % phases.secret, mandatory=False)

    addparttagsfnodescache(repo, bundler, outgoing)

    if opts.get('obsolescence', False):
        obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
        buildobsmarkerspart(bundler, obsmarkers)

    if opts.get('phases', False):
        headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
        phasedata = []
        for phase in phases.allphases:
            for head in headsbyphase[phase]:
                phasedata.append(_pack(_fphasesentry, phase, head))
        bundler.newpart('phase-heads', data=''.join(phasedata))

def addparttagsfnodescache(repo, bundler, outgoing):
    # we include the tags fnode cache for the bundle changesets
    # (as an optional part)
    cache = tags.hgtagsfnodescache(repo.unfiltered())
    chunks = []

    # .hgtags fnodes are only relevant for head changesets. While we could
    # transfer values for all known nodes, there will likely be little to
    # no benefit.
    #
    # We don't bother using a generator to produce output data because
    # a) we only have 40 bytes per head and even esoteric numbers of heads
    # consume little memory (1M heads is 40MB) b) we don't want to send the
    # part if we don't have entries and knowing if we have entries requires
    # cache lookups.
    for node in outgoing.missingheads:
        # Don't compute missing, as this may slow down serving.
        fnode = cache.getfnode(node, computemissing=False)
        if fnode is not None:
            chunks.extend([node, fnode])

    if chunks:
        bundler.newpart('hgtagsfnodes', data=''.join(chunks))

def buildobsmarkerspart(bundler, markers):
    """add an obsmarker part to the bundler with <markers>

    No part is created if markers is empty.
    Raises ValueError if the bundler doesn't support any known obsmarker format.
    """
    if not markers:
        return None

    remoteversions = obsmarkersversion(bundler.capabilities)
    version = obsolete.commonversion(remoteversions)
    if version is None:
        raise ValueError('bundler does not support common obsmarker format')
    stream = obsolete.encodemarkers(markers, True, version=version)
    return bundler.newpart('obsmarkers', data=stream)

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
                compopts=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression, compopts)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', '%d' % cg.extras['clcount'],
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        compengine = util.compengines.forbundletype(comp)
        def chunkiter():
            yield header
            for chunk in compengine.compressstream(cg.getchunks(), compopts):
                yield chunk
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

def combinechangegroupresults(op):
    """logic to combine 0 or more addchangegroup results into one"""
    results = [r.get('return', 0)
               for r in op.records['changegroup']]
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result

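The return-code arithmetic in `combinechangegroupresults` follows the addchangegroup convention (0 = failure, 1 = no head change, 1 + n = n heads added, -1 - n = n heads removed). A standalone sketch of the same combination logic, taking a plain list of codes instead of an operation object (`combineresults` is a hypothetical name):

```python
def combineresults(results):
    """combine addchangegroup return codes into a single code

    Any 0 short-circuits to overall failure; otherwise head deltas are
    summed and re-encoded with the same 1 + n / -1 - n convention.
    """
    changedheads = 0
    result = 1
    for ret in results:
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1  # n heads removed contributes -n
        elif ret > 1:
            changedheads += ret - 1  # n heads added contributes +n
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
```

For example, two changegroups that add one and two heads respectively (codes 2 and 3) combine to 4: three heads added in total.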
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
                             'targetphase'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    tr = op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    extrakwargs = {}
    targetphase = inpart.params.get('targetphase')
    if targetphase is not None:
        extrakwargs['targetphase'] = int(targetphase)
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
                              expectedtotal=nbchangesets, **extrakwargs)
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate that what was
        retrieved by the client matches the server's knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate that what was retrieved
        by the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    tr = op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

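To make the size-and-digest validation above concrete, here is a hedged sketch of how a checking reader like `util.digestchecker` could work: it wraps a byte stream, counts bytes and feeds every hasher as data flows through, and `validate()` compares the totals against the expected values afterwards. The class name and exact behavior are illustrative assumptions, not Mercurial's actual implementation.

```python
import hashlib
import io

class DigestCheckingReader:
    """Hypothetical sketch: wrap a byte stream; verify size and digests
    after the stream has been fully consumed."""
    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size          # expected total byte count
        self._read = 0             # bytes seen so far
        self._hashers = {typ: hashlib.new(typ) for typ in digests}
        self._expected = digests   # {digest type: expected hexdigest}

    def read(self, n=-1):
        data = self._fh.read(n)
        self._read += len(data)
        for h in self._hashers.values():
            h.update(data)
        return data

    def validate(self):
        if self._read != self._size:
            raise ValueError('size mismatch: got %d, expected %d'
                             % (self._read, self._size))
        for typ, h in self._hashers.items():
            if h.hexdigest() != self._expected[typ]:
                raise ValueError('%s digest mismatch' % typ)

# Usage: drain the wrapped stream, then validate.
payload = b'bundle-bytes'
expected = {'sha1': hashlib.sha1(payload).hexdigest()}
reader = DigestCheckingReader(io.BytesIO(payload), len(payload), expected)
while reader.read(4):
    pass
reader.validate()  # raises ValueError on any mismatch
```

As in the handler above, validation happens only after the whole bundle has been applied, which is why a corrupted remote bundle surfaces as a late abort rather than an up-front rejection.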
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

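The `check:heads` payload is a bare sequence of fixed-width 20-byte node hashes with no length prefix, consumed by the loop above until the stream is exhausted. A standalone sketch of that record-reading pattern (function name is illustrative):

```python
import io

def readnodes(fh, width=20):
    """Read fixed-width node records until EOF; a partial trailing
    record indicates a malformed part."""
    nodes = []
    chunk = fh.read(width)
    while len(chunk) == width:
        nodes.append(chunk)
        chunk = fh.read(width)
    if chunk:  # non-empty but shorter than width: truncated data
        raise ValueError('truncated node list')
    return nodes

stream = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)
assert readnodes(stream) == [b'\x11' * 20, b'\x22' * 20]
```

The handler uses `assert not h` instead of raising because a well-formed bundle2 part can only end on a record boundary; the sketch raises explicitly to make the failure mode visible.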
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
    """check for a race on the heads touched by a push

    This is similar to 'check:heads' but focuses on the heads actually updated
    during the push. If other activity happens on unrelated heads, it is
    ignored.

    This allows servers with high traffic to avoid push contention as long as
    only unrelated parts of the graph are involved."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()

    currentheads = set()
    for ls in op.repo.branchmap().itervalues():
        currentheads.update(ls)

    for h in heads:
        if h not in currentheads:
            raise error.PushRaced('repository changed while pushing - '
                                  'please try again')

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(_('remote: %s\n') % line)

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit an abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit an unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit a push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

def _readphaseheads(inpart):
    headsbyphase = [[] for i in phases.allphases]
    entrysize = struct.calcsize(_fphasesentry)
    while True:
        entry = inpart.read(entrysize)
        if len(entry) < entrysize:
            if entry:
                raise error.Abort(_('bad phase-heads bundle part'))
            break
        phase, node = struct.unpack(_fphasesentry, entry)
        headsbyphase[phase].append(node)
    return headsbyphase

@parthandler('phase-heads')
def handlephases(op, inpart):
    """apply phases from bundle part to repo"""
    headsbyphase = _readphaseheads(inpart)
    phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
    op.records.add('phase-heads', {})

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # We want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    op.repo.invalidatevolatilesets()
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam(
            'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers part"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)

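The `hgtagsfnodes` payload is the pair-wise variant of the fixed-record pattern: alternating 20-byte changeset nodes and filenodes, stopping at the first incomplete pair. A hedged standalone sketch (helper name is illustrative):

```python
import io

def readfnodepairs(fh):
    """Read (changeset node, filenode) 20-byte pairs until the stream
    runs out; an incomplete pair simply terminates the loop, matching
    the tolerant behavior of the handler above."""
    pairs = []
    while True:
        node = fh.read(20)
        fnode = fh.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        pairs.append((node, fnode))
    return pairs

stream = io.BytesIO(b'\x01' * 20 + b'\x02' * 20)
assert readfnodepairs(stream) == [(b'\x01' * 20, b'\x02' * 20)]
```

Unlike `check:heads`, truncated data here is ignored rather than treated as an error, since the fnodes cache is advisory and partially applying it is harmless.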
@parthandler('pushvars')
def bundle2getvars(op, part):
    '''unbundle a bundle2 containing shellvars on the server'''
    # An option to disable unbundling on the server side for security reasons
    if op.ui.configbool('push', 'pushvars.server'):
        hookargs = {}
        for key, value in part.advisoryparams:
            key = key.upper()
            # We want pushed variables to have USERVAR_ prepended so we know
            # they came from the --pushvar flag.
            key = "USERVAR_" + key
            hookargs[key] = value
        op.addhookargs(hookargs)
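
The pushvars transformation above is small enough to isolate: each advisory parameter name is upper-cased and prefixed with `USERVAR_` before being handed to hooks, so server-side hooks can distinguish client-pushed variables from built-in hook arguments. A sketch of just that mapping (function name is illustrative):

```python
def pushvarstohookargs(advisoryparams):
    """Map (key, value) advisory params to USERVAR_-prefixed hook args,
    mirroring the loop in bundle2getvars."""
    hookargs = {}
    for key, value in advisoryparams:
        hookargs['USERVAR_' + key.upper()] = value
    return hookargs

assert pushvarstohookargs([('debug', '1'), ('Reason', 'hotfix')]) == {
    'USERVAR_DEBUG': '1',
    'USERVAR_REASON': 'hotfix',
}
```

The prefix acts as a namespace: a malicious client cannot shadow a trusted hook argument such as `HG_NODE`, because everything it pushes lands under `USERVAR_*`.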