##// END OF EJS Templates
bundle2.getunbundler: rename "header" to "magicstring"...
Pierre-Yves David -
r25640:39f0064a default
parent child Browse files
Show More
@@ -1,1410 +1,1410
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error, tags
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def outdebug(ui, message):
177 177 """debug regarding output stream (bundling)"""
178 178 if ui.configbool('devel', 'bundle2.debug', False):
179 179 ui.debug('bundle2-output: %s\n' % message)
180 180
181 181 def indebug(ui, message):
182 182 """debug on input stream (unbundling)"""
183 183 if ui.configbool('devel', 'bundle2.debug', False):
184 184 ui.debug('bundle2-input: %s\n' % message)
185 185
186 186 def validateparttype(parttype):
187 187 """raise ValueError if a parttype contains invalid character"""
188 188 if _parttypeforbidden.search(parttype):
189 189 raise ValueError(parttype)
190 190
191 191 def _makefpartparamsizes(nbparams):
192 192 """return a struct format to read part parameter sizes
193 193
194 194 The number parameters is variable so we need to build that format
195 195 dynamically.
196 196 """
197 197 return '>'+('BB'*nbparams)
198 198
199 199 parthandlermapping = {}
200 200
201 201 def parthandler(parttype, params=()):
202 202 """decorator that register a function as a bundle2 part handler
203 203
204 204 eg::
205 205
206 206 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
207 207 def myparttypehandler(...):
208 208 '''process a part of type "my part".'''
209 209 ...
210 210 """
211 211 validateparttype(parttype)
212 212 def _decorator(func):
213 213 lparttype = parttype.lower() # enforce lower case matching.
214 214 assert lparttype not in parthandlermapping
215 215 parthandlermapping[lparttype] = func
216 216 func.params = frozenset(params)
217 217 return func
218 218 return _decorator
219 219
220 220 class unbundlerecords(object):
221 221 """keep record of what happens during and unbundle
222 222
223 223 New records are added using `records.add('cat', obj)`. Where 'cat' is a
224 224 category of record and obj is an arbitrary object.
225 225
226 226 `records['cat']` will return all entries of this category 'cat'.
227 227
228 228 Iterating on the object itself will yield `('category', obj)` tuples
229 229 for all entries.
230 230
231 231 All iterations happens in chronological order.
232 232 """
233 233
234 234 def __init__(self):
235 235 self._categories = {}
236 236 self._sequences = []
237 237 self._replies = {}
238 238
239 239 def add(self, category, entry, inreplyto=None):
240 240 """add a new record of a given category.
241 241
242 242 The entry can then be retrieved in the list returned by
243 243 self['category']."""
244 244 self._categories.setdefault(category, []).append(entry)
245 245 self._sequences.append((category, entry))
246 246 if inreplyto is not None:
247 247 self.getreplies(inreplyto).add(category, entry)
248 248
249 249 def getreplies(self, partid):
250 250 """get the records that are replies to a specific part"""
251 251 return self._replies.setdefault(partid, unbundlerecords())
252 252
253 253 def __getitem__(self, cat):
254 254 return tuple(self._categories.get(cat, ()))
255 255
256 256 def __iter__(self):
257 257 return iter(self._sequences)
258 258
259 259 def __len__(self):
260 260 return len(self._sequences)
261 261
262 262 def __nonzero__(self):
263 263 return bool(self._sequences)
264 264
265 265 class bundleoperation(object):
266 266 """an object that represents a single bundling process
267 267
268 268 Its purpose is to carry unbundle-related objects and states.
269 269
270 270 A new object should be created at the beginning of each bundle processing.
271 271 The object is to be returned by the processing function.
272 272
273 273 The object has very little content now it will ultimately contain:
274 274 * an access to the repo the bundle is applied to,
275 275 * a ui object,
276 276 * a way to retrieve a transaction to add changes to the repo,
277 277 * a way to record the result of processing each part,
278 278 * a way to construct a bundle response when applicable.
279 279 """
280 280
281 281 def __init__(self, repo, transactiongetter, captureoutput=True):
282 282 self.repo = repo
283 283 self.ui = repo.ui
284 284 self.records = unbundlerecords()
285 285 self.gettransaction = transactiongetter
286 286 self.reply = None
287 287 self.captureoutput = captureoutput
288 288
289 289 class TransactionUnavailable(RuntimeError):
290 290 pass
291 291
292 292 def _notransaction():
293 293 """default method to get a transaction while processing a bundle
294 294
295 295 Raise an exception to highlight the fact that no transaction was expected
296 296 to be created"""
297 297 raise TransactionUnavailable()
298 298
299 299 def processbundle(repo, unbundler, transactiongetter=None, op=None):
300 300 """This function process a bundle, apply effect to/from a repo
301 301
302 302 It iterates over each part then searches for and uses the proper handling
303 303 code to process the part. Parts are processed in order.
304 304
305 305 This is very early version of this function that will be strongly reworked
306 306 before final usage.
307 307
308 308 Unknown Mandatory part will abort the process.
309 309
310 310 It is temporarily possible to provide a prebuilt bundleoperation to the
311 311 function. This is used to ensure output is properly propagated in case of
312 312 an error during the unbundling. This output capturing part will likely be
313 313 reworked and this ability will probably go away in the process.
314 314 """
315 315 if op is None:
316 316 if transactiongetter is None:
317 317 transactiongetter = _notransaction
318 318 op = bundleoperation(repo, transactiongetter)
319 319 # todo:
320 320 # - replace this is a init function soon.
321 321 # - exception catching
322 322 unbundler.params
323 323 if repo.ui.debugflag:
324 324 msg = ['bundle2-input-bundle:']
325 325 if unbundler.params:
326 326 msg.append(' %i params')
327 327 if op.gettransaction is None:
328 328 msg.append(' no-transaction')
329 329 else:
330 330 msg.append(' with-transaction')
331 331 msg.append('\n')
332 332 repo.ui.debug(''.join(msg))
333 333 iterparts = enumerate(unbundler.iterparts())
334 334 part = None
335 335 nbpart = 0
336 336 try:
337 337 for nbpart, part in iterparts:
338 338 _processpart(op, part)
339 339 except BaseException, exc:
340 340 for nbpart, part in iterparts:
341 341 # consume the bundle content
342 342 part.seek(0, 2)
343 343 # Small hack to let caller code distinguish exceptions from bundle2
344 344 # processing from processing the old format. This is mostly
345 345 # needed to handle different return codes to unbundle according to the
346 346 # type of bundle. We should probably clean up or drop this return code
347 347 # craziness in a future version.
348 348 exc.duringunbundle2 = True
349 349 salvaged = []
350 350 replycaps = None
351 351 if op.reply is not None:
352 352 salvaged = op.reply.salvageoutput()
353 353 replycaps = op.reply.capabilities
354 354 exc._replycaps = replycaps
355 355 exc._bundle2salvagedoutput = salvaged
356 356 raise
357 357 finally:
358 358 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
359 359
360 360 return op
361 361
362 362 def _processpart(op, part):
363 363 """process a single part from a bundle
364 364
365 365 The part is guaranteed to have been fully consumed when the function exits
366 366 (even if an exception is raised)."""
367 367 status = 'unknown' # used by debug output
368 368 try:
369 369 try:
370 370 handler = parthandlermapping.get(part.type)
371 371 if handler is None:
372 372 status = 'unsupported-type'
373 373 raise error.UnsupportedPartError(parttype=part.type)
374 374 indebug(op.ui, 'found a handler for part %r' % part.type)
375 375 unknownparams = part.mandatorykeys - handler.params
376 376 if unknownparams:
377 377 unknownparams = list(unknownparams)
378 378 unknownparams.sort()
379 379 status = 'unsupported-params (%s)' % unknownparams
380 380 raise error.UnsupportedPartError(parttype=part.type,
381 381 params=unknownparams)
382 382 status = 'supported'
383 383 except error.UnsupportedPartError, exc:
384 384 if part.mandatory: # mandatory parts
385 385 raise
386 386 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
387 387 return # skip to part processing
388 388 finally:
389 389 if op.ui.debugflag:
390 390 msg = ['bundle2-input-part: "%s"' % part.type]
391 391 if not part.mandatory:
392 392 msg.append(' (advisory)')
393 393 nbmp = len(part.mandatorykeys)
394 394 nbap = len(part.params) - nbmp
395 395 if nbmp or nbap:
396 396 msg.append(' (params:')
397 397 if nbmp:
398 398 msg.append(' %i mandatory' % nbmp)
399 399 if nbap:
400 400 msg.append(' %i advisory' % nbmp)
401 401 msg.append(')')
402 402 msg.append(' %s\n' % status)
403 403 op.ui.debug(''.join(msg))
404 404
405 405 # handler is called outside the above try block so that we don't
406 406 # risk catching KeyErrors from anything other than the
407 407 # parthandlermapping lookup (any KeyError raised by handler()
408 408 # itself represents a defect of a different variety).
409 409 output = None
410 410 if op.captureoutput and op.reply is not None:
411 411 op.ui.pushbuffer(error=True, subproc=True)
412 412 output = ''
413 413 try:
414 414 handler(op, part)
415 415 finally:
416 416 if output is not None:
417 417 output = op.ui.popbuffer()
418 418 if output:
419 419 outpart = op.reply.newpart('output', data=output,
420 420 mandatory=False)
421 421 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
422 422 finally:
423 423 # consume the part content to not corrupt the stream.
424 424 part.seek(0, 2)
425 425
426 426
427 427 def decodecaps(blob):
428 428 """decode a bundle2 caps bytes blob into a dictionary
429 429
430 430 The blob is a list of capabilities (one per line)
431 431 Capabilities may have values using a line of the form::
432 432
433 433 capability=value1,value2,value3
434 434
435 435 The values are always a list."""
436 436 caps = {}
437 437 for line in blob.splitlines():
438 438 if not line:
439 439 continue
440 440 if '=' not in line:
441 441 key, vals = line, ()
442 442 else:
443 443 key, vals = line.split('=', 1)
444 444 vals = vals.split(',')
445 445 key = urllib.unquote(key)
446 446 vals = [urllib.unquote(v) for v in vals]
447 447 caps[key] = vals
448 448 return caps
449 449
450 450 def encodecaps(caps):
451 451 """encode a bundle2 caps dictionary into a bytes blob"""
452 452 chunks = []
453 453 for ca in sorted(caps):
454 454 vals = caps[ca]
455 455 ca = urllib.quote(ca)
456 456 vals = [urllib.quote(v) for v in vals]
457 457 if vals:
458 458 ca = "%s=%s" % (ca, ','.join(vals))
459 459 chunks.append(ca)
460 460 return '\n'.join(chunks)
461 461
462 462 class bundle20(object):
463 463 """represent an outgoing bundle2 container
464 464
465 465 Use the `addparam` method to add stream level parameter. and `newpart` to
466 466 populate it. Then call `getchunks` to retrieve all the binary chunks of
467 467 data that compose the bundle2 container."""
468 468
469 469 _magicstring = 'HG20'
470 470
471 471 def __init__(self, ui, capabilities=()):
472 472 self.ui = ui
473 473 self._params = []
474 474 self._parts = []
475 475 self.capabilities = dict(capabilities)
476 476
477 477 @property
478 478 def nbparts(self):
479 479 """total number of parts added to the bundler"""
480 480 return len(self._parts)
481 481
482 482 # methods used to defines the bundle2 content
483 483 def addparam(self, name, value=None):
484 484 """add a stream level parameter"""
485 485 if not name:
486 486 raise ValueError('empty parameter name')
487 487 if name[0] not in string.letters:
488 488 raise ValueError('non letter first character: %r' % name)
489 489 self._params.append((name, value))
490 490
491 491 def addpart(self, part):
492 492 """add a new part to the bundle2 container
493 493
494 494 Parts contains the actual applicative payload."""
495 495 assert part.id is None
496 496 part.id = len(self._parts) # very cheap counter
497 497 self._parts.append(part)
498 498
499 499 def newpart(self, typeid, *args, **kwargs):
500 500 """create a new part and add it to the containers
501 501
502 502 As the part is directly added to the containers. For now, this means
503 503 that any failure to properly initialize the part after calling
504 504 ``newpart`` should result in a failure of the whole bundling process.
505 505
506 506 You can still fall back to manually create and add if you need better
507 507 control."""
508 508 part = bundlepart(typeid, *args, **kwargs)
509 509 self.addpart(part)
510 510 return part
511 511
512 512 # methods used to generate the bundle2 stream
513 513 def getchunks(self):
514 514 if self.ui.debugflag:
515 515 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
516 516 if self._params:
517 517 msg.append(' (%i params)' % len(self._params))
518 518 msg.append(' %i parts total\n' % len(self._parts))
519 519 self.ui.debug(''.join(msg))
520 520 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
521 521 yield self._magicstring
522 522 param = self._paramchunk()
523 523 outdebug(self.ui, 'bundle parameter: %s' % param)
524 524 yield _pack(_fstreamparamsize, len(param))
525 525 if param:
526 526 yield param
527 527
528 528 outdebug(self.ui, 'start of parts')
529 529 for part in self._parts:
530 530 outdebug(self.ui, 'bundle part: "%s"' % part.type)
531 531 for chunk in part.getchunks(ui=self.ui):
532 532 yield chunk
533 533 outdebug(self.ui, 'end of bundle')
534 534 yield _pack(_fpartheadersize, 0)
535 535
536 536 def _paramchunk(self):
537 537 """return a encoded version of all stream parameters"""
538 538 blocks = []
539 539 for par, value in self._params:
540 540 par = urllib.quote(par)
541 541 if value is not None:
542 542 value = urllib.quote(value)
543 543 par = '%s=%s' % (par, value)
544 544 blocks.append(par)
545 545 return ' '.join(blocks)
546 546
547 547 def salvageoutput(self):
548 548 """return a list with a copy of all output parts in the bundle
549 549
550 550 This is meant to be used during error handling to make sure we preserve
551 551 server output"""
552 552 salvaged = []
553 553 for part in self._parts:
554 554 if part.type.startswith('output'):
555 555 salvaged.append(part.copy())
556 556 return salvaged
557 557
558 558
559 559 class unpackermixin(object):
560 560 """A mixin to extract bytes and struct data from a stream"""
561 561
562 562 def __init__(self, fp):
563 563 self._fp = fp
564 564 self._seekable = (util.safehasattr(fp, 'seek') and
565 565 util.safehasattr(fp, 'tell'))
566 566
567 567 def _unpack(self, format):
568 568 """unpack this struct format from the stream"""
569 569 data = self._readexact(struct.calcsize(format))
570 570 return _unpack(format, data)
571 571
572 572 def _readexact(self, size):
573 573 """read exactly <size> bytes from the stream"""
574 574 return changegroup.readexactly(self._fp, size)
575 575
576 576 def seek(self, offset, whence=0):
577 577 """move the underlying file pointer"""
578 578 if self._seekable:
579 579 return self._fp.seek(offset, whence)
580 580 else:
581 581 raise NotImplementedError(_('File pointer is not seekable'))
582 582
583 583 def tell(self):
584 584 """return the file offset, or None if file is not seekable"""
585 585 if self._seekable:
586 586 try:
587 587 return self._fp.tell()
588 588 except IOError, e:
589 589 if e.errno == errno.ESPIPE:
590 590 self._seekable = False
591 591 else:
592 592 raise
593 593 return None
594 594
595 595 def close(self):
596 596 """close underlying file"""
597 597 if util.safehasattr(self._fp, 'close'):
598 598 return self._fp.close()
599 599
600 def getunbundler(ui, fp, header=None):
601 """return a valid unbundler object for a given header"""
602 if header is None:
603 header = changegroup.readexactly(fp, 4)
604 magic, version = header[0:2], header[2:4]
600 def getunbundler(ui, fp, magicstring=None):
601 """return a valid unbundler object for a given magicstring"""
602 if magicstring is None:
603 magicstring = changegroup.readexactly(fp, 4)
604 magic, version = magicstring[0:2], magicstring[2:4]
605 605 if magic != 'HG':
606 606 raise util.Abort(_('not a Mercurial bundle'))
607 607 unbundlerclass = formatmap.get(version)
608 608 if unbundlerclass is None:
609 609 raise util.Abort(_('unknown bundle version %s') % version)
610 610 unbundler = unbundlerclass(ui, fp)
611 indebug(ui, 'start processing of %s stream' % header)
611 indebug(ui, 'start processing of %s stream' % magicstring)
612 612 return unbundler
613 613
614 614 class unbundle20(unpackermixin):
615 615 """interpret a bundle2 stream
616 616
617 617 This class is fed with a binary stream and yields parts through its
618 618 `iterparts` methods."""
619 619
620 620 def __init__(self, ui, fp):
621 621 """If header is specified, we do not read it out of the stream."""
622 622 self.ui = ui
623 623 super(unbundle20, self).__init__(fp)
624 624
625 625 @util.propertycache
626 626 def params(self):
627 627 """dictionary of stream level parameters"""
628 628 indebug(self.ui, 'reading bundle2 stream parameters')
629 629 params = {}
630 630 paramssize = self._unpack(_fstreamparamsize)[0]
631 631 if paramssize < 0:
632 632 raise error.BundleValueError('negative bundle param size: %i'
633 633 % paramssize)
634 634 if paramssize:
635 635 for p in self._readexact(paramssize).split(' '):
636 636 p = p.split('=', 1)
637 637 p = [urllib.unquote(i) for i in p]
638 638 if len(p) < 2:
639 639 p.append(None)
640 640 self._processparam(*p)
641 641 params[p[0]] = p[1]
642 642 return params
643 643
644 644 def _processparam(self, name, value):
645 645 """process a parameter, applying its effect if needed
646 646
647 647 Parameter starting with a lower case letter are advisory and will be
648 648 ignored when unknown. Those starting with an upper case letter are
649 649 mandatory and will this function will raise a KeyError when unknown.
650 650
651 651 Note: no option are currently supported. Any input will be either
652 652 ignored or failing.
653 653 """
654 654 if not name:
655 655 raise ValueError('empty parameter name')
656 656 if name[0] not in string.letters:
657 657 raise ValueError('non letter first character: %r' % name)
658 658 # Some logic will be later added here to try to process the option for
659 659 # a dict of known parameter.
660 660 if name[0].islower():
661 661 indebug(self.ui, "ignoring unknown parameter %r" % name)
662 662 else:
663 663 raise error.UnsupportedPartError(params=(name,))
664 664
665 665
666 666 def iterparts(self):
667 667 """yield all parts contained in the stream"""
668 668 # make sure param have been loaded
669 669 self.params
670 670 indebug(self.ui, 'start extraction of bundle2 parts')
671 671 headerblock = self._readpartheader()
672 672 while headerblock is not None:
673 673 part = unbundlepart(self.ui, headerblock, self._fp)
674 674 yield part
675 675 part.seek(0, 2)
676 676 headerblock = self._readpartheader()
677 677 indebug(self.ui, 'end of bundle2 stream')
678 678
679 679 def _readpartheader(self):
680 680 """reads a part header size and return the bytes blob
681 681
682 682 returns None if empty"""
683 683 headersize = self._unpack(_fpartheadersize)[0]
684 684 if headersize < 0:
685 685 raise error.BundleValueError('negative part header size: %i'
686 686 % headersize)
687 687 indebug(self.ui, 'part header size: %i' % headersize)
688 688 if headersize:
689 689 return self._readexact(headersize)
690 690 return None
691 691
692 692 def compressed(self):
693 693 return False
694 694
695 695 formatmap = {'20': unbundle20}
696 696
697 697 class bundlepart(object):
698 698 """A bundle2 part contains application level payload
699 699
700 700 The part `type` is used to route the part to the application level
701 701 handler.
702 702
703 703 The part payload is contained in ``part.data``. It could be raw bytes or a
704 704 generator of byte chunks.
705 705
706 706 You can add parameters to the part using the ``addparam`` method.
707 707 Parameters can be either mandatory (default) or advisory. Remote side
708 708 should be able to safely ignore the advisory ones.
709 709
710 710 Both data and parameters cannot be modified after the generation has begun.
711 711 """
712 712
713 713 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
714 714 data='', mandatory=True):
715 715 validateparttype(parttype)
716 716 self.id = None
717 717 self.type = parttype
718 718 self._data = data
719 719 self._mandatoryparams = list(mandatoryparams)
720 720 self._advisoryparams = list(advisoryparams)
721 721 # checking for duplicated entries
722 722 self._seenparams = set()
723 723 for pname, __ in self._mandatoryparams + self._advisoryparams:
724 724 if pname in self._seenparams:
725 725 raise RuntimeError('duplicated params: %s' % pname)
726 726 self._seenparams.add(pname)
727 727 # status of the part's generation:
728 728 # - None: not started,
729 729 # - False: currently generated,
730 730 # - True: generation done.
731 731 self._generated = None
732 732 self.mandatory = mandatory
733 733
734 734 def copy(self):
735 735 """return a copy of the part
736 736
737 737 The new part have the very same content but no partid assigned yet.
738 738 Parts with generated data cannot be copied."""
739 739 assert not util.safehasattr(self.data, 'next')
740 740 return self.__class__(self.type, self._mandatoryparams,
741 741 self._advisoryparams, self._data, self.mandatory)
742 742
743 743 # methods used to defines the part content
744 744 def __setdata(self, data):
745 745 if self._generated is not None:
746 746 raise error.ReadOnlyPartError('part is being generated')
747 747 self._data = data
748 748 def __getdata(self):
749 749 return self._data
750 750 data = property(__getdata, __setdata)
751 751
752 752 @property
753 753 def mandatoryparams(self):
754 754 # make it an immutable tuple to force people through ``addparam``
755 755 return tuple(self._mandatoryparams)
756 756
757 757 @property
758 758 def advisoryparams(self):
759 759 # make it an immutable tuple to force people through ``addparam``
760 760 return tuple(self._advisoryparams)
761 761
762 762 def addparam(self, name, value='', mandatory=True):
763 763 if self._generated is not None:
764 764 raise error.ReadOnlyPartError('part is being generated')
765 765 if name in self._seenparams:
766 766 raise ValueError('duplicated params: %s' % name)
767 767 self._seenparams.add(name)
768 768 params = self._advisoryparams
769 769 if mandatory:
770 770 params = self._mandatoryparams
771 771 params.append((name, value))
772 772
773 773 # methods used to generates the bundle2 stream
774 774 def getchunks(self, ui):
775 775 if self._generated is not None:
776 776 raise RuntimeError('part can only be consumed once')
777 777 self._generated = False
778 778
779 779 if ui.debugflag:
780 780 msg = ['bundle2-output-part: "%s"' % self.type]
781 781 if not self.mandatory:
782 782 msg.append(' (advisory)')
783 783 nbmp = len(self.mandatoryparams)
784 784 nbap = len(self.advisoryparams)
785 785 if nbmp or nbap:
786 786 msg.append(' (params:')
787 787 if nbmp:
788 788 msg.append(' %i mandatory' % nbmp)
789 789 if nbap:
790 790 msg.append(' %i advisory' % nbmp)
791 791 msg.append(')')
792 792 if not self.data:
793 793 msg.append(' empty payload')
794 794 elif util.safehasattr(self.data, 'next'):
795 795 msg.append(' streamed payload')
796 796 else:
797 797 msg.append(' %i bytes payload' % len(self.data))
798 798 msg.append('\n')
799 799 ui.debug(''.join(msg))
800 800
801 801 #### header
802 802 if self.mandatory:
803 803 parttype = self.type.upper()
804 804 else:
805 805 parttype = self.type.lower()
806 806 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
807 807 ## parttype
808 808 header = [_pack(_fparttypesize, len(parttype)),
809 809 parttype, _pack(_fpartid, self.id),
810 810 ]
811 811 ## parameters
812 812 # count
813 813 manpar = self.mandatoryparams
814 814 advpar = self.advisoryparams
815 815 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
816 816 # size
817 817 parsizes = []
818 818 for key, value in manpar:
819 819 parsizes.append(len(key))
820 820 parsizes.append(len(value))
821 821 for key, value in advpar:
822 822 parsizes.append(len(key))
823 823 parsizes.append(len(value))
824 824 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
825 825 header.append(paramsizes)
826 826 # key, value
827 827 for key, value in manpar:
828 828 header.append(key)
829 829 header.append(value)
830 830 for key, value in advpar:
831 831 header.append(key)
832 832 header.append(value)
833 833 ## finalize header
834 834 headerchunk = ''.join(header)
835 835 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
836 836 yield _pack(_fpartheadersize, len(headerchunk))
837 837 yield headerchunk
838 838 ## payload
839 839 try:
840 840 for chunk in self._payloadchunks():
841 841 outdebug(ui, 'payload chunk size: %i' % len(chunk))
842 842 yield _pack(_fpayloadsize, len(chunk))
843 843 yield chunk
844 844 except BaseException, exc:
845 845 # backup exception data for later
846 846 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
847 847 % exc)
848 848 exc_info = sys.exc_info()
849 849 msg = 'unexpected error: %s' % exc
850 850 interpart = bundlepart('error:abort', [('message', msg)],
851 851 mandatory=False)
852 852 interpart.id = 0
853 853 yield _pack(_fpayloadsize, -1)
854 854 for chunk in interpart.getchunks(ui=ui):
855 855 yield chunk
856 856 outdebug(ui, 'closing payload chunk')
857 857 # abort current part payload
858 858 yield _pack(_fpayloadsize, 0)
859 859 raise exc_info[0], exc_info[1], exc_info[2]
860 860 # end of payload
861 861 outdebug(ui, 'closing payload chunk')
862 862 yield _pack(_fpayloadsize, 0)
863 863 self._generated = True
864 864
865 865 def _payloadchunks(self):
866 866 """yield chunks of a the part payload
867 867
868 868 Exists to handle the different methods to provide data to a part."""
869 869 # we only support fixed size data now.
870 870 # This will be improved in the future.
871 871 if util.safehasattr(self.data, 'next'):
872 872 buff = util.chunkbuffer(self.data)
873 873 chunk = buff.read(preferedchunksize)
874 874 while chunk:
875 875 yield chunk
876 876 chunk = buff.read(preferedchunksize)
877 877 elif len(self.data):
878 878 yield self.data
879 879
880 880
881 881 flaginterrupt = -1
882 882
883 883 class interrupthandler(unpackermixin):
884 884 """read one part and process it with restricted capability
885 885
886 886 This allows to transmit exception raised on the producer size during part
887 887 iteration while the consumer is reading a part.
888 888
889 889 Part processed in this manner only have access to a ui object,"""
890 890
891 891 def __init__(self, ui, fp):
892 892 super(interrupthandler, self).__init__(fp)
893 893 self.ui = ui
894 894
895 895 def _readpartheader(self):
896 896 """reads a part header size and return the bytes blob
897 897
898 898 returns None if empty"""
899 899 headersize = self._unpack(_fpartheadersize)[0]
900 900 if headersize < 0:
901 901 raise error.BundleValueError('negative part header size: %i'
902 902 % headersize)
903 903 indebug(self.ui, 'part header size: %i\n' % headersize)
904 904 if headersize:
905 905 return self._readexact(headersize)
906 906 return None
907 907
908 908 def __call__(self):
909 909
910 910 self.ui.debug('bundle2-input-stream-interrupt:'
911 911 ' opening out of band context\n')
912 912 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
913 913 headerblock = self._readpartheader()
914 914 if headerblock is None:
915 915 indebug(self.ui, 'no part found during interruption.')
916 916 return
917 917 part = unbundlepart(self.ui, headerblock, self._fp)
918 918 op = interruptoperation(self.ui)
919 919 _processpart(op, part)
920 920 self.ui.debug('bundle2-input-stream-interrupt:'
921 921 ' closing out of band context\n')
922 922
923 923 class interruptoperation(object):
924 924 """A limited operation to be use by part handler during interruption
925 925
926 926 It only have access to an ui object.
927 927 """
928 928
929 929 def __init__(self, ui):
930 930 self.ui = ui
931 931 self.reply = None
932 932 self.captureoutput = False
933 933
934 934 @property
935 935 def repo(self):
936 936 raise RuntimeError('no repo access from stream interruption')
937 937
938 938 def gettransaction(self):
939 939 raise TransactionUnavailable('no repo access from stream interruption')
940 940
941 941 class unbundlepart(unpackermixin):
942 942 """a bundle part read from a bundle"""
943 943
944 944 def __init__(self, ui, header, fp):
945 945 super(unbundlepart, self).__init__(fp)
946 946 self.ui = ui
947 947 # unbundle state attr
948 948 self._headerdata = header
949 949 self._headeroffset = 0
950 950 self._initialized = False
951 951 self.consumed = False
952 952 # part data
953 953 self.id = None
954 954 self.type = None
955 955 self.mandatoryparams = None
956 956 self.advisoryparams = None
957 957 self.params = None
958 958 self.mandatorykeys = ()
959 959 self._payloadstream = None
960 960 self._readheader()
961 961 self._mandatory = None
962 962 self._chunkindex = [] #(payload, file) position tuples for chunk starts
963 963 self._pos = 0
964 964
965 965 def _fromheader(self, size):
966 966 """return the next <size> byte from the header"""
967 967 offset = self._headeroffset
968 968 data = self._headerdata[offset:(offset + size)]
969 969 self._headeroffset = offset + size
970 970 return data
971 971
972 972 def _unpackheader(self, format):
973 973 """read given format from header
974 974
975 975 This automatically compute the size of the format to read."""
976 976 data = self._fromheader(struct.calcsize(format))
977 977 return _unpack(format, data)
978 978
979 979 def _initparams(self, mandatoryparams, advisoryparams):
980 980 """internal function to setup all logic related parameters"""
981 981 # make it read only to prevent people touching it by mistake.
982 982 self.mandatoryparams = tuple(mandatoryparams)
983 983 self.advisoryparams = tuple(advisoryparams)
984 984 # user friendly UI
985 985 self.params = dict(self.mandatoryparams)
986 986 self.params.update(dict(self.advisoryparams))
987 987 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
988 988
989 989 def _payloadchunks(self, chunknum=0):
990 990 '''seek to specified chunk and start yielding data'''
991 991 if len(self._chunkindex) == 0:
992 992 assert chunknum == 0, 'Must start with chunk 0'
993 993 self._chunkindex.append((0, super(unbundlepart, self).tell()))
994 994 else:
995 995 assert chunknum < len(self._chunkindex), \
996 996 'Unknown chunk %d' % chunknum
997 997 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
998 998
999 999 pos = self._chunkindex[chunknum][0]
1000 1000 payloadsize = self._unpack(_fpayloadsize)[0]
1001 1001 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1002 1002 while payloadsize:
1003 1003 if payloadsize == flaginterrupt:
1004 1004 # interruption detection, the handler will now read a
1005 1005 # single part and process it.
1006 1006 interrupthandler(self.ui, self._fp)()
1007 1007 elif payloadsize < 0:
1008 1008 msg = 'negative payload chunk size: %i' % payloadsize
1009 1009 raise error.BundleValueError(msg)
1010 1010 else:
1011 1011 result = self._readexact(payloadsize)
1012 1012 chunknum += 1
1013 1013 pos += payloadsize
1014 1014 if chunknum == len(self._chunkindex):
1015 1015 self._chunkindex.append((pos,
1016 1016 super(unbundlepart, self).tell()))
1017 1017 yield result
1018 1018 payloadsize = self._unpack(_fpayloadsize)[0]
1019 1019 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1020 1020
1021 1021 def _findchunk(self, pos):
1022 1022 '''for a given payload position, return a chunk number and offset'''
1023 1023 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1024 1024 if ppos == pos:
1025 1025 return chunk, 0
1026 1026 elif ppos > pos:
1027 1027 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1028 1028 raise ValueError('Unknown chunk')
1029 1029
1030 1030 def _readheader(self):
1031 1031 """read the header and setup the object"""
1032 1032 typesize = self._unpackheader(_fparttypesize)[0]
1033 1033 self.type = self._fromheader(typesize)
1034 1034 indebug(self.ui, 'part type: "%s"' % self.type)
1035 1035 self.id = self._unpackheader(_fpartid)[0]
1036 1036 indebug(self.ui, 'part id: "%s"' % self.id)
1037 1037 # extract mandatory bit from type
1038 1038 self.mandatory = (self.type != self.type.lower())
1039 1039 self.type = self.type.lower()
1040 1040 ## reading parameters
1041 1041 # param count
1042 1042 mancount, advcount = self._unpackheader(_fpartparamcount)
1043 1043 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1044 1044 # param size
1045 1045 fparamsizes = _makefpartparamsizes(mancount + advcount)
1046 1046 paramsizes = self._unpackheader(fparamsizes)
1047 1047 # make it a list of couple again
1048 1048 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1049 1049 # split mandatory from advisory
1050 1050 mansizes = paramsizes[:mancount]
1051 1051 advsizes = paramsizes[mancount:]
1052 1052 # retrieve param value
1053 1053 manparams = []
1054 1054 for key, value in mansizes:
1055 1055 manparams.append((self._fromheader(key), self._fromheader(value)))
1056 1056 advparams = []
1057 1057 for key, value in advsizes:
1058 1058 advparams.append((self._fromheader(key), self._fromheader(value)))
1059 1059 self._initparams(manparams, advparams)
1060 1060 ## part payload
1061 1061 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1062 1062 # we read the data, tell it
1063 1063 self._initialized = True
1064 1064
1065 1065 def read(self, size=None):
1066 1066 """read payload data"""
1067 1067 if not self._initialized:
1068 1068 self._readheader()
1069 1069 if size is None:
1070 1070 data = self._payloadstream.read()
1071 1071 else:
1072 1072 data = self._payloadstream.read(size)
1073 1073 self._pos += len(data)
1074 1074 if size is None or len(data) < size:
1075 1075 if not self.consumed and self._pos:
1076 1076 self.ui.debug('bundle2-input-part: total payload size %i\n'
1077 1077 % self._pos)
1078 1078 self.consumed = True
1079 1079 return data
1080 1080
1081 1081 def tell(self):
1082 1082 return self._pos
1083 1083
1084 1084 def seek(self, offset, whence=0):
1085 1085 if whence == 0:
1086 1086 newpos = offset
1087 1087 elif whence == 1:
1088 1088 newpos = self._pos + offset
1089 1089 elif whence == 2:
1090 1090 if not self.consumed:
1091 1091 self.read()
1092 1092 newpos = self._chunkindex[-1][0] - offset
1093 1093 else:
1094 1094 raise ValueError('Unknown whence value: %r' % (whence,))
1095 1095
1096 1096 if newpos > self._chunkindex[-1][0] and not self.consumed:
1097 1097 self.read()
1098 1098 if not 0 <= newpos <= self._chunkindex[-1][0]:
1099 1099 raise ValueError('Offset out of range')
1100 1100
1101 1101 if self._pos != newpos:
1102 1102 chunk, internaloffset = self._findchunk(newpos)
1103 1103 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1104 1104 adjust = self.read(internaloffset)
1105 1105 if len(adjust) != internaloffset:
1106 1106 raise util.Abort(_('Seek failed\n'))
1107 1107 self._pos = newpos
1108 1108
1109 1109 # These are only the static capabilities.
1110 1110 # Check the 'getrepocaps' function for the rest.
1111 1111 capabilities = {'HG20': (),
1112 1112 'error': ('abort', 'unsupportedcontent', 'pushraced',
1113 1113 'pushkey'),
1114 1114 'listkeys': (),
1115 1115 'pushkey': (),
1116 1116 'digests': tuple(sorted(util.DIGESTS.keys())),
1117 1117 'remote-changegroup': ('http', 'https'),
1118 1118 'hgtagsfnodes': (),
1119 1119 }
1120 1120
1121 1121 def getrepocaps(repo, allowpushback=False):
1122 1122 """return the bundle2 capabilities for a given repo
1123 1123
1124 1124 Exists to allow extensions (like evolution) to mutate the capabilities.
1125 1125 """
1126 1126 caps = capabilities.copy()
1127 1127 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1128 1128 if obsolete.isenabled(repo, obsolete.exchangeopt):
1129 1129 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1130 1130 caps['obsmarkers'] = supportedformat
1131 1131 if allowpushback:
1132 1132 caps['pushback'] = ()
1133 1133 return caps
1134 1134
1135 1135 def bundle2caps(remote):
1136 1136 """return the bundle capabilities of a peer as dict"""
1137 1137 raw = remote.capable('bundle2')
1138 1138 if not raw and raw != '':
1139 1139 return {}
1140 1140 capsblob = urllib.unquote(remote.capable('bundle2'))
1141 1141 return decodecaps(capsblob)
1142 1142
1143 1143 def obsmarkersversion(caps):
1144 1144 """extract the list of supported obsmarkers versions from a bundle2caps dict
1145 1145 """
1146 1146 obscaps = caps.get('obsmarkers', ())
1147 1147 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1148 1148
1149 1149 @parthandler('changegroup', ('version', 'nbchanges'))
1150 1150 def handlechangegroup(op, inpart):
1151 1151 """apply a changegroup part on the repo
1152 1152
1153 1153 This is a very early implementation that will massive rework before being
1154 1154 inflicted to any end-user.
1155 1155 """
1156 1156 # Make sure we trigger a transaction creation
1157 1157 #
1158 1158 # The addchangegroup function will get a transaction object by itself, but
1159 1159 # we need to make sure we trigger the creation of a transaction object used
1160 1160 # for the whole processing scope.
1161 1161 op.gettransaction()
1162 1162 unpackerversion = inpart.params.get('version', '01')
1163 1163 # We should raise an appropriate exception here
1164 1164 unpacker = changegroup.packermap[unpackerversion][1]
1165 1165 cg = unpacker(inpart, 'UN')
1166 1166 # the source and url passed here are overwritten by the one contained in
1167 1167 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1168 1168 nbchangesets = None
1169 1169 if 'nbchanges' in inpart.params:
1170 1170 nbchangesets = int(inpart.params.get('nbchanges'))
1171 1171 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1172 1172 expectedtotal=nbchangesets)
1173 1173 op.records.add('changegroup', {'return': ret})
1174 1174 if op.reply is not None:
1175 1175 # This is definitely not the final form of this
1176 1176 # return. But one need to start somewhere.
1177 1177 part = op.reply.newpart('reply:changegroup', mandatory=False)
1178 1178 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1179 1179 part.addparam('return', '%i' % ret, mandatory=False)
1180 1180 assert not inpart.read()
1181 1181
1182 1182 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1183 1183 ['digest:%s' % k for k in util.DIGESTS.keys()])
1184 1184 @parthandler('remote-changegroup', _remotechangegroupparams)
1185 1185 def handleremotechangegroup(op, inpart):
1186 1186 """apply a bundle10 on the repo, given an url and validation information
1187 1187
1188 1188 All the information about the remote bundle to import are given as
1189 1189 parameters. The parameters include:
1190 1190 - url: the url to the bundle10.
1191 1191 - size: the bundle10 file size. It is used to validate what was
1192 1192 retrieved by the client matches the server knowledge about the bundle.
1193 1193 - digests: a space separated list of the digest types provided as
1194 1194 parameters.
1195 1195 - digest:<digest-type>: the hexadecimal representation of the digest with
1196 1196 that name. Like the size, it is used to validate what was retrieved by
1197 1197 the client matches what the server knows about the bundle.
1198 1198
1199 1199 When multiple digest types are given, all of them are checked.
1200 1200 """
1201 1201 try:
1202 1202 raw_url = inpart.params['url']
1203 1203 except KeyError:
1204 1204 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1205 1205 parsed_url = util.url(raw_url)
1206 1206 if parsed_url.scheme not in capabilities['remote-changegroup']:
1207 1207 raise util.Abort(_('remote-changegroup does not support %s urls') %
1208 1208 parsed_url.scheme)
1209 1209
1210 1210 try:
1211 1211 size = int(inpart.params['size'])
1212 1212 except ValueError:
1213 1213 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1214 1214 % 'size')
1215 1215 except KeyError:
1216 1216 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1217 1217
1218 1218 digests = {}
1219 1219 for typ in inpart.params.get('digests', '').split():
1220 1220 param = 'digest:%s' % typ
1221 1221 try:
1222 1222 value = inpart.params[param]
1223 1223 except KeyError:
1224 1224 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1225 1225 param)
1226 1226 digests[typ] = value
1227 1227
1228 1228 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1229 1229
1230 1230 # Make sure we trigger a transaction creation
1231 1231 #
1232 1232 # The addchangegroup function will get a transaction object by itself, but
1233 1233 # we need to make sure we trigger the creation of a transaction object used
1234 1234 # for the whole processing scope.
1235 1235 op.gettransaction()
1236 1236 import exchange
1237 1237 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1238 1238 if not isinstance(cg, changegroup.cg1unpacker):
1239 1239 raise util.Abort(_('%s: not a bundle version 1.0') %
1240 1240 util.hidepassword(raw_url))
1241 1241 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1242 1242 op.records.add('changegroup', {'return': ret})
1243 1243 if op.reply is not None:
1244 1244 # This is definitely not the final form of this
1245 1245 # return. But one need to start somewhere.
1246 1246 part = op.reply.newpart('reply:changegroup')
1247 1247 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1248 1248 part.addparam('return', '%i' % ret, mandatory=False)
1249 1249 try:
1250 1250 real_part.validate()
1251 1251 except util.Abort, e:
1252 1252 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1253 1253 (util.hidepassword(raw_url), str(e)))
1254 1254 assert not inpart.read()
1255 1255
1256 1256 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1257 1257 def handlereplychangegroup(op, inpart):
1258 1258 ret = int(inpart.params['return'])
1259 1259 replyto = int(inpart.params['in-reply-to'])
1260 1260 op.records.add('changegroup', {'return': ret}, replyto)
1261 1261
1262 1262 @parthandler('check:heads')
1263 1263 def handlecheckheads(op, inpart):
1264 1264 """check that head of the repo did not change
1265 1265
1266 1266 This is used to detect a push race when using unbundle.
1267 1267 This replaces the "heads" argument of unbundle."""
1268 1268 h = inpart.read(20)
1269 1269 heads = []
1270 1270 while len(h) == 20:
1271 1271 heads.append(h)
1272 1272 h = inpart.read(20)
1273 1273 assert not h
1274 1274 if heads != op.repo.heads():
1275 1275 raise error.PushRaced('repository changed while pushing - '
1276 1276 'please try again')
1277 1277
1278 1278 @parthandler('output')
1279 1279 def handleoutput(op, inpart):
1280 1280 """forward output captured on the server to the client"""
1281 1281 for line in inpart.read().splitlines():
1282 1282 op.ui.status(('remote: %s\n' % line))
1283 1283
1284 1284 @parthandler('replycaps')
1285 1285 def handlereplycaps(op, inpart):
1286 1286 """Notify that a reply bundle should be created
1287 1287
1288 1288 The payload contains the capabilities information for the reply"""
1289 1289 caps = decodecaps(inpart.read())
1290 1290 if op.reply is None:
1291 1291 op.reply = bundle20(op.ui, caps)
1292 1292
1293 1293 @parthandler('error:abort', ('message', 'hint'))
1294 1294 def handleerrorabort(op, inpart):
1295 1295 """Used to transmit abort error over the wire"""
1296 1296 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1297 1297
1298 1298 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1299 1299 'in-reply-to'))
1300 1300 def handleerrorpushkey(op, inpart):
1301 1301 """Used to transmit failure of a mandatory pushkey over the wire"""
1302 1302 kwargs = {}
1303 1303 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1304 1304 value = inpart.params.get(name)
1305 1305 if value is not None:
1306 1306 kwargs[name] = value
1307 1307 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1308 1308
1309 1309 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1310 1310 def handleerrorunsupportedcontent(op, inpart):
1311 1311 """Used to transmit unknown content error over the wire"""
1312 1312 kwargs = {}
1313 1313 parttype = inpart.params.get('parttype')
1314 1314 if parttype is not None:
1315 1315 kwargs['parttype'] = parttype
1316 1316 params = inpart.params.get('params')
1317 1317 if params is not None:
1318 1318 kwargs['params'] = params.split('\0')
1319 1319
1320 1320 raise error.UnsupportedPartError(**kwargs)
1321 1321
1322 1322 @parthandler('error:pushraced', ('message',))
1323 1323 def handleerrorpushraced(op, inpart):
1324 1324 """Used to transmit push race error over the wire"""
1325 1325 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1326 1326
1327 1327 @parthandler('listkeys', ('namespace',))
1328 1328 def handlelistkeys(op, inpart):
1329 1329 """retrieve pushkey namespace content stored in a bundle2"""
1330 1330 namespace = inpart.params['namespace']
1331 1331 r = pushkey.decodekeys(inpart.read())
1332 1332 op.records.add('listkeys', (namespace, r))
1333 1333
1334 1334 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1335 1335 def handlepushkey(op, inpart):
1336 1336 """process a pushkey request"""
1337 1337 dec = pushkey.decode
1338 1338 namespace = dec(inpart.params['namespace'])
1339 1339 key = dec(inpart.params['key'])
1340 1340 old = dec(inpart.params['old'])
1341 1341 new = dec(inpart.params['new'])
1342 1342 ret = op.repo.pushkey(namespace, key, old, new)
1343 1343 record = {'namespace': namespace,
1344 1344 'key': key,
1345 1345 'old': old,
1346 1346 'new': new}
1347 1347 op.records.add('pushkey', record)
1348 1348 if op.reply is not None:
1349 1349 rpart = op.reply.newpart('reply:pushkey')
1350 1350 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1351 1351 rpart.addparam('return', '%i' % ret, mandatory=False)
1352 1352 if inpart.mandatory and not ret:
1353 1353 kwargs = {}
1354 1354 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1355 1355 if key in inpart.params:
1356 1356 kwargs[key] = inpart.params[key]
1357 1357 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1358 1358
1359 1359 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1360 1360 def handlepushkeyreply(op, inpart):
1361 1361 """retrieve the result of a pushkey request"""
1362 1362 ret = int(inpart.params['return'])
1363 1363 partid = int(inpart.params['in-reply-to'])
1364 1364 op.records.add('pushkey', {'return': ret}, partid)
1365 1365
1366 1366 @parthandler('obsmarkers')
1367 1367 def handleobsmarker(op, inpart):
1368 1368 """add a stream of obsmarkers to the repo"""
1369 1369 tr = op.gettransaction()
1370 1370 markerdata = inpart.read()
1371 1371 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1372 1372 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1373 1373 % len(markerdata))
1374 1374 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1375 1375 if new:
1376 1376 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1377 1377 op.records.add('obsmarkers', {'new': new})
1378 1378 if op.reply is not None:
1379 1379 rpart = op.reply.newpart('reply:obsmarkers')
1380 1380 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1381 1381 rpart.addparam('new', '%i' % new, mandatory=False)
1382 1382
1383 1383
1384 1384 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1385 1385 def handleobsmarkerreply(op, inpart):
1386 1386 """retrieve the result of a pushkey request"""
1387 1387 ret = int(inpart.params['new'])
1388 1388 partid = int(inpart.params['in-reply-to'])
1389 1389 op.records.add('obsmarkers', {'new': ret}, partid)
1390 1390
1391 1391 @parthandler('hgtagsfnodes')
1392 1392 def handlehgtagsfnodes(op, inpart):
1393 1393 """Applies .hgtags fnodes cache entries to the local repo.
1394 1394
1395 1395 Payload is pairs of 20 byte changeset nodes and filenodes.
1396 1396 """
1397 1397 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1398 1398
1399 1399 count = 0
1400 1400 while True:
1401 1401 node = inpart.read(20)
1402 1402 fnode = inpart.read(20)
1403 1403 if len(node) < 20 or len(fnode) < 20:
1404 1404 op.ui.debug('received incomplete .hgtags fnodes data, ignoring\n')
1405 1405 break
1406 1406 cache.setfnode(node, fnode)
1407 1407 count += 1
1408 1408
1409 1409 cache.write()
1410 1410 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
@@ -1,1572 +1,1572
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import time
9 9 from i18n import _
10 10 from node import hex, nullid
11 11 import errno, urllib
12 12 import util, scmutil, changegroup, base85, error, store
13 13 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
14 14 import lock as lockmod
15 15 import tags
16 16
17 17 def readbundle(ui, fh, fname, vfs=None):
18 18 header = changegroup.readexactly(fh, 4)
19 19
20 20 alg = None
21 21 if not fname:
22 22 fname = "stream"
23 23 if not header.startswith('HG') and header.startswith('\0'):
24 24 fh = changegroup.headerlessfixup(fh, header)
25 25 header = "HG10"
26 26 alg = 'UN'
27 27 elif vfs:
28 28 fname = vfs.join(fname)
29 29
30 30 magic, version = header[0:2], header[2:4]
31 31
32 32 if magic != 'HG':
33 33 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
34 34 if version == '10':
35 35 if alg is None:
36 36 alg = changegroup.readexactly(fh, 2)
37 37 return changegroup.cg1unpacker(fh, alg)
38 38 elif version.startswith('2'):
39 return bundle2.getunbundler(ui, fh, header=magic + version)
39 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
40 40 else:
41 41 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
42 42
43 43 def buildobsmarkerspart(bundler, markers):
44 44 """add an obsmarker part to the bundler with <markers>
45 45
46 46 No part is created if markers is empty.
47 47 Raises ValueError if the bundler doesn't support any known obsmarker format.
48 48 """
49 49 if markers:
50 50 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
51 51 version = obsolete.commonversion(remoteversions)
52 52 if version is None:
53 53 raise ValueError('bundler do not support common obsmarker format')
54 54 stream = obsolete.encodemarkers(markers, True, version=version)
55 55 return bundler.newpart('obsmarkers', data=stream)
56 56 return None
57 57
58 58 def _canusebundle2(op):
59 59 """return true if a pull/push can use bundle2
60 60
61 61 Feel free to nuke this function when we drop the experimental option"""
62 62 return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
63 63 and op.remote.capable('bundle2'))
64 64
65 65
66 66 class pushoperation(object):
67 67 """A object that represent a single push operation
68 68
69 69 It purpose is to carry push related state and very common operation.
70 70
71 71 A new should be created at the beginning of each push and discarded
72 72 afterward.
73 73 """
74 74
75 75 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
76 76 bookmarks=()):
77 77 # repo we push from
78 78 self.repo = repo
79 79 self.ui = repo.ui
80 80 # repo we push to
81 81 self.remote = remote
82 82 # force option provided
83 83 self.force = force
84 84 # revs to be pushed (None is "all")
85 85 self.revs = revs
86 86 # bookmark explicitly pushed
87 87 self.bookmarks = bookmarks
88 88 # allow push of new branch
89 89 self.newbranch = newbranch
90 90 # did a local lock get acquired?
91 91 self.locallocked = None
92 92 # step already performed
93 93 # (used to check what steps have been already performed through bundle2)
94 94 self.stepsdone = set()
95 95 # Integer version of the changegroup push result
96 96 # - None means nothing to push
97 97 # - 0 means HTTP error
98 98 # - 1 means we pushed and remote head count is unchanged *or*
99 99 # we have outgoing changesets but refused to push
100 100 # - other values as described by addchangegroup()
101 101 self.cgresult = None
102 102 # Boolean value for the bookmark push
103 103 self.bkresult = None
104 104 # discover.outgoing object (contains common and outgoing data)
105 105 self.outgoing = None
106 106 # all remote heads before the push
107 107 self.remoteheads = None
108 108 # testable as a boolean indicating if any nodes are missing locally.
109 109 self.incoming = None
110 110 # phases changes that must be pushed along side the changesets
111 111 self.outdatedphases = None
112 112 # phases changes that must be pushed if changeset push fails
113 113 self.fallbackoutdatedphases = None
114 114 # outgoing obsmarkers
115 115 self.outobsmarkers = set()
116 116 # outgoing bookmarks
117 117 self.outbookmarks = []
118 118 # transaction manager
119 119 self.trmanager = None
120 120 # map { pushkey partid -> callback handling failure}
121 121 # used to handle exception from mandatory pushkey part failure
122 122 self.pkfailcb = {}
123 123
124 124 @util.propertycache
125 125 def futureheads(self):
126 126 """future remote heads if the changeset push succeeds"""
127 127 return self.outgoing.missingheads
128 128
129 129 @util.propertycache
130 130 def fallbackheads(self):
131 131 """future remote heads if the changeset push fails"""
132 132 if self.revs is None:
133 133 # not target to push, all common are relevant
134 134 return self.outgoing.commonheads
135 135 unfi = self.repo.unfiltered()
136 136 # I want cheads = heads(::missingheads and ::commonheads)
137 137 # (missingheads is revs with secret changeset filtered out)
138 138 #
139 139 # This can be expressed as:
140 140 # cheads = ( (missingheads and ::commonheads)
141 141 # + (commonheads and ::missingheads))"
142 142 # )
143 143 #
144 144 # while trying to push we already computed the following:
145 145 # common = (::commonheads)
146 146 # missing = ((commonheads::missingheads) - commonheads)
147 147 #
148 148 # We can pick:
149 149 # * missingheads part of common (::commonheads)
150 150 common = set(self.outgoing.common)
151 151 nm = self.repo.changelog.nodemap
152 152 cheads = [node for node in self.revs if nm[node] in common]
153 153 # and
154 154 # * commonheads parents on missing
155 155 revset = unfi.set('%ln and parents(roots(%ln))',
156 156 self.outgoing.commonheads,
157 157 self.outgoing.missing)
158 158 cheads.extend(c.node() for c in revset)
159 159 return cheads
160 160
161 161 @property
162 162 def commonheads(self):
163 163 """set of all common heads after changeset bundle push"""
164 164 if self.cgresult:
165 165 return self.futureheads
166 166 else:
167 167 return self.fallbackheads
168 168
169 169 # mapping of message used when pushing bookmark
170 170 bookmsgmap = {'update': (_("updating bookmark %s\n"),
171 171 _('updating bookmark %s failed!\n')),
172 172 'export': (_("exporting bookmark %s\n"),
173 173 _('exporting bookmark %s failed!\n')),
174 174 'delete': (_("deleting remote bookmark %s\n"),
175 175 _('deleting remote bookmark %s failed!\n')),
176 176 }
177 177
178 178
179 179 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
180 180 '''Push outgoing changesets (limited by revs) from a local
181 181 repository to remote. Return an integer:
182 182 - None means nothing to push
183 183 - 0 means HTTP error
184 184 - 1 means we pushed and remote head count is unchanged *or*
185 185 we have outgoing changesets but refused to push
186 186 - other values as described by addchangegroup()
187 187 '''
188 188 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
189 189 if pushop.remote.local():
190 190 missing = (set(pushop.repo.requirements)
191 191 - pushop.remote.local().supported)
192 192 if missing:
193 193 msg = _("required features are not"
194 194 " supported in the destination:"
195 195 " %s") % (', '.join(sorted(missing)))
196 196 raise util.Abort(msg)
197 197
198 198 # there are two ways to push to remote repo:
199 199 #
200 200 # addchangegroup assumes local user can lock remote
201 201 # repo (local filesystem, old ssh servers).
202 202 #
203 203 # unbundle assumes local user cannot lock remote repo (new ssh
204 204 # servers, http servers).
205 205
206 206 if not pushop.remote.canpush():
207 207 raise util.Abort(_("destination does not support push"))
208 208 # get local lock as we might write phase data
209 209 localwlock = locallock = None
210 210 try:
211 211 # bundle2 push may receive a reply bundle touching bookmarks or other
212 212 # things requiring the wlock. Take it now to ensure proper ordering.
213 213 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
214 214 if _canusebundle2(pushop) and maypushback:
215 215 localwlock = pushop.repo.wlock()
216 216 locallock = pushop.repo.lock()
217 217 pushop.locallocked = True
218 218 except IOError, err:
219 219 pushop.locallocked = False
220 220 if err.errno != errno.EACCES:
221 221 raise
222 222 # source repo cannot be locked.
223 223 # We do not abort the push, but just disable the local phase
224 224 # synchronisation.
225 225 msg = 'cannot lock source repository: %s\n' % err
226 226 pushop.ui.debug(msg)
227 227 try:
228 228 if pushop.locallocked:
229 229 pushop.trmanager = transactionmanager(repo,
230 230 'push-response',
231 231 pushop.remote.url())
232 232 pushop.repo.checkpush(pushop)
233 233 lock = None
234 234 unbundle = pushop.remote.capable('unbundle')
235 235 if not unbundle:
236 236 lock = pushop.remote.lock()
237 237 try:
238 238 _pushdiscovery(pushop)
239 239 if _canusebundle2(pushop):
240 240 _pushbundle2(pushop)
241 241 _pushchangeset(pushop)
242 242 _pushsyncphase(pushop)
243 243 _pushobsolete(pushop)
244 244 _pushbookmark(pushop)
245 245 finally:
246 246 if lock is not None:
247 247 lock.release()
248 248 if pushop.trmanager:
249 249 pushop.trmanager.close()
250 250 finally:
251 251 if pushop.trmanager:
252 252 pushop.trmanager.release()
253 253 if locallock is not None:
254 254 locallock.release()
255 255 if localwlock is not None:
256 256 localwlock.release()
257 257
258 258 return pushop
259 259
260 260 # list of steps to perform discovery before push
261 261 pushdiscoveryorder = []
262 262
263 263 # Mapping between step name and function
264 264 #
265 265 # This exists to help extensions wrap steps if necessary
266 266 pushdiscoverymapping = {}
267 267
268 268 def pushdiscovery(stepname):
269 269 """decorator for function performing discovery before push
270 270
271 271 The function is added to the step -> function mapping and appended to the
272 272 list of steps. Beware that decorated function will be added in order (this
273 273 may matter).
274 274
275 275 You can only use this decorator for a new step, if you want to wrap a step
276 276 from an extension, change the pushdiscovery dictionary directly."""
277 277 def dec(func):
278 278 assert stepname not in pushdiscoverymapping
279 279 pushdiscoverymapping[stepname] = func
280 280 pushdiscoveryorder.append(stepname)
281 281 return func
282 282 return dec
283 283
284 284 def _pushdiscovery(pushop):
285 285 """Run all discovery steps"""
286 286 for stepname in pushdiscoveryorder:
287 287 step = pushdiscoverymapping[stepname]
288 288 step(pushop)
289 289
290 290 @pushdiscovery('changeset')
291 291 def _pushdiscoverychangeset(pushop):
292 292 """discover the changeset that need to be pushed"""
293 293 fci = discovery.findcommonincoming
294 294 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
295 295 common, inc, remoteheads = commoninc
296 296 fco = discovery.findcommonoutgoing
297 297 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
298 298 commoninc=commoninc, force=pushop.force)
299 299 pushop.outgoing = outgoing
300 300 pushop.remoteheads = remoteheads
301 301 pushop.incoming = inc
302 302
303 303 @pushdiscovery('phase')
304 304 def _pushdiscoveryphase(pushop):
305 305 """discover the phase that needs to be pushed
306 306
307 307 (computed for both success and failure case for changesets push)"""
308 308 outgoing = pushop.outgoing
309 309 unfi = pushop.repo.unfiltered()
310 310 remotephases = pushop.remote.listkeys('phases')
311 311 publishing = remotephases.get('publishing', False)
312 312 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
313 313 and remotephases # server supports phases
314 314 and not pushop.outgoing.missing # no changesets to be pushed
315 315 and publishing):
316 316 # When:
317 317 # - this is a subrepo push
318 318 # - and remote support phase
319 319 # - and no changeset are to be pushed
320 320 # - and remote is publishing
321 321 # We may be in issue 3871 case!
322 322 # We drop the possible phase synchronisation done by
323 323 # courtesy to publish changesets possibly locally draft
324 324 # on the remote.
325 325 remotephases = {'publishing': 'True'}
326 326 ana = phases.analyzeremotephases(pushop.repo,
327 327 pushop.fallbackheads,
328 328 remotephases)
329 329 pheads, droots = ana
330 330 extracond = ''
331 331 if not publishing:
332 332 extracond = ' and public()'
333 333 revset = 'heads((%%ln::%%ln) %s)' % extracond
334 334 # Get the list of all revs draft on remote by public here.
335 335 # XXX Beware that revset break if droots is not strictly
336 336 # XXX root we may want to ensure it is but it is costly
337 337 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
338 338 if not outgoing.missing:
339 339 future = fallback
340 340 else:
341 341 # adds changeset we are going to push as draft
342 342 #
343 343 # should not be necessary for publishing server, but because of an
344 344 # issue fixed in xxxxx we have to do it anyway.
345 345 fdroots = list(unfi.set('roots(%ln + %ln::)',
346 346 outgoing.missing, droots))
347 347 fdroots = [f.node() for f in fdroots]
348 348 future = list(unfi.set(revset, fdroots, pushop.futureheads))
349 349 pushop.outdatedphases = future
350 350 pushop.fallbackoutdatedphases = fallback
351 351
352 352 @pushdiscovery('obsmarker')
353 353 def _pushdiscoveryobsmarkers(pushop):
354 354 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
355 355 and pushop.repo.obsstore
356 356 and 'obsolete' in pushop.remote.listkeys('namespaces')):
357 357 repo = pushop.repo
358 358 # very naive computation, that can be quite expensive on big repo.
359 359 # However: evolution is currently slow on them anyway.
360 360 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
361 361 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
362 362
363 363 @pushdiscovery('bookmarks')
364 364 def _pushdiscoverybookmarks(pushop):
365 365 ui = pushop.ui
366 366 repo = pushop.repo.unfiltered()
367 367 remote = pushop.remote
368 368 ui.debug("checking for updated bookmarks\n")
369 369 ancestors = ()
370 370 if pushop.revs:
371 371 revnums = map(repo.changelog.rev, pushop.revs)
372 372 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
373 373 remotebookmark = remote.listkeys('bookmarks')
374 374
375 375 explicit = set(pushop.bookmarks)
376 376
377 377 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
378 378 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
379 379 for b, scid, dcid in advsrc:
380 380 if b in explicit:
381 381 explicit.remove(b)
382 382 if not ancestors or repo[scid].rev() in ancestors:
383 383 pushop.outbookmarks.append((b, dcid, scid))
384 384 # search added bookmark
385 385 for b, scid, dcid in addsrc:
386 386 if b in explicit:
387 387 explicit.remove(b)
388 388 pushop.outbookmarks.append((b, '', scid))
389 389 # search for overwritten bookmark
390 390 for b, scid, dcid in advdst + diverge + differ:
391 391 if b in explicit:
392 392 explicit.remove(b)
393 393 pushop.outbookmarks.append((b, dcid, scid))
394 394 # search for bookmark to delete
395 395 for b, scid, dcid in adddst:
396 396 if b in explicit:
397 397 explicit.remove(b)
398 398 # treat as "deleted locally"
399 399 pushop.outbookmarks.append((b, dcid, ''))
400 400 # identical bookmarks shouldn't get reported
401 401 for b, scid, dcid in same:
402 402 if b in explicit:
403 403 explicit.remove(b)
404 404
405 405 if explicit:
406 406 explicit = sorted(explicit)
407 407 # we should probably list all of them
408 408 ui.warn(_('bookmark %s does not exist on the local '
409 409 'or remote repository!\n') % explicit[0])
410 410 pushop.bkresult = 2
411 411
412 412 pushop.outbookmarks.sort()
413 413
414 414 def _pushcheckoutgoing(pushop):
415 415 outgoing = pushop.outgoing
416 416 unfi = pushop.repo.unfiltered()
417 417 if not outgoing.missing:
418 418 # nothing to push
419 419 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
420 420 return False
421 421 # something to push
422 422 if not pushop.force:
423 423 # if repo.obsstore == False --> no obsolete
424 424 # then, save the iteration
425 425 if unfi.obsstore:
426 426 # this message are here for 80 char limit reason
427 427 mso = _("push includes obsolete changeset: %s!")
428 428 mst = {"unstable": _("push includes unstable changeset: %s!"),
429 429 "bumped": _("push includes bumped changeset: %s!"),
430 430 "divergent": _("push includes divergent changeset: %s!")}
431 431 # If we are to push if there is at least one
432 432 # obsolete or unstable changeset in missing, at
433 433 # least one of the missinghead will be obsolete or
434 434 # unstable. So checking heads only is ok
435 435 for node in outgoing.missingheads:
436 436 ctx = unfi[node]
437 437 if ctx.obsolete():
438 438 raise util.Abort(mso % ctx)
439 439 elif ctx.troubled():
440 440 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
441 441 newbm = pushop.ui.configlist('bookmarks', 'pushing')
442 442 discovery.checkheads(unfi, pushop.remote, outgoing,
443 443 pushop.remoteheads,
444 444 pushop.newbranch,
445 445 bool(pushop.incoming),
446 446 newbm)
447 447 return True
448 448
449 449 # List of names of steps to perform for an outgoing bundle2, order matters.
450 450 b2partsgenorder = []
451 451
452 452 # Mapping between step name and function
453 453 #
454 454 # This exists to help extensions wrap steps if necessary
455 455 b2partsgenmapping = {}
456 456
457 457 def b2partsgenerator(stepname, idx=None):
458 458 """decorator for function generating bundle2 part
459 459
460 460 The function is added to the step -> function mapping and appended to the
461 461 list of steps. Beware that decorated functions will be added in order
462 462 (this may matter).
463 463
464 464 You can only use this decorator for new steps, if you want to wrap a step
465 465 from an extension, attack the b2partsgenmapping dictionary directly."""
466 466 def dec(func):
467 467 assert stepname not in b2partsgenmapping
468 468 b2partsgenmapping[stepname] = func
469 469 if idx is None:
470 470 b2partsgenorder.append(stepname)
471 471 else:
472 472 b2partsgenorder.insert(idx, stepname)
473 473 return func
474 474 return dec
475 475
476 476 @b2partsgenerator('changeset')
477 477 def _pushb2ctx(pushop, bundler):
478 478 """handle changegroup push through bundle2
479 479
480 480 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
481 481 """
482 482 if 'changesets' in pushop.stepsdone:
483 483 return
484 484 pushop.stepsdone.add('changesets')
485 485 # Send known heads to the server for race detection.
486 486 if not _pushcheckoutgoing(pushop):
487 487 return
488 488 pushop.repo.prepushoutgoinghooks(pushop.repo,
489 489 pushop.remote,
490 490 pushop.outgoing)
491 491 if not pushop.force:
492 492 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
493 493 b2caps = bundle2.bundle2caps(pushop.remote)
494 494 version = None
495 495 cgversions = b2caps.get('changegroup')
496 496 if not cgversions: # 3.1 and 3.2 ship with an empty value
497 497 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
498 498 pushop.outgoing)
499 499 else:
500 500 cgversions = [v for v in cgversions if v in changegroup.packermap]
501 501 if not cgversions:
502 502 raise ValueError(_('no common changegroup version'))
503 503 version = max(cgversions)
504 504 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
505 505 pushop.outgoing,
506 506 version=version)
507 507 cgpart = bundler.newpart('changegroup', data=cg)
508 508 if version is not None:
509 509 cgpart.addparam('version', version)
510 510 def handlereply(op):
511 511 """extract addchangegroup returns from server reply"""
512 512 cgreplies = op.records.getreplies(cgpart.id)
513 513 assert len(cgreplies['changegroup']) == 1
514 514 pushop.cgresult = cgreplies['changegroup'][0]['return']
515 515 return handlereply
516 516
517 517 @b2partsgenerator('phase')
518 518 def _pushb2phases(pushop, bundler):
519 519 """handle phase push through bundle2"""
520 520 if 'phases' in pushop.stepsdone:
521 521 return
522 522 b2caps = bundle2.bundle2caps(pushop.remote)
523 523 if not 'pushkey' in b2caps:
524 524 return
525 525 pushop.stepsdone.add('phases')
526 526 part2node = []
527 527
528 528 def handlefailure(pushop, exc):
529 529 targetid = int(exc.partid)
530 530 for partid, node in part2node:
531 531 if partid == targetid:
532 532 raise error.Abort(_('updating %s to public failed') % node)
533 533
534 534 enc = pushkey.encode
535 535 for newremotehead in pushop.outdatedphases:
536 536 part = bundler.newpart('pushkey')
537 537 part.addparam('namespace', enc('phases'))
538 538 part.addparam('key', enc(newremotehead.hex()))
539 539 part.addparam('old', enc(str(phases.draft)))
540 540 part.addparam('new', enc(str(phases.public)))
541 541 part2node.append((part.id, newremotehead))
542 542 pushop.pkfailcb[part.id] = handlefailure
543 543
544 544 def handlereply(op):
545 545 for partid, node in part2node:
546 546 partrep = op.records.getreplies(partid)
547 547 results = partrep['pushkey']
548 548 assert len(results) <= 1
549 549 msg = None
550 550 if not results:
551 551 msg = _('server ignored update of %s to public!\n') % node
552 552 elif not int(results[0]['return']):
553 553 msg = _('updating %s to public failed!\n') % node
554 554 if msg is not None:
555 555 pushop.ui.warn(msg)
556 556 return handlereply
557 557
558 558 @b2partsgenerator('obsmarkers')
559 559 def _pushb2obsmarkers(pushop, bundler):
560 560 if 'obsmarkers' in pushop.stepsdone:
561 561 return
562 562 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
563 563 if obsolete.commonversion(remoteversions) is None:
564 564 return
565 565 pushop.stepsdone.add('obsmarkers')
566 566 if pushop.outobsmarkers:
567 567 markers = sorted(pushop.outobsmarkers)
568 568 buildobsmarkerspart(bundler, markers)
569 569
570 570 @b2partsgenerator('bookmarks')
571 571 def _pushb2bookmarks(pushop, bundler):
572 572 """handle phase push through bundle2"""
573 573 if 'bookmarks' in pushop.stepsdone:
574 574 return
575 575 b2caps = bundle2.bundle2caps(pushop.remote)
576 576 if 'pushkey' not in b2caps:
577 577 return
578 578 pushop.stepsdone.add('bookmarks')
579 579 part2book = []
580 580 enc = pushkey.encode
581 581
582 582 def handlefailure(pushop, exc):
583 583 targetid = int(exc.partid)
584 584 for partid, book, action in part2book:
585 585 if partid == targetid:
586 586 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
587 587 # we should not be called for part we did not generated
588 588 assert False
589 589
590 590 for book, old, new in pushop.outbookmarks:
591 591 part = bundler.newpart('pushkey')
592 592 part.addparam('namespace', enc('bookmarks'))
593 593 part.addparam('key', enc(book))
594 594 part.addparam('old', enc(old))
595 595 part.addparam('new', enc(new))
596 596 action = 'update'
597 597 if not old:
598 598 action = 'export'
599 599 elif not new:
600 600 action = 'delete'
601 601 part2book.append((part.id, book, action))
602 602 pushop.pkfailcb[part.id] = handlefailure
603 603
604 604 def handlereply(op):
605 605 ui = pushop.ui
606 606 for partid, book, action in part2book:
607 607 partrep = op.records.getreplies(partid)
608 608 results = partrep['pushkey']
609 609 assert len(results) <= 1
610 610 if not results:
611 611 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
612 612 else:
613 613 ret = int(results[0]['return'])
614 614 if ret:
615 615 ui.status(bookmsgmap[action][0] % book)
616 616 else:
617 617 ui.warn(bookmsgmap[action][1] % book)
618 618 if pushop.bkresult is not None:
619 619 pushop.bkresult = 1
620 620 return handlereply
621 621
622 622
623 623 def _pushbundle2(pushop):
624 624 """push data to the remote using bundle2
625 625
626 626 The only currently supported type of data is changegroup but this will
627 627 evolve in the future."""
628 628 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
629 629 pushback = (pushop.trmanager
630 630 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
631 631
632 632 # create reply capability
633 633 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
634 634 allowpushback=pushback))
635 635 bundler.newpart('replycaps', data=capsblob)
636 636 replyhandlers = []
637 637 for partgenname in b2partsgenorder:
638 638 partgen = b2partsgenmapping[partgenname]
639 639 ret = partgen(pushop, bundler)
640 640 if callable(ret):
641 641 replyhandlers.append(ret)
642 642 # do not push if nothing to push
643 643 if bundler.nbparts <= 1:
644 644 return
645 645 stream = util.chunkbuffer(bundler.getchunks())
646 646 try:
647 647 try:
648 648 reply = pushop.remote.unbundle(stream, ['force'], 'push')
649 649 except error.BundleValueError, exc:
650 650 raise util.Abort('missing support for %s' % exc)
651 651 try:
652 652 trgetter = None
653 653 if pushback:
654 654 trgetter = pushop.trmanager.transaction
655 655 op = bundle2.processbundle(pushop.repo, reply, trgetter)
656 656 except error.BundleValueError, exc:
657 657 raise util.Abort('missing support for %s' % exc)
658 658 except error.PushkeyFailed, exc:
659 659 partid = int(exc.partid)
660 660 if partid not in pushop.pkfailcb:
661 661 raise
662 662 pushop.pkfailcb[partid](pushop, exc)
663 663 for rephand in replyhandlers:
664 664 rephand(op)
665 665
666 666 def _pushchangeset(pushop):
667 667 """Make the actual push of changeset bundle to remote repo"""
668 668 if 'changesets' in pushop.stepsdone:
669 669 return
670 670 pushop.stepsdone.add('changesets')
671 671 if not _pushcheckoutgoing(pushop):
672 672 return
673 673 pushop.repo.prepushoutgoinghooks(pushop.repo,
674 674 pushop.remote,
675 675 pushop.outgoing)
676 676 outgoing = pushop.outgoing
677 677 unbundle = pushop.remote.capable('unbundle')
678 678 # TODO: get bundlecaps from remote
679 679 bundlecaps = None
680 680 # create a changegroup from local
681 681 if pushop.revs is None and not (outgoing.excluded
682 682 or pushop.repo.changelog.filteredrevs):
683 683 # push everything,
684 684 # use the fast path, no race possible on push
685 685 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
686 686 cg = changegroup.getsubset(pushop.repo,
687 687 outgoing,
688 688 bundler,
689 689 'push',
690 690 fastpath=True)
691 691 else:
692 692 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
693 693 bundlecaps)
694 694
695 695 # apply changegroup to remote
696 696 if unbundle:
697 697 # local repo finds heads on server, finds out what
698 698 # revs it must push. once revs transferred, if server
699 699 # finds it has different heads (someone else won
700 700 # commit/push race), server aborts.
701 701 if pushop.force:
702 702 remoteheads = ['force']
703 703 else:
704 704 remoteheads = pushop.remoteheads
705 705 # ssh: return remote's addchangegroup()
706 706 # http: return remote's addchangegroup() or 0 for error
707 707 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
708 708 pushop.repo.url())
709 709 else:
710 710 # we return an integer indicating remote head count
711 711 # change
712 712 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
713 713 pushop.repo.url())
714 714
715 715 def _pushsyncphase(pushop):
716 716 """synchronise phase information locally and remotely"""
717 717 cheads = pushop.commonheads
718 718 # even when we don't push, exchanging phase data is useful
719 719 remotephases = pushop.remote.listkeys('phases')
720 720 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
721 721 and remotephases # server supports phases
722 722 and pushop.cgresult is None # nothing was pushed
723 723 and remotephases.get('publishing', False)):
724 724 # When:
725 725 # - this is a subrepo push
726 726 # - and remote support phase
727 727 # - and no changeset was pushed
728 728 # - and remote is publishing
729 729 # We may be in issue 3871 case!
730 730 # We drop the possible phase synchronisation done by
731 731 # courtesy to publish changesets possibly locally draft
732 732 # on the remote.
733 733 remotephases = {'publishing': 'True'}
734 734 if not remotephases: # old server or public only reply from non-publishing
735 735 _localphasemove(pushop, cheads)
736 736 # don't push any phase data as there is nothing to push
737 737 else:
738 738 ana = phases.analyzeremotephases(pushop.repo, cheads,
739 739 remotephases)
740 740 pheads, droots = ana
741 741 ### Apply remote phase on local
742 742 if remotephases.get('publishing', False):
743 743 _localphasemove(pushop, cheads)
744 744 else: # publish = False
745 745 _localphasemove(pushop, pheads)
746 746 _localphasemove(pushop, cheads, phases.draft)
747 747 ### Apply local phase on remote
748 748
749 749 if pushop.cgresult:
750 750 if 'phases' in pushop.stepsdone:
751 751 # phases already pushed though bundle2
752 752 return
753 753 outdated = pushop.outdatedphases
754 754 else:
755 755 outdated = pushop.fallbackoutdatedphases
756 756
757 757 pushop.stepsdone.add('phases')
758 758
759 759 # filter heads already turned public by the push
760 760 outdated = [c for c in outdated if c.node() not in pheads]
761 761 # fallback to independent pushkey command
762 762 for newremotehead in outdated:
763 763 r = pushop.remote.pushkey('phases',
764 764 newremotehead.hex(),
765 765 str(phases.draft),
766 766 str(phases.public))
767 767 if not r:
768 768 pushop.ui.warn(_('updating %s to public failed!\n')
769 769 % newremotehead)
770 770
771 771 def _localphasemove(pushop, nodes, phase=phases.public):
772 772 """move <nodes> to <phase> in the local source repo"""
773 773 if pushop.trmanager:
774 774 phases.advanceboundary(pushop.repo,
775 775 pushop.trmanager.transaction(),
776 776 phase,
777 777 nodes)
778 778 else:
779 779 # repo is not locked, do not change any phases!
780 780 # Informs the user that phases should have been moved when
781 781 # applicable.
782 782 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
783 783 phasestr = phases.phasenames[phase]
784 784 if actualmoves:
785 785 pushop.ui.status(_('cannot lock source repo, skipping '
786 786 'local %s phase update\n') % phasestr)
787 787
788 788 def _pushobsolete(pushop):
789 789 """utility function to push obsolete markers to a remote"""
790 790 if 'obsmarkers' in pushop.stepsdone:
791 791 return
792 792 repo = pushop.repo
793 793 remote = pushop.remote
794 794 pushop.stepsdone.add('obsmarkers')
795 795 if pushop.outobsmarkers:
796 796 pushop.ui.debug('try to push obsolete markers to remote\n')
797 797 rslts = []
798 798 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
799 799 for key in sorted(remotedata, reverse=True):
800 800 # reverse sort to ensure we end with dump0
801 801 data = remotedata[key]
802 802 rslts.append(remote.pushkey('obsolete', key, '', data))
803 803 if [r for r in rslts if not r]:
804 804 msg = _('failed to push some obsolete markers!\n')
805 805 repo.ui.warn(msg)
806 806
807 807 def _pushbookmark(pushop):
808 808 """Update bookmark position on remote"""
809 809 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
810 810 return
811 811 pushop.stepsdone.add('bookmarks')
812 812 ui = pushop.ui
813 813 remote = pushop.remote
814 814
815 815 for b, old, new in pushop.outbookmarks:
816 816 action = 'update'
817 817 if not old:
818 818 action = 'export'
819 819 elif not new:
820 820 action = 'delete'
821 821 if remote.pushkey('bookmarks', b, old, new):
822 822 ui.status(bookmsgmap[action][0] % b)
823 823 else:
824 824 ui.warn(bookmsgmap[action][1] % b)
825 825 # discovery can have set the value form invalid entry
826 826 if pushop.bkresult is not None:
827 827 pushop.bkresult = 1
828 828
829 829 class pulloperation(object):
830 830 """A object that represent a single pull operation
831 831
832 832 It purpose is to carry pull related state and very common operation.
833 833
834 834 A new should be created at the beginning of each pull and discarded
835 835 afterward.
836 836 """
837 837
838 838 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
839 839 remotebookmarks=None):
840 840 # repo we pull into
841 841 self.repo = repo
842 842 # repo we pull from
843 843 self.remote = remote
844 844 # revision we try to pull (None is "all")
845 845 self.heads = heads
846 846 # bookmark pulled explicitly
847 847 self.explicitbookmarks = bookmarks
848 848 # do we force pull?
849 849 self.force = force
850 850 # transaction manager
851 851 self.trmanager = None
852 852 # set of common changeset between local and remote before pull
853 853 self.common = None
854 854 # set of pulled head
855 855 self.rheads = None
856 856 # list of missing changeset to fetch remotely
857 857 self.fetch = None
858 858 # remote bookmarks data
859 859 self.remotebookmarks = remotebookmarks
860 860 # result of changegroup pulling (used as return code by pull)
861 861 self.cgresult = None
862 862 # list of step already done
863 863 self.stepsdone = set()
864 864
865 865 @util.propertycache
866 866 def pulledsubset(self):
867 867 """heads of the set of changeset target by the pull"""
868 868 # compute target subset
869 869 if self.heads is None:
870 870 # We pulled every thing possible
871 871 # sync on everything common
872 872 c = set(self.common)
873 873 ret = list(self.common)
874 874 for n in self.rheads:
875 875 if n not in c:
876 876 ret.append(n)
877 877 return ret
878 878 else:
879 879 # We pulled a specific subset
880 880 # sync on this subset
881 881 return self.heads
882 882
883 883 def gettransaction(self):
884 884 # deprecated; talk to trmanager directly
885 885 return self.trmanager.transaction()
886 886
887 887 class transactionmanager(object):
888 888 """An object to manage the life cycle of a transaction
889 889
890 890 It creates the transaction on demand and calls the appropriate hooks when
891 891 closing the transaction."""
892 892 def __init__(self, repo, source, url):
893 893 self.repo = repo
894 894 self.source = source
895 895 self.url = url
896 896 self._tr = None
897 897
898 898 def transaction(self):
899 899 """Return an open transaction object, constructing if necessary"""
900 900 if not self._tr:
901 901 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
902 902 self._tr = self.repo.transaction(trname)
903 903 self._tr.hookargs['source'] = self.source
904 904 self._tr.hookargs['url'] = self.url
905 905 return self._tr
906 906
907 907 def close(self):
908 908 """close transaction if created"""
909 909 if self._tr is not None:
910 910 self._tr.close()
911 911
912 912 def release(self):
913 913 """release transaction if created"""
914 914 if self._tr is not None:
915 915 self._tr.release()
916 916
917 917 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None):
918 918 if opargs is None:
919 919 opargs = {}
920 920 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
921 921 **opargs)
922 922 if pullop.remote.local():
923 923 missing = set(pullop.remote.requirements) - pullop.repo.supported
924 924 if missing:
925 925 msg = _("required features are not"
926 926 " supported in the destination:"
927 927 " %s") % (', '.join(sorted(missing)))
928 928 raise util.Abort(msg)
929 929
930 930 lock = pullop.repo.lock()
931 931 try:
932 932 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
933 933 _pulldiscovery(pullop)
934 934 if _canusebundle2(pullop):
935 935 _pullbundle2(pullop)
936 936 _pullchangeset(pullop)
937 937 _pullphase(pullop)
938 938 _pullbookmarks(pullop)
939 939 _pullobsolete(pullop)
940 940 pullop.trmanager.close()
941 941 finally:
942 942 pullop.trmanager.release()
943 943 lock.release()
944 944
945 945 return pullop
946 946
947 947 # list of steps to perform discovery before pull
948 948 pulldiscoveryorder = []
949 949
950 950 # Mapping between step name and function
951 951 #
952 952 # This exists to help extensions wrap steps if necessary
953 953 pulldiscoverymapping = {}
954 954
955 955 def pulldiscovery(stepname):
956 956 """decorator for function performing discovery before pull
957 957
958 958 The function is added to the step -> function mapping and appended to the
959 959 list of steps. Beware that decorated function will be added in order (this
960 960 may matter).
961 961
962 962 You can only use this decorator for a new step, if you want to wrap a step
963 963 from an extension, change the pulldiscovery dictionary directly."""
964 964 def dec(func):
965 965 assert stepname not in pulldiscoverymapping
966 966 pulldiscoverymapping[stepname] = func
967 967 pulldiscoveryorder.append(stepname)
968 968 return func
969 969 return dec
970 970
971 971 def _pulldiscovery(pullop):
972 972 """Run all discovery steps"""
973 973 for stepname in pulldiscoveryorder:
974 974 step = pulldiscoverymapping[stepname]
975 975 step(pullop)
976 976
977 977 @pulldiscovery('b1:bookmarks')
978 978 def _pullbookmarkbundle1(pullop):
979 979 """fetch bookmark data in bundle1 case
980 980
981 981 If not using bundle2, we have to fetch bookmarks before changeset
982 982 discovery to reduce the chance and impact of race conditions."""
983 983 if pullop.remotebookmarks is not None:
984 984 return
985 985 if (_canusebundle2(pullop)
986 986 and 'listkeys' in bundle2.bundle2caps(pullop.remote)):
987 987 # all known bundle2 servers now support listkeys, but lets be nice with
988 988 # new implementation.
989 989 return
990 990 pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
991 991
992 992
993 993 @pulldiscovery('changegroup')
994 994 def _pulldiscoverychangegroup(pullop):
995 995 """discovery phase for the pull
996 996
997 997 Current handle changeset discovery only, will change handle all discovery
998 998 at some point."""
999 999 tmp = discovery.findcommonincoming(pullop.repo,
1000 1000 pullop.remote,
1001 1001 heads=pullop.heads,
1002 1002 force=pullop.force)
1003 1003 common, fetch, rheads = tmp
1004 1004 nm = pullop.repo.unfiltered().changelog.nodemap
1005 1005 if fetch and rheads:
1006 1006 # If a remote heads in filtered locally, lets drop it from the unknown
1007 1007 # remote heads and put in back in common.
1008 1008 #
1009 1009 # This is a hackish solution to catch most of "common but locally
1010 1010 # hidden situation". We do not performs discovery on unfiltered
1011 1011 # repository because it end up doing a pathological amount of round
1012 1012 # trip for w huge amount of changeset we do not care about.
1013 1013 #
1014 1014 # If a set of such "common but filtered" changeset exist on the server
1015 1015 # but are not including a remote heads, we'll not be able to detect it,
1016 1016 scommon = set(common)
1017 1017 filteredrheads = []
1018 1018 for n in rheads:
1019 1019 if n in nm:
1020 1020 if n not in scommon:
1021 1021 common.append(n)
1022 1022 else:
1023 1023 filteredrheads.append(n)
1024 1024 if not filteredrheads:
1025 1025 fetch = []
1026 1026 rheads = filteredrheads
1027 1027 pullop.common = common
1028 1028 pullop.fetch = fetch
1029 1029 pullop.rheads = rheads
1030 1030
1031 1031 def _pullbundle2(pullop):
1032 1032 """pull data using bundle2
1033 1033
1034 1034 For now, the only supported data are changegroup."""
1035 1035 remotecaps = bundle2.bundle2caps(pullop.remote)
1036 1036 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
1037 1037 # pulling changegroup
1038 1038 pullop.stepsdone.add('changegroup')
1039 1039
1040 1040 kwargs['common'] = pullop.common
1041 1041 kwargs['heads'] = pullop.heads or pullop.rheads
1042 1042 kwargs['cg'] = pullop.fetch
1043 1043 if 'listkeys' in remotecaps:
1044 1044 kwargs['listkeys'] = ['phase']
1045 1045 if pullop.remotebookmarks is None:
1046 1046 # make sure to always includes bookmark data when migrating
1047 1047 # `hg incoming --bundle` to using this function.
1048 1048 kwargs['listkeys'].append('bookmarks')
1049 1049 if not pullop.fetch:
1050 1050 pullop.repo.ui.status(_("no changes found\n"))
1051 1051 pullop.cgresult = 0
1052 1052 else:
1053 1053 if pullop.heads is None and list(pullop.common) == [nullid]:
1054 1054 pullop.repo.ui.status(_("requesting all changes\n"))
1055 1055 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1056 1056 remoteversions = bundle2.obsmarkersversion(remotecaps)
1057 1057 if obsolete.commonversion(remoteversions) is not None:
1058 1058 kwargs['obsmarkers'] = True
1059 1059 pullop.stepsdone.add('obsmarkers')
1060 1060 _pullbundle2extraprepare(pullop, kwargs)
1061 1061 bundle = pullop.remote.getbundle('pull', **kwargs)
1062 1062 try:
1063 1063 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
1064 1064 except error.BundleValueError, exc:
1065 1065 raise util.Abort('missing support for %s' % exc)
1066 1066
1067 1067 if pullop.fetch:
1068 1068 results = [cg['return'] for cg in op.records['changegroup']]
1069 1069 pullop.cgresult = changegroup.combineresults(results)
1070 1070
1071 1071 # processing phases change
1072 1072 for namespace, value in op.records['listkeys']:
1073 1073 if namespace == 'phases':
1074 1074 _pullapplyphases(pullop, value)
1075 1075
1076 1076 # processing bookmark update
1077 1077 for namespace, value in op.records['listkeys']:
1078 1078 if namespace == 'bookmarks':
1079 1079 pullop.remotebookmarks = value
1080 1080
1081 1081 # bookmark data were either already there or pulled in the bundle
1082 1082 if pullop.remotebookmarks is not None:
1083 1083 _pullbookmarks(pullop)
1084 1084
1085 1085 def _pullbundle2extraprepare(pullop, kwargs):
1086 1086 """hook function so that extensions can extend the getbundle call"""
1087 1087 pass
1088 1088
1089 1089 def _pullchangeset(pullop):
1090 1090 """pull changeset from unbundle into the local repo"""
1091 1091 # We delay the open of the transaction as late as possible so we
1092 1092 # don't open transaction for nothing or you break future useful
1093 1093 # rollback call
1094 1094 if 'changegroup' in pullop.stepsdone:
1095 1095 return
1096 1096 pullop.stepsdone.add('changegroup')
1097 1097 if not pullop.fetch:
1098 1098 pullop.repo.ui.status(_("no changes found\n"))
1099 1099 pullop.cgresult = 0
1100 1100 return
1101 1101 pullop.gettransaction()
1102 1102 if pullop.heads is None and list(pullop.common) == [nullid]:
1103 1103 pullop.repo.ui.status(_("requesting all changes\n"))
1104 1104 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1105 1105 # issue1320, avoid a race if remote changed after discovery
1106 1106 pullop.heads = pullop.rheads
1107 1107
1108 1108 if pullop.remote.capable('getbundle'):
1109 1109 # TODO: get bundlecaps from remote
1110 1110 cg = pullop.remote.getbundle('pull', common=pullop.common,
1111 1111 heads=pullop.heads or pullop.rheads)
1112 1112 elif pullop.heads is None:
1113 1113 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1114 1114 elif not pullop.remote.capable('changegroupsubset'):
1115 1115 raise util.Abort(_("partial pull cannot be done because "
1116 1116 "other repository doesn't support "
1117 1117 "changegroupsubset."))
1118 1118 else:
1119 1119 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1120 1120 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
1121 1121 pullop.remote.url())
1122 1122
1123 1123 def _pullphase(pullop):
1124 1124 # Get remote phases data from remote
1125 1125 if 'phases' in pullop.stepsdone:
1126 1126 return
1127 1127 remotephases = pullop.remote.listkeys('phases')
1128 1128 _pullapplyphases(pullop, remotephases)
1129 1129
1130 1130 def _pullapplyphases(pullop, remotephases):
1131 1131 """apply phase movement from observed remote state"""
1132 1132 if 'phases' in pullop.stepsdone:
1133 1133 return
1134 1134 pullop.stepsdone.add('phases')
1135 1135 publishing = bool(remotephases.get('publishing', False))
1136 1136 if remotephases and not publishing:
1137 1137 # remote is new and unpublishing
1138 1138 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1139 1139 pullop.pulledsubset,
1140 1140 remotephases)
1141 1141 dheads = pullop.pulledsubset
1142 1142 else:
1143 1143 # Remote is old or publishing all common changesets
1144 1144 # should be seen as public
1145 1145 pheads = pullop.pulledsubset
1146 1146 dheads = []
1147 1147 unfi = pullop.repo.unfiltered()
1148 1148 phase = unfi._phasecache.phase
1149 1149 rev = unfi.changelog.nodemap.get
1150 1150 public = phases.public
1151 1151 draft = phases.draft
1152 1152
1153 1153 # exclude changesets already public locally and update the others
1154 1154 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1155 1155 if pheads:
1156 1156 tr = pullop.gettransaction()
1157 1157 phases.advanceboundary(pullop.repo, tr, public, pheads)
1158 1158
1159 1159 # exclude changesets already draft locally and update the others
1160 1160 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1161 1161 if dheads:
1162 1162 tr = pullop.gettransaction()
1163 1163 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1164 1164
1165 1165 def _pullbookmarks(pullop):
1166 1166 """process the remote bookmark information to update the local one"""
1167 1167 if 'bookmarks' in pullop.stepsdone:
1168 1168 return
1169 1169 pullop.stepsdone.add('bookmarks')
1170 1170 repo = pullop.repo
1171 1171 remotebookmarks = pullop.remotebookmarks
1172 1172 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1173 1173 pullop.remote.url(),
1174 1174 pullop.gettransaction,
1175 1175 explicit=pullop.explicitbookmarks)
1176 1176
1177 1177 def _pullobsolete(pullop):
1178 1178 """utility function to pull obsolete markers from a remote
1179 1179
1180 1180 The `gettransaction` is function that return the pull transaction, creating
1181 1181 one if necessary. We return the transaction to inform the calling code that
1182 1182 a new transaction have been created (when applicable).
1183 1183
1184 1184 Exists mostly to allow overriding for experimentation purpose"""
1185 1185 if 'obsmarkers' in pullop.stepsdone:
1186 1186 return
1187 1187 pullop.stepsdone.add('obsmarkers')
1188 1188 tr = None
1189 1189 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1190 1190 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1191 1191 remoteobs = pullop.remote.listkeys('obsolete')
1192 1192 if 'dump0' in remoteobs:
1193 1193 tr = pullop.gettransaction()
1194 1194 for key in sorted(remoteobs, reverse=True):
1195 1195 if key.startswith('dump'):
1196 1196 data = base85.b85decode(remoteobs[key])
1197 1197 pullop.repo.obsstore.mergemarkers(tr, data)
1198 1198 pullop.repo.invalidatevolatilesets()
1199 1199 return tr
1200 1200
1201 1201 def caps20to10(repo):
1202 1202 """return a set with appropriate options to use bundle20 during getbundle"""
1203 1203 caps = set(['HG20'])
1204 1204 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1205 1205 caps.add('bundle2=' + urllib.quote(capsblob))
1206 1206 return caps
1207 1207
1208 1208 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1209 1209 getbundle2partsorder = []
1210 1210
1211 1211 # Mapping between step name and function
1212 1212 #
1213 1213 # This exists to help extensions wrap steps if necessary
1214 1214 getbundle2partsmapping = {}
1215 1215
1216 1216 def getbundle2partsgenerator(stepname, idx=None):
1217 1217 """decorator for function generating bundle2 part for getbundle
1218 1218
1219 1219 The function is added to the step -> function mapping and appended to the
1220 1220 list of steps. Beware that decorated functions will be added in order
1221 1221 (this may matter).
1222 1222
1223 1223 You can only use this decorator for new steps, if you want to wrap a step
1224 1224 from an extension, attack the getbundle2partsmapping dictionary directly."""
1225 1225 def dec(func):
1226 1226 assert stepname not in getbundle2partsmapping
1227 1227 getbundle2partsmapping[stepname] = func
1228 1228 if idx is None:
1229 1229 getbundle2partsorder.append(stepname)
1230 1230 else:
1231 1231 getbundle2partsorder.insert(idx, stepname)
1232 1232 return func
1233 1233 return dec
1234 1234
1235 1235 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1236 1236 **kwargs):
1237 1237 """return a full bundle (with potentially multiple kind of parts)
1238 1238
1239 1239 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
1240 1240 passed. For now, the bundle can contain only changegroup, but this will
1241 1241 changes when more part type will be available for bundle2.
1242 1242
1243 1243 This is different from changegroup.getchangegroup that only returns an HG10
1244 1244 changegroup bundle. They may eventually get reunited in the future when we
1245 1245 have a clearer idea of the API we what to query different data.
1246 1246
1247 1247 The implementation is at a very early stage and will get massive rework
1248 1248 when the API of bundle is refined.
1249 1249 """
1250 1250 # bundle10 case
1251 1251 usebundle2 = False
1252 1252 if bundlecaps is not None:
1253 1253 usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
1254 1254 if not usebundle2:
1255 1255 if bundlecaps and not kwargs.get('cg', True):
1256 1256 raise ValueError(_('request for bundle10 must include changegroup'))
1257 1257
1258 1258 if kwargs:
1259 1259 raise ValueError(_('unsupported getbundle arguments: %s')
1260 1260 % ', '.join(sorted(kwargs.keys())))
1261 1261 return changegroup.getchangegroup(repo, source, heads=heads,
1262 1262 common=common, bundlecaps=bundlecaps)
1263 1263
1264 1264 # bundle20 case
1265 1265 b2caps = {}
1266 1266 for bcaps in bundlecaps:
1267 1267 if bcaps.startswith('bundle2='):
1268 1268 blob = urllib.unquote(bcaps[len('bundle2='):])
1269 1269 b2caps.update(bundle2.decodecaps(blob))
1270 1270 bundler = bundle2.bundle20(repo.ui, b2caps)
1271 1271
1272 1272 kwargs['heads'] = heads
1273 1273 kwargs['common'] = common
1274 1274
1275 1275 for name in getbundle2partsorder:
1276 1276 func = getbundle2partsmapping[name]
1277 1277 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1278 1278 **kwargs)
1279 1279
1280 1280 return util.chunkbuffer(bundler.getchunks())
1281 1281
1282 1282 @getbundle2partsgenerator('changegroup')
1283 1283 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1284 1284 b2caps=None, heads=None, common=None, **kwargs):
1285 1285 """add a changegroup part to the requested bundle"""
1286 1286 cg = None
1287 1287 if kwargs.get('cg', True):
1288 1288 # build changegroup bundle here.
1289 1289 version = None
1290 1290 cgversions = b2caps.get('changegroup')
1291 1291 getcgkwargs = {}
1292 1292 if cgversions: # 3.1 and 3.2 ship with an empty value
1293 1293 cgversions = [v for v in cgversions if v in changegroup.packermap]
1294 1294 if not cgversions:
1295 1295 raise ValueError(_('no common changegroup version'))
1296 1296 version = getcgkwargs['version'] = max(cgversions)
1297 1297 outgoing = changegroup.computeoutgoing(repo, heads, common)
1298 1298 cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
1299 1299 bundlecaps=bundlecaps,
1300 1300 **getcgkwargs)
1301 1301
1302 1302 if cg:
1303 1303 part = bundler.newpart('changegroup', data=cg)
1304 1304 if version is not None:
1305 1305 part.addparam('version', version)
1306 1306 part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
1307 1307
1308 1308 @getbundle2partsgenerator('listkeys')
1309 1309 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1310 1310 b2caps=None, **kwargs):
1311 1311 """add parts containing listkeys namespaces to the requested bundle"""
1312 1312 listkeys = kwargs.get('listkeys', ())
1313 1313 for namespace in listkeys:
1314 1314 part = bundler.newpart('listkeys')
1315 1315 part.addparam('namespace', namespace)
1316 1316 keys = repo.listkeys(namespace).items()
1317 1317 part.data = pushkey.encodekeys(keys)
1318 1318
1319 1319 @getbundle2partsgenerator('obsmarkers')
1320 1320 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1321 1321 b2caps=None, heads=None, **kwargs):
1322 1322 """add an obsolescence markers part to the requested bundle"""
1323 1323 if kwargs.get('obsmarkers', False):
1324 1324 if heads is None:
1325 1325 heads = repo.heads()
1326 1326 subset = [c.node() for c in repo.set('::%ln', heads)]
1327 1327 markers = repo.obsstore.relevantmarkers(subset)
1328 1328 markers = sorted(markers)
1329 1329 buildobsmarkerspart(bundler, markers)
1330 1330
1331 1331 @getbundle2partsgenerator('hgtagsfnodes')
1332 1332 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1333 1333 b2caps=None, heads=None, common=None,
1334 1334 **kwargs):
1335 1335 """Transfer the .hgtags filenodes mapping.
1336 1336
1337 1337 Only values for heads in this bundle will be transferred.
1338 1338
1339 1339 The part data consists of pairs of 20 byte changeset node and .hgtags
1340 1340 filenodes raw values.
1341 1341 """
1342 1342 # Don't send unless:
1343 1343 # - changeset are being exchanged,
1344 1344 # - the client supports it.
1345 1345 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1346 1346 return
1347 1347
1348 1348 outgoing = changegroup.computeoutgoing(repo, heads, common)
1349 1349
1350 1350 if not outgoing.missingheads:
1351 1351 return
1352 1352
1353 1353 cache = tags.hgtagsfnodescache(repo.unfiltered())
1354 1354 chunks = []
1355 1355
1356 1356 # .hgtags fnodes are only relevant for head changesets. While we could
1357 1357 # transfer values for all known nodes, there will likely be little to
1358 1358 # no benefit.
1359 1359 #
1360 1360 # We don't bother using a generator to produce output data because
1361 1361 # a) we only have 40 bytes per head and even esoteric numbers of heads
1362 1362 # consume little memory (1M heads is 40MB) b) we don't want to send the
1363 1363 # part if we don't have entries and knowing if we have entries requires
1364 1364 # cache lookups.
1365 1365 for node in outgoing.missingheads:
1366 1366 # Don't compute missing, as this may slow down serving.
1367 1367 fnode = cache.getfnode(node, computemissing=False)
1368 1368 if fnode is not None:
1369 1369 chunks.extend([node, fnode])
1370 1370
1371 1371 if chunks:
1372 1372 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1373 1373
1374 1374 def check_heads(repo, their_heads, context):
1375 1375 """check if the heads of a repo have been modified
1376 1376
1377 1377 Used by peer for unbundling.
1378 1378 """
1379 1379 heads = repo.heads()
1380 1380 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1381 1381 if not (their_heads == ['force'] or their_heads == heads or
1382 1382 their_heads == ['hashed', heads_hash]):
1383 1383 # someone else committed/pushed/unbundled while we
1384 1384 # were transferring data
1385 1385 raise error.PushRaced('repository changed while %s - '
1386 1386 'please try again' % context)
1387 1387
1388 1388 def unbundle(repo, cg, heads, source, url):
1389 1389 """Apply a bundle to a repo.
1390 1390
1391 1391 this function makes sure the repo is locked during the application and have
1392 1392 mechanism to check that no push race occurred between the creation of the
1393 1393 bundle and its application.
1394 1394
1395 1395 If the push was raced as PushRaced exception is raised."""
1396 1396 r = 0
1397 1397 # need a transaction when processing a bundle2 stream
1398 1398 wlock = lock = tr = None
1399 1399 recordout = None
1400 1400 # quick fix for output mismatch with bundle2 in 3.4
1401 1401 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1402 1402 False)
1403 1403 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1404 1404 captureoutput = True
1405 1405 try:
1406 1406 check_heads(repo, heads, 'uploading changes')
1407 1407 # push can proceed
1408 1408 if util.safehasattr(cg, 'params'):
1409 1409 r = None
1410 1410 try:
1411 1411 wlock = repo.wlock()
1412 1412 lock = repo.lock()
1413 1413 tr = repo.transaction(source)
1414 1414 tr.hookargs['source'] = source
1415 1415 tr.hookargs['url'] = url
1416 1416 tr.hookargs['bundle2'] = '1'
1417 1417 op = bundle2.bundleoperation(repo, lambda: tr,
1418 1418 captureoutput=captureoutput)
1419 1419 try:
1420 1420 r = bundle2.processbundle(repo, cg, op=op)
1421 1421 finally:
1422 1422 r = op.reply
1423 1423 if captureoutput and r is not None:
1424 1424 repo.ui.pushbuffer(error=True, subproc=True)
1425 1425 def recordout(output):
1426 1426 r.newpart('output', data=output, mandatory=False)
1427 1427 tr.close()
1428 1428 except BaseException, exc:
1429 1429 exc.duringunbundle2 = True
1430 1430 if captureoutput and r is not None:
1431 1431 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1432 1432 def recordout(output):
1433 1433 part = bundle2.bundlepart('output', data=output,
1434 1434 mandatory=False)
1435 1435 parts.append(part)
1436 1436 raise
1437 1437 else:
1438 1438 lock = repo.lock()
1439 1439 r = changegroup.addchangegroup(repo, cg, source, url)
1440 1440 finally:
1441 1441 lockmod.release(tr, lock, wlock)
1442 1442 if recordout is not None:
1443 1443 recordout(repo.ui.popbuffer())
1444 1444 return r
1445 1445
1446 1446 # This is it's own function so extensions can override it.
1447 1447 def _walkstreamfiles(repo):
1448 1448 return repo.store.walk()
1449 1449
1450 1450 def generatestreamclone(repo):
1451 1451 """Emit content for a streaming clone.
1452 1452
1453 1453 This is a generator of raw chunks that constitute a streaming clone.
1454 1454
1455 1455 The stream begins with a line of 2 space-delimited integers containing the
1456 1456 number of entries and total bytes size.
1457 1457
1458 1458 Next, are N entries for each file being transferred. Each file entry starts
1459 1459 as a line with the file name and integer size delimited by a null byte.
1460 1460 The raw file data follows. Following the raw file data is the next file
1461 1461 entry, or EOF.
1462 1462
1463 1463 When used on the wire protocol, an additional line indicating protocol
1464 1464 success will be prepended to the stream. This function is not responsible
1465 1465 for adding it.
1466 1466
1467 1467 This function will obtain a repository lock to ensure a consistent view of
1468 1468 the store is captured. It therefore may raise LockError.
1469 1469 """
1470 1470 entries = []
1471 1471 total_bytes = 0
1472 1472 # Get consistent snapshot of repo, lock during scan.
1473 1473 lock = repo.lock()
1474 1474 try:
1475 1475 repo.ui.debug('scanning\n')
1476 1476 for name, ename, size in _walkstreamfiles(repo):
1477 1477 if size:
1478 1478 entries.append((name, size))
1479 1479 total_bytes += size
1480 1480 finally:
1481 1481 lock.release()
1482 1482
1483 1483 repo.ui.debug('%d files, %d bytes to transfer\n' %
1484 1484 (len(entries), total_bytes))
1485 1485 yield '%d %d\n' % (len(entries), total_bytes)
1486 1486
1487 1487 sopener = repo.svfs
1488 1488 oldaudit = sopener.mustaudit
1489 1489 debugflag = repo.ui.debugflag
1490 1490 sopener.mustaudit = False
1491 1491
1492 1492 try:
1493 1493 for name, size in entries:
1494 1494 if debugflag:
1495 1495 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
1496 1496 # partially encode name over the wire for backwards compat
1497 1497 yield '%s\0%d\n' % (store.encodedir(name), size)
1498 1498 if size <= 65536:
1499 1499 fp = sopener(name)
1500 1500 try:
1501 1501 data = fp.read(size)
1502 1502 finally:
1503 1503 fp.close()
1504 1504 yield data
1505 1505 else:
1506 1506 for chunk in util.filechunkiter(sopener(name), limit=size):
1507 1507 yield chunk
1508 1508 finally:
1509 1509 sopener.mustaudit = oldaudit
1510 1510
1511 1511 def consumestreamclone(repo, fp):
1512 1512 """Apply the contents from a streaming clone file.
1513 1513
1514 1514 This takes the output from "streamout" and applies it to the specified
1515 1515 repository.
1516 1516
1517 1517 Like "streamout," the status line added by the wire protocol is not handled
1518 1518 by this function.
1519 1519 """
1520 1520 lock = repo.lock()
1521 1521 try:
1522 1522 repo.ui.status(_('streaming all changes\n'))
1523 1523 l = fp.readline()
1524 1524 try:
1525 1525 total_files, total_bytes = map(int, l.split(' ', 1))
1526 1526 except (ValueError, TypeError):
1527 1527 raise error.ResponseError(
1528 1528 _('unexpected response from remote server:'), l)
1529 1529 repo.ui.status(_('%d files to transfer, %s of data\n') %
1530 1530 (total_files, util.bytecount(total_bytes)))
1531 1531 handled_bytes = 0
1532 1532 repo.ui.progress(_('clone'), 0, total=total_bytes)
1533 1533 start = time.time()
1534 1534
1535 1535 tr = repo.transaction(_('clone'))
1536 1536 try:
1537 1537 for i in xrange(total_files):
1538 1538 # XXX doesn't support '\n' or '\r' in filenames
1539 1539 l = fp.readline()
1540 1540 try:
1541 1541 name, size = l.split('\0', 1)
1542 1542 size = int(size)
1543 1543 except (ValueError, TypeError):
1544 1544 raise error.ResponseError(
1545 1545 _('unexpected response from remote server:'), l)
1546 1546 if repo.ui.debugflag:
1547 1547 repo.ui.debug('adding %s (%s)\n' %
1548 1548 (name, util.bytecount(size)))
1549 1549 # for backwards compat, name was partially encoded
1550 1550 ofp = repo.svfs(store.decodedir(name), 'w')
1551 1551 for chunk in util.filechunkiter(fp, limit=size):
1552 1552 handled_bytes += len(chunk)
1553 1553 repo.ui.progress(_('clone'), handled_bytes,
1554 1554 total=total_bytes)
1555 1555 ofp.write(chunk)
1556 1556 ofp.close()
1557 1557 tr.close()
1558 1558 finally:
1559 1559 tr.release()
1560 1560
1561 1561 # Writing straight to files circumvented the inmemory caches
1562 1562 repo.invalidate()
1563 1563
1564 1564 elapsed = time.time() - start
1565 1565 if elapsed <= 0:
1566 1566 elapsed = 0.001
1567 1567 repo.ui.progress(_('clone'), None)
1568 1568 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
1569 1569 (util.bytecount(total_bytes), elapsed,
1570 1570 util.bytecount(total_bytes / elapsed)))
1571 1571 finally:
1572 1572 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now