##// END OF EJS Templates
changegroup: introduce "raw" versions of some commands...
Sune Foldager -
r23177:706547a1 default
parent child Browse files
Show More
@@ -1,768 +1,793 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16
17 17 def readexactly(stream, n):
18 18 '''read n bytes from stream.read and abort if less was available'''
19 19 s = stream.read(n)
20 20 if len(s) < n:
21 21 raise util.Abort(_("stream ended unexpectedly"
22 22 " (got %d bytes, expected %d)")
23 23 % (len(s), n))
24 24 return s
25 25
26 26 def getchunk(stream):
27 27 """return the next chunk from stream as a string"""
28 28 d = readexactly(stream, 4)
29 29 l = struct.unpack(">l", d)[0]
30 30 if l <= 4:
31 31 if l:
32 32 raise util.Abort(_("invalid chunk length %d") % l)
33 33 return ""
34 34 return readexactly(stream, l - 4)
35 35
36 36 def chunkheader(length):
37 37 """return a changegroup chunk header (string)"""
38 38 return struct.pack(">l", length + 4)
39 39
40 40 def closechunk():
41 41 """return a changegroup chunk header (string) for a zero-length chunk"""
42 42 return struct.pack(">l", 0)
43 43
44 44 class nocompress(object):
45 45 def compress(self, x):
46 46 return x
47 47 def flush(self):
48 48 return ""
49 49
50 50 bundletypes = {
51 51 "": ("", nocompress), # only when using unbundle on ssh and old http servers
52 52 # since the unification ssh accepts a header but there
53 53 # is no capability signaling it.
54 54 "HG10UN": ("HG10UN", nocompress),
55 55 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
56 56 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
57 57 }
58 58
59 59 # hgweb uses this list to communicate its preferred type
60 60 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
61 61
62 62 def writebundle(cg, filename, bundletype, vfs=None):
63 63 """Write a bundle file and return its filename.
64 64
65 65 Existing files will not be overwritten.
66 66 If no filename is specified, a temporary file is created.
67 67 bz2 compression can be turned off.
68 68 The bundle file will be deleted in case of errors.
69 69 """
70 70
71 71 fh = None
72 72 cleanup = None
73 73 try:
74 74 if filename:
75 75 if vfs:
76 76 fh = vfs.open(filename, "wb")
77 77 else:
78 78 fh = open(filename, "wb")
79 79 else:
80 80 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
81 81 fh = os.fdopen(fd, "wb")
82 82 cleanup = filename
83 83
84 84 header, compressor = bundletypes[bundletype]
85 85 fh.write(header)
86 86 z = compressor()
87 87
88 88 # parse the changegroup data, otherwise we will block
89 89 # in case of sshrepo because we don't know the end of the stream
90 90
91 91 # an empty chunkgroup is the end of the changegroup
92 92 # a changegroup has at least 2 chunkgroups (changelog and manifest).
93 93 # after that, an empty chunkgroup is the end of the changegroup
94 94 for chunk in cg.getchunks():
95 95 fh.write(z.compress(chunk))
96 96 fh.write(z.flush())
97 97 cleanup = None
98 98 return filename
99 99 finally:
100 100 if fh is not None:
101 101 fh.close()
102 102 if cleanup is not None:
103 103 if filename and vfs:
104 104 vfs.unlink(cleanup)
105 105 else:
106 106 os.unlink(cleanup)
107 107
108 108 def decompressor(fh, alg):
109 109 if alg == 'UN':
110 110 return fh
111 111 elif alg == 'GZ':
112 112 def generator(f):
113 113 zd = zlib.decompressobj()
114 114 for chunk in util.filechunkiter(f):
115 115 yield zd.decompress(chunk)
116 116 elif alg == 'BZ':
117 117 def generator(f):
118 118 zd = bz2.BZ2Decompressor()
119 119 zd.decompress("BZ")
120 120 for chunk in util.filechunkiter(f, 4096):
121 121 yield zd.decompress(chunk)
122 122 else:
123 123 raise util.Abort("unknown bundle compression '%s'" % alg)
124 124 return util.chunkbuffer(generator(fh))
125 125
126 126 class cg1unpacker(object):
127 127 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
128 128 deltaheadersize = struct.calcsize(deltaheader)
129 129 def __init__(self, fh, alg):
130 130 self._stream = decompressor(fh, alg)
131 131 self._type = alg
132 132 self.callback = None
133 133 def compressed(self):
134 134 return self._type != 'UN'
135 135 def read(self, l):
136 136 return self._stream.read(l)
137 137 def seek(self, pos):
138 138 return self._stream.seek(pos)
139 139 def tell(self):
140 140 return self._stream.tell()
141 141 def close(self):
142 142 return self._stream.close()
143 143
144 144 def chunklength(self):
145 145 d = readexactly(self._stream, 4)
146 146 l = struct.unpack(">l", d)[0]
147 147 if l <= 4:
148 148 if l:
149 149 raise util.Abort(_("invalid chunk length %d") % l)
150 150 return 0
151 151 if self.callback:
152 152 self.callback()
153 153 return l - 4
154 154
155 155 def changelogheader(self):
156 156 """v10 does not have a changelog header chunk"""
157 157 return {}
158 158
159 159 def manifestheader(self):
160 160 """v10 does not have a manifest header chunk"""
161 161 return {}
162 162
163 163 def filelogheader(self):
164 164 """return the header of the filelogs chunk, v10 only has the filename"""
165 165 l = self.chunklength()
166 166 if not l:
167 167 return {}
168 168 fname = readexactly(self._stream, l)
169 169 return {'filename': fname}
170 170
171 171 def _deltaheader(self, headertuple, prevnode):
172 172 node, p1, p2, cs = headertuple
173 173 if prevnode is None:
174 174 deltabase = p1
175 175 else:
176 176 deltabase = prevnode
177 177 return node, p1, p2, deltabase, cs
178 178
179 179 def deltachunk(self, prevnode):
180 180 l = self.chunklength()
181 181 if not l:
182 182 return {}
183 183 headerdata = readexactly(self._stream, self.deltaheadersize)
184 184 header = struct.unpack(self.deltaheader, headerdata)
185 185 delta = readexactly(self._stream, l - self.deltaheadersize)
186 186 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
187 187 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
188 188 'deltabase': deltabase, 'delta': delta}
189 189
190 190 def getchunks(self):
191 191 """returns all the chunks contains in the bundle
192 192
193 193 Used when you need to forward the binary stream to a file or another
194 194 network API. To do so, it parse the changegroup data, otherwise it will
195 195 block in case of sshrepo because it don't know the end of the stream.
196 196 """
197 197 # an empty chunkgroup is the end of the changegroup
198 198 # a changegroup has at least 2 chunkgroups (changelog and manifest).
199 199 # after that, an empty chunkgroup is the end of the changegroup
200 200 empty = False
201 201 count = 0
202 202 while not empty or count <= 2:
203 203 empty = True
204 204 count += 1
205 205 while True:
206 206 chunk = getchunk(self)
207 207 if not chunk:
208 208 break
209 209 empty = False
210 210 yield chunkheader(len(chunk))
211 211 pos = 0
212 212 while pos < len(chunk):
213 213 next = pos + 2**20
214 214 yield chunk[pos:next]
215 215 pos = next
216 216 yield closechunk()
217 217
218 218 class headerlessfixup(object):
219 219 def __init__(self, fh, h):
220 220 self._h = h
221 221 self._fh = fh
222 222 def read(self, n):
223 223 if self._h:
224 224 d, self._h = self._h[:n], self._h[n:]
225 225 if len(d) < n:
226 226 d += readexactly(self._fh, n - len(d))
227 227 return d
228 228 return readexactly(self._fh, n)
229 229
230 230 class cg1packer(object):
231 231 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
232 232 def __init__(self, repo, bundlecaps=None):
233 233 """Given a source repo, construct a bundler.
234 234
235 235 bundlecaps is optional and can be used to specify the set of
236 236 capabilities which can be used to build the bundle.
237 237 """
238 238 # Set of capabilities we can use to build the bundle.
239 239 if bundlecaps is None:
240 240 bundlecaps = set()
241 241 self._bundlecaps = bundlecaps
242 242 self._changelog = repo.changelog
243 243 self._manifest = repo.manifest
244 244 reorder = repo.ui.config('bundle', 'reorder', 'auto')
245 245 if reorder == 'auto':
246 246 reorder = None
247 247 else:
248 248 reorder = util.parsebool(reorder)
249 249 self._repo = repo
250 250 self._reorder = reorder
251 251 self._progress = repo.ui.progress
252 252 def close(self):
253 253 return closechunk()
254 254
255 255 def fileheader(self, fname):
256 256 return chunkheader(len(fname)) + fname
257 257
258 258 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
259 259 """Calculate a delta group, yielding a sequence of changegroup chunks
260 260 (strings).
261 261
262 262 Given a list of changeset revs, return a set of deltas and
263 263 metadata corresponding to nodes. The first delta is
264 264 first parent(nodelist[0]) -> nodelist[0], the receiver is
265 265 guaranteed to have this parent as it has all history before
266 266 these changesets. In the case firstparent is nullrev the
267 267 changegroup starts with a full revision.
268 268
269 269 If units is not None, progress detail will be generated, units specifies
270 270 the type of revlog that is touched (changelog, manifest, etc.).
271 271 """
272 272 # if we don't have any revisions touched by these changesets, bail
273 273 if len(nodelist) == 0:
274 274 yield self.close()
275 275 return
276 276
277 277 # for generaldelta revlogs, we linearize the revs; this will both be
278 278 # much quicker and generate a much smaller bundle
279 279 if (revlog._generaldelta and reorder is not False) or reorder:
280 280 dag = dagutil.revlogdag(revlog)
281 281 revs = set(revlog.rev(n) for n in nodelist)
282 282 revs = dag.linearize(revs)
283 283 else:
284 284 revs = sorted([revlog.rev(n) for n in nodelist])
285 285
286 286 # add the parent of the first rev
287 287 p = revlog.parentrevs(revs[0])[0]
288 288 revs.insert(0, p)
289 289
290 290 # build deltas
291 291 total = len(revs) - 1
292 292 msgbundling = _('bundling')
293 293 for r in xrange(len(revs) - 1):
294 294 if units is not None:
295 295 self._progress(msgbundling, r + 1, unit=units, total=total)
296 296 prev, curr = revs[r], revs[r + 1]
297 297 linknode = lookup(revlog.node(curr))
298 298 for c in self.revchunk(revlog, curr, prev, linknode):
299 299 yield c
300 300
301 301 yield self.close()
302 302
303 303 # filter any nodes that claim to be part of the known set
304 304 def prune(self, revlog, missing, commonrevs, source):
305 305 rr, rl = revlog.rev, revlog.linkrev
306 306 return [n for n in missing if rl(rr(n)) not in commonrevs]
307 307
308 308 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
309 309 '''yield a sequence of changegroup chunks (strings)'''
310 310 repo = self._repo
311 311 cl = self._changelog
312 312 mf = self._manifest
313 313 reorder = self._reorder
314 314 progress = self._progress
315 315
316 316 # for progress output
317 317 msgbundling = _('bundling')
318 318
319 319 mfs = {} # needed manifests
320 320 fnodes = {} # needed file nodes
321 321 changedfiles = set()
322 322
323 323 # Callback for the changelog, used to collect changed files and manifest
324 324 # nodes.
325 325 # Returns the linkrev node (identity in the changelog case).
326 326 def lookupcl(x):
327 327 c = cl.read(x)
328 328 changedfiles.update(c[3])
329 329 # record the first changeset introducing this manifest version
330 330 mfs.setdefault(c[0], x)
331 331 return x
332 332
333 333 # Callback for the manifest, used to collect linkrevs for filelog
334 334 # revisions.
335 335 # Returns the linkrev node (collected in lookupcl).
336 336 def lookupmf(x):
337 337 clnode = mfs[x]
338 338 if not fastpathlinkrev:
339 339 mdata = mf.readfast(x)
340 340 for f, n in mdata.iteritems():
341 341 if f in changedfiles:
342 342 # record the first changeset introducing this filelog
343 343 # version
344 344 fnodes[f].setdefault(n, clnode)
345 345 return clnode
346 346
347 347 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
348 348 reorder=reorder):
349 349 yield chunk
350 350 progress(msgbundling, None)
351 351
352 352 for f in changedfiles:
353 353 fnodes[f] = {}
354 354 mfnodes = self.prune(mf, mfs, commonrevs, source)
355 355 for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
356 356 reorder=reorder):
357 357 yield chunk
358 358 progress(msgbundling, None)
359 359
360 360 mfs.clear()
361 361 needed = set(cl.rev(x) for x in clnodes)
362 362
363 363 def linknodes(filerevlog, fname):
364 364 if fastpathlinkrev:
365 365 llr = filerevlog.linkrev
366 366 def genfilenodes():
367 367 for r in filerevlog:
368 368 linkrev = llr(r)
369 369 if linkrev in needed:
370 370 yield filerevlog.node(r), cl.node(linkrev)
371 371 fnodes[fname] = dict(genfilenodes())
372 372 return fnodes.get(fname, {})
373 373
374 374 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
375 375 source):
376 376 yield chunk
377 377
378 378 yield self.close()
379 379 progress(msgbundling, None)
380 380
381 381 if clnodes:
382 382 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
383 383
384 384 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
385 385 repo = self._repo
386 386 progress = self._progress
387 387 reorder = self._reorder
388 388 msgbundling = _('bundling')
389 389
390 390 total = len(changedfiles)
391 391 # for progress output
392 392 msgfiles = _('files')
393 393 for i, fname in enumerate(sorted(changedfiles)):
394 394 filerevlog = repo.file(fname)
395 395 if not filerevlog:
396 396 raise util.Abort(_("empty or missing revlog for %s") % fname)
397 397
398 398 linkrevnodes = linknodes(filerevlog, fname)
399 399 # Lookup for filenodes, we collected the linkrev nodes above in the
400 400 # fastpath case and with lookupmf in the slowpath case.
401 401 def lookupfilelog(x):
402 402 return linkrevnodes[x]
403 403
404 404 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
405 405 if filenodes:
406 406 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
407 407 total=total)
408 408 yield self.fileheader(fname)
409 409 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
410 410 reorder=reorder):
411 411 yield chunk
412 412
413 413 def revchunk(self, revlog, rev, prev, linknode):
414 414 node = revlog.node(rev)
415 415 p1, p2 = revlog.parentrevs(rev)
416 416 base = prev
417 417
418 418 prefix = ''
419 419 if base == nullrev:
420 420 delta = revlog.revision(node)
421 421 prefix = mdiff.trivialdiffheader(len(delta))
422 422 else:
423 423 delta = revlog.revdiff(base, rev)
424 424 p1n, p2n = revlog.parents(node)
425 425 basenode = revlog.node(base)
426 426 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
427 427 meta += prefix
428 428 l = len(meta) + len(delta)
429 429 yield chunkheader(l)
430 430 yield meta
431 431 yield delta
432 432 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
433 433 # do nothing with basenode, it is implicitly the previous one in HG10
434 434 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
435 435
436 436 packermap = {'01': (cg1packer, cg1unpacker)}
437 437
438 438 def _changegroupinfo(repo, nodes, source):
439 439 if repo.ui.verbose or source == 'bundle':
440 440 repo.ui.status(_("%d changesets found\n") % len(nodes))
441 441 if repo.ui.debugflag:
442 442 repo.ui.debug("list of changesets:\n")
443 443 for node in nodes:
444 444 repo.ui.debug("%s\n" % hex(node))
445 445
446 def getsubset(repo, outgoing, bundler, source, fastpath=False):
446 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
447 447 repo = repo.unfiltered()
448 448 commonrevs = outgoing.common
449 449 csets = outgoing.missing
450 450 heads = outgoing.missingheads
451 451 # We go through the fast path if we get told to, or if all (unfiltered
452 452 # heads have been requested (since we then know there all linkrevs will
453 453 # be pulled by the client).
454 454 heads.sort()
455 455 fastpathlinkrev = fastpath or (
456 456 repo.filtername is None and heads == sorted(repo.heads()))
457 457
458 458 repo.hook('preoutgoing', throw=True, source=source)
459 459 _changegroupinfo(repo, csets, source)
460 gengroup = bundler.generate(commonrevs, csets, fastpathlinkrev, source)
460 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
461
462 def getsubset(repo, outgoing, bundler, source, fastpath=False):
463 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
461 464 return cg1unpacker(util.chunkbuffer(gengroup), 'UN')
462 465
463 466 def changegroupsubset(repo, roots, heads, source):
464 467 """Compute a changegroup consisting of all the nodes that are
465 468 descendants of any of the roots and ancestors of any of the heads.
466 469 Return a chunkbuffer object whose read() method will return
467 470 successive changegroup chunks.
468 471
469 472 It is fairly complex as determining which filenodes and which
470 473 manifest nodes need to be included for the changeset to be complete
471 474 is non-trivial.
472 475
473 476 Another wrinkle is doing the reverse, figuring out which changeset in
474 477 the changegroup a particular filenode or manifestnode belongs to.
475 478 """
476 479 cl = repo.changelog
477 480 if not roots:
478 481 roots = [nullid]
479 482 # TODO: remove call to nodesbetween.
480 483 csets, roots, heads = cl.nodesbetween(roots, heads)
481 484 discbases = []
482 485 for n in roots:
483 486 discbases.extend([p for p in cl.parents(n) if p != nullid])
484 487 outgoing = discovery.outgoing(cl, discbases, heads)
485 488 bundler = cg1packer(repo)
486 489 return getsubset(repo, outgoing, bundler, source)
487 490
491 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None):
492 """Like getbundle, but taking a discovery.outgoing as an argument.
493
494 This is only implemented for local repos and reuses potentially
495 precomputed sets in outgoing. Returns a raw changegroup generator."""
496 if not outgoing.missing:
497 return None
498 bundler = cg1packer(repo, bundlecaps)
499 return getsubsetraw(repo, outgoing, bundler, source)
500
488 501 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
489 502 """Like getbundle, but taking a discovery.outgoing as an argument.
490 503
491 504 This is only implemented for local repos and reuses potentially
492 505 precomputed sets in outgoing."""
493 506 if not outgoing.missing:
494 507 return None
495 508 bundler = cg1packer(repo, bundlecaps)
496 509 return getsubset(repo, outgoing, bundler, source)
497 510
498 511 def _computeoutgoing(repo, heads, common):
499 512 """Computes which revs are outgoing given a set of common
500 513 and a set of heads.
501 514
502 515 This is a separate function so extensions can have access to
503 516 the logic.
504 517
505 518 Returns a discovery.outgoing object.
506 519 """
507 520 cl = repo.changelog
508 521 if common:
509 522 hasnode = cl.hasnode
510 523 common = [n for n in common if hasnode(n)]
511 524 else:
512 525 common = [nullid]
513 526 if not heads:
514 527 heads = cl.heads()
515 528 return discovery.outgoing(cl, common, heads)
516 529
530 def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None):
531 """Like changegroupsubset, but returns the set difference between the
532 ancestors of heads and the ancestors common.
533
534 If heads is None, use the local heads. If common is None, use [nullid].
535
536 The nodes in common might not all be known locally due to the way the
537 current discovery protocol works. Returns a raw changegroup generator.
538 """
539 outgoing = _computeoutgoing(repo, heads, common)
540 return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps)
541
517 542 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
518 543 """Like changegroupsubset, but returns the set difference between the
519 544 ancestors of heads and the ancestors common.
520 545
521 546 If heads is None, use the local heads. If common is None, use [nullid].
522 547
523 548 The nodes in common might not all be known locally due to the way the
524 549 current discovery protocol works.
525 550 """
526 551 outgoing = _computeoutgoing(repo, heads, common)
527 552 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
528 553
529 554 def changegroup(repo, basenodes, source):
530 555 # to avoid a race we use changegroupsubset() (issue1320)
531 556 return changegroupsubset(repo, basenodes, repo.heads(), source)
532 557
533 558 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
534 559 revisions = 0
535 560 files = 0
536 561 while True:
537 562 chunkdata = source.filelogheader()
538 563 if not chunkdata:
539 564 break
540 565 f = chunkdata["filename"]
541 566 repo.ui.debug("adding %s revisions\n" % f)
542 567 pr()
543 568 fl = repo.file(f)
544 569 o = len(fl)
545 570 if not fl.addgroup(source, revmap, trp):
546 571 raise util.Abort(_("received file revlog group is empty"))
547 572 revisions += len(fl) - o
548 573 files += 1
549 574 if f in needfiles:
550 575 needs = needfiles[f]
551 576 for new in xrange(o, len(fl)):
552 577 n = fl.node(new)
553 578 if n in needs:
554 579 needs.remove(n)
555 580 else:
556 581 raise util.Abort(
557 582 _("received spurious file revlog entry"))
558 583 if not needs:
559 584 del needfiles[f]
560 585 repo.ui.progress(_('files'), None)
561 586
562 587 for f, needs in needfiles.iteritems():
563 588 fl = repo.file(f)
564 589 for n in needs:
565 590 try:
566 591 fl.rev(n)
567 592 except error.LookupError:
568 593 raise util.Abort(
569 594 _('missing file data for %s:%s - run hg verify') %
570 595 (f, hex(n)))
571 596
572 597 return revisions, files
573 598
574 599 def addchangegroup(repo, source, srctype, url, emptyok=False,
575 600 targetphase=phases.draft):
576 601 """Add the changegroup returned by source.read() to this repo.
577 602 srctype is a string like 'push', 'pull', or 'unbundle'. url is
578 603 the URL of the repo where this changegroup is coming from.
579 604
580 605 Return an integer summarizing the change to this repo:
581 606 - nothing changed or no source: 0
582 607 - more heads than before: 1+added heads (2..n)
583 608 - fewer heads than before: -1-removed heads (-2..-n)
584 609 - number of heads stays the same: 1
585 610 """
586 611 repo = repo.unfiltered()
587 612 def csmap(x):
588 613 repo.ui.debug("add changeset %s\n" % short(x))
589 614 return len(cl)
590 615
591 616 def revmap(x):
592 617 return cl.rev(x)
593 618
594 619 if not source:
595 620 return 0
596 621
597 622 changesets = files = revisions = 0
598 623 efiles = set()
599 624
600 625 # write changelog data to temp files so concurrent readers will not see
601 626 # inconsistent view
602 627 cl = repo.changelog
603 628 cl.delayupdate()
604 629 oldheads = cl.heads()
605 630
606 631 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
607 632 # The transaction could have been created before and already carries source
608 633 # information. In this case we use the top level data. We overwrite the
609 634 # argument because we need to use the top level value (if they exist) in
610 635 # this function.
611 636 srctype = tr.hookargs.setdefault('source', srctype)
612 637 url = tr.hookargs.setdefault('url', url)
613 638 try:
614 639 repo.hook('prechangegroup', throw=True, **tr.hookargs)
615 640
616 641 trp = weakref.proxy(tr)
617 642 # pull off the changeset group
618 643 repo.ui.status(_("adding changesets\n"))
619 644 clstart = len(cl)
620 645 class prog(object):
621 646 step = _('changesets')
622 647 count = 1
623 648 ui = repo.ui
624 649 total = None
625 650 def __call__(repo):
626 651 repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
627 652 total=repo.total)
628 653 repo.count += 1
629 654 pr = prog()
630 655 source.callback = pr
631 656
632 657 source.changelogheader()
633 658 srccontent = cl.addgroup(source, csmap, trp)
634 659 if not (srccontent or emptyok):
635 660 raise util.Abort(_("received changelog group is empty"))
636 661 clend = len(cl)
637 662 changesets = clend - clstart
638 663 for c in xrange(clstart, clend):
639 664 efiles.update(repo[c].files())
640 665 efiles = len(efiles)
641 666 repo.ui.progress(_('changesets'), None)
642 667
643 668 # pull off the manifest group
644 669 repo.ui.status(_("adding manifests\n"))
645 670 pr.step = _('manifests')
646 671 pr.count = 1
647 672 pr.total = changesets # manifests <= changesets
648 673 # no need to check for empty manifest group here:
649 674 # if the result of the merge of 1 and 2 is the same in 3 and 4,
650 675 # no new manifest will be created and the manifest group will
651 676 # be empty during the pull
652 677 source.manifestheader()
653 678 repo.manifest.addgroup(source, revmap, trp)
654 679 repo.ui.progress(_('manifests'), None)
655 680
656 681 needfiles = {}
657 682 if repo.ui.configbool('server', 'validate', default=False):
658 683 # validate incoming csets have their manifests
659 684 for cset in xrange(clstart, clend):
660 685 mfest = repo.changelog.read(repo.changelog.node(cset))[0]
661 686 mfest = repo.manifest.readdelta(mfest)
662 687 # store file nodes we must see
663 688 for f, n in mfest.iteritems():
664 689 needfiles.setdefault(f, set()).add(n)
665 690
666 691 # process the files
667 692 repo.ui.status(_("adding file changes\n"))
668 693 pr.step = _('files')
669 694 pr.count = 1
670 695 pr.total = efiles
671 696 source.callback = None
672 697
673 698 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
674 699 needfiles)
675 700 revisions += newrevs
676 701 files += newfiles
677 702
678 703 dh = 0
679 704 if oldheads:
680 705 heads = cl.heads()
681 706 dh = len(heads) - len(oldheads)
682 707 for h in heads:
683 708 if h not in oldheads and repo[h].closesbranch():
684 709 dh -= 1
685 710 htext = ""
686 711 if dh:
687 712 htext = _(" (%+d heads)") % dh
688 713
689 714 repo.ui.status(_("added %d changesets"
690 715 " with %d changes to %d files%s\n")
691 716 % (changesets, revisions, files, htext))
692 717 repo.invalidatevolatilesets()
693 718
694 719 if changesets > 0:
695 720 p = lambda: cl.writepending() and repo.root or ""
696 721 if 'node' not in tr.hookargs:
697 722 tr.hookargs['node'] = hex(cl.node(clstart))
698 723 hookargs = dict(tr.hookargs)
699 724 else:
700 725 hookargs = dict(tr.hookargs)
701 726 hookargs['node'] = hex(cl.node(clstart))
702 727 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
703 728
704 729 added = [cl.node(r) for r in xrange(clstart, clend)]
705 730 publishing = repo.ui.configbool('phases', 'publish', True)
706 731 if srctype in ('push', 'serve'):
707 732 # Old servers can not push the boundary themselves.
708 733 # New servers won't push the boundary if changeset already
709 734 # exists locally as secret
710 735 #
711 736 # We should not use added here but the list of all change in
712 737 # the bundle
713 738 if publishing:
714 739 phases.advanceboundary(repo, tr, phases.public, srccontent)
715 740 else:
716 741 # Those changesets have been pushed from the outside, their
717 742 # phases are going to be pushed alongside. Therefor
718 743 # `targetphase` is ignored.
719 744 phases.advanceboundary(repo, tr, phases.draft, srccontent)
720 745 phases.retractboundary(repo, tr, phases.draft, added)
721 746 elif srctype != 'strip':
722 747 # publishing only alter behavior during push
723 748 #
724 749 # strip should not touch boundary at all
725 750 phases.retractboundary(repo, tr, targetphase, added)
726 751
727 752 # make changelog see real files again
728 753 cl.finalize(trp)
729 754
730 755 tr.close()
731 756
732 757 if changesets > 0:
733 758 if srctype != 'strip':
734 759 # During strip, branchcache is invalid but coming call to
735 760 # `destroyed` will repair it.
736 761 # In other case we can safely update cache on disk.
737 762 branchmap.updatecache(repo.filtered('served'))
738 763
739 764 def runhooks():
740 765 # These hooks run when the lock releases, not when the
741 766 # transaction closes. So it's possible for the changelog
742 767 # to have changed since we last saw it.
743 768 if clstart >= len(repo):
744 769 return
745 770
746 771 # forcefully update the on-disk branch cache
747 772 repo.ui.debug("updating the branch cache\n")
748 773 repo.hook("changegroup", **hookargs)
749 774
750 775 for n in added:
751 776 args = hookargs.copy()
752 777 args['node'] = hex(n)
753 778 repo.hook("incoming", **args)
754 779
755 780 newheads = [h for h in repo.heads() if h not in oldheads]
756 781 repo.ui.log("incoming",
757 782 "%s incoming changes - new heads: %s\n",
758 783 len(added),
759 784 ', '.join([hex(c[:6]) for c in newheads]))
760 785 repo._afterlock(runhooks)
761 786
762 787 finally:
763 788 tr.release()
764 789 # never return 0 here:
765 790 if dh < 0:
766 791 return dh - 1
767 792 else:
768 793 return dh + 1
@@ -1,1270 +1,1271 b''
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import hex, nullid
10 10 import errno, urllib
11 11 import util, scmutil, changegroup, base85, error
12 12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
13 13
14 14 def readbundle(ui, fh, fname, vfs=None):
15 15 header = changegroup.readexactly(fh, 4)
16 16
17 17 alg = None
18 18 if not fname:
19 19 fname = "stream"
20 20 if not header.startswith('HG') and header.startswith('\0'):
21 21 fh = changegroup.headerlessfixup(fh, header)
22 22 header = "HG10"
23 23 alg = 'UN'
24 24 elif vfs:
25 25 fname = vfs.join(fname)
26 26
27 27 magic, version = header[0:2], header[2:4]
28 28
29 29 if magic != 'HG':
30 30 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
31 31 if version == '10':
32 32 if alg is None:
33 33 alg = changegroup.readexactly(fh, 2)
34 34 return changegroup.cg1unpacker(fh, alg)
35 35 elif version == '2Y':
36 36 return bundle2.unbundle20(ui, fh, header=magic + version)
37 37 else:
38 38 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
39 39
40 40 def buildobsmarkerspart(bundler, markers):
41 41 """add an obsmarker part to the bundler with <markers>
42 42
43 43 No part is created if markers is empty.
44 44 Raises ValueError if the bundler doesn't support any known obsmarker format.
45 45 """
46 46 if markers:
47 47 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
48 48 version = obsolete.commonversion(remoteversions)
49 49 if version is None:
50 50 raise ValueError('bundler do not support common obsmarker format')
51 51 stream = obsolete.encodemarkers(markers, True, version=version)
52 52 return bundler.newpart('B2X:OBSMARKERS', data=stream)
53 53 return None
54 54
55 55 class pushoperation(object):
56 56 """A object that represent a single push operation
57 57
58 58 It purpose is to carry push related state and very common operation.
59 59
60 60 A new should be created at the beginning of each push and discarded
61 61 afterward.
62 62 """
63 63
64 64 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
65 65 bookmarks=()):
66 66 # repo we push from
67 67 self.repo = repo
68 68 self.ui = repo.ui
69 69 # repo we push to
70 70 self.remote = remote
71 71 # force option provided
72 72 self.force = force
73 73 # revs to be pushed (None is "all")
74 74 self.revs = revs
75 75 # bookmark explicitly pushed
76 76 self.bookmarks = bookmarks
77 77 # allow push of new branch
78 78 self.newbranch = newbranch
79 79 # did a local lock get acquired?
80 80 self.locallocked = None
81 81 # step already performed
82 82 # (used to check what steps have been already performed through bundle2)
83 83 self.stepsdone = set()
84 84 # Integer version of the changegroup push result
85 85 # - None means nothing to push
86 86 # - 0 means HTTP error
87 87 # - 1 means we pushed and remote head count is unchanged *or*
88 88 # we have outgoing changesets but refused to push
89 89 # - other values as described by addchangegroup()
90 90 self.cgresult = None
91 91 # Boolean value for the bookmark push
92 92 self.bkresult = None
93 93 # discover.outgoing object (contains common and outgoing data)
94 94 self.outgoing = None
95 95 # all remote heads before the push
96 96 self.remoteheads = None
97 97 # testable as a boolean indicating if any nodes are missing locally.
98 98 self.incoming = None
99 99 # phases changes that must be pushed along side the changesets
100 100 self.outdatedphases = None
101 101 # phases changes that must be pushed if changeset push fails
102 102 self.fallbackoutdatedphases = None
103 103 # outgoing obsmarkers
104 104 self.outobsmarkers = set()
105 105 # outgoing bookmarks
106 106 self.outbookmarks = []
107 107
108 108 @util.propertycache
109 109 def futureheads(self):
110 110 """future remote heads if the changeset push succeeds"""
111 111 return self.outgoing.missingheads
112 112
113 113 @util.propertycache
114 114 def fallbackheads(self):
115 115 """future remote heads if the changeset push fails"""
116 116 if self.revs is None:
117 117 # not target to push, all common are relevant
118 118 return self.outgoing.commonheads
119 119 unfi = self.repo.unfiltered()
120 120 # I want cheads = heads(::missingheads and ::commonheads)
121 121 # (missingheads is revs with secret changeset filtered out)
122 122 #
123 123 # This can be expressed as:
124 124 # cheads = ( (missingheads and ::commonheads)
125 125 # + (commonheads and ::missingheads))"
126 126 # )
127 127 #
128 128 # while trying to push we already computed the following:
129 129 # common = (::commonheads)
130 130 # missing = ((commonheads::missingheads) - commonheads)
131 131 #
132 132 # We can pick:
133 133 # * missingheads part of common (::commonheads)
134 134 common = set(self.outgoing.common)
135 135 nm = self.repo.changelog.nodemap
136 136 cheads = [node for node in self.revs if nm[node] in common]
137 137 # and
138 138 # * commonheads parents on missing
139 139 revset = unfi.set('%ln and parents(roots(%ln))',
140 140 self.outgoing.commonheads,
141 141 self.outgoing.missing)
142 142 cheads.extend(c.node() for c in revset)
143 143 return cheads
144 144
145 145 @property
146 146 def commonheads(self):
147 147 """set of all common heads after changeset bundle push"""
148 148 if self.cgresult:
149 149 return self.futureheads
150 150 else:
151 151 return self.fallbackheads
152 152
153 153 # mapping of message used when pushing bookmark
154 154 bookmsgmap = {'update': (_("updating bookmark %s\n"),
155 155 _('updating bookmark %s failed!\n')),
156 156 'export': (_("exporting bookmark %s\n"),
157 157 _('exporting bookmark %s failed!\n')),
158 158 'delete': (_("deleting remote bookmark %s\n"),
159 159 _('deleting remote bookmark %s failed!\n')),
160 160 }
161 161
162 162
163 163 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
164 164 '''Push outgoing changesets (limited by revs) from a local
165 165 repository to remote. Return an integer:
166 166 - None means nothing to push
167 167 - 0 means HTTP error
168 168 - 1 means we pushed and remote head count is unchanged *or*
169 169 we have outgoing changesets but refused to push
170 170 - other values as described by addchangegroup()
171 171 '''
172 172 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
173 173 if pushop.remote.local():
174 174 missing = (set(pushop.repo.requirements)
175 175 - pushop.remote.local().supported)
176 176 if missing:
177 177 msg = _("required features are not"
178 178 " supported in the destination:"
179 179 " %s") % (', '.join(sorted(missing)))
180 180 raise util.Abort(msg)
181 181
182 182 # there are two ways to push to remote repo:
183 183 #
184 184 # addchangegroup assumes local user can lock remote
185 185 # repo (local filesystem, old ssh servers).
186 186 #
187 187 # unbundle assumes local user cannot lock remote repo (new ssh
188 188 # servers, http servers).
189 189
190 190 if not pushop.remote.canpush():
191 191 raise util.Abort(_("destination does not support push"))
192 192 # get local lock as we might write phase data
193 193 locallock = None
194 194 try:
195 195 locallock = pushop.repo.lock()
196 196 pushop.locallocked = True
197 197 except IOError, err:
198 198 pushop.locallocked = False
199 199 if err.errno != errno.EACCES:
200 200 raise
201 201 # source repo cannot be locked.
202 202 # We do not abort the push, but just disable the local phase
203 203 # synchronisation.
204 204 msg = 'cannot lock source repository: %s\n' % err
205 205 pushop.ui.debug(msg)
206 206 try:
207 207 pushop.repo.checkpush(pushop)
208 208 lock = None
209 209 unbundle = pushop.remote.capable('unbundle')
210 210 if not unbundle:
211 211 lock = pushop.remote.lock()
212 212 try:
213 213 _pushdiscovery(pushop)
214 214 if (pushop.repo.ui.configbool('experimental', 'bundle2-exp',
215 215 False)
216 216 and pushop.remote.capable('bundle2-exp')):
217 217 _pushbundle2(pushop)
218 218 _pushchangeset(pushop)
219 219 _pushsyncphase(pushop)
220 220 _pushobsolete(pushop)
221 221 _pushbookmark(pushop)
222 222 finally:
223 223 if lock is not None:
224 224 lock.release()
225 225 finally:
226 226 if locallock is not None:
227 227 locallock.release()
228 228
229 229 return pushop
230 230
231 231 # list of steps to perform discovery before push
232 232 pushdiscoveryorder = []
233 233
234 234 # Mapping between step name and function
235 235 #
236 236 # This exists to help extensions wrap steps if necessary
237 237 pushdiscoverymapping = {}
238 238
239 239 def pushdiscovery(stepname):
240 240 """decorator for function performing discovery before push
241 241
242 242 The function is added to the step -> function mapping and appended to the
243 243 list of steps. Beware that decorated function will be added in order (this
244 244 may matter).
245 245
246 246 You can only use this decorator for a new step, if you want to wrap a step
247 247 from an extension, change the pushdiscovery dictionary directly."""
248 248 def dec(func):
249 249 assert stepname not in pushdiscoverymapping
250 250 pushdiscoverymapping[stepname] = func
251 251 pushdiscoveryorder.append(stepname)
252 252 return func
253 253 return dec
254 254
255 255 def _pushdiscovery(pushop):
256 256 """Run all discovery steps"""
257 257 for stepname in pushdiscoveryorder:
258 258 step = pushdiscoverymapping[stepname]
259 259 step(pushop)
260 260
261 261 @pushdiscovery('changeset')
262 262 def _pushdiscoverychangeset(pushop):
263 263 """discover the changeset that need to be pushed"""
264 264 unfi = pushop.repo.unfiltered()
265 265 fci = discovery.findcommonincoming
266 266 commoninc = fci(unfi, pushop.remote, force=pushop.force)
267 267 common, inc, remoteheads = commoninc
268 268 fco = discovery.findcommonoutgoing
269 269 outgoing = fco(unfi, pushop.remote, onlyheads=pushop.revs,
270 270 commoninc=commoninc, force=pushop.force)
271 271 pushop.outgoing = outgoing
272 272 pushop.remoteheads = remoteheads
273 273 pushop.incoming = inc
274 274
275 275 @pushdiscovery('phase')
276 276 def _pushdiscoveryphase(pushop):
277 277 """discover the phase that needs to be pushed
278 278
279 279 (computed for both success and failure case for changesets push)"""
280 280 outgoing = pushop.outgoing
281 281 unfi = pushop.repo.unfiltered()
282 282 remotephases = pushop.remote.listkeys('phases')
283 283 publishing = remotephases.get('publishing', False)
284 284 ana = phases.analyzeremotephases(pushop.repo,
285 285 pushop.fallbackheads,
286 286 remotephases)
287 287 pheads, droots = ana
288 288 extracond = ''
289 289 if not publishing:
290 290 extracond = ' and public()'
291 291 revset = 'heads((%%ln::%%ln) %s)' % extracond
292 292 # Get the list of all revs draft on remote by public here.
293 293 # XXX Beware that revset break if droots is not strictly
294 294 # XXX root we may want to ensure it is but it is costly
295 295 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
296 296 if not outgoing.missing:
297 297 future = fallback
298 298 else:
299 299 # adds changeset we are going to push as draft
300 300 #
301 301 # should not be necessary for publishing server, but because of an
302 302 # issue fixed in xxxxx we have to do it anyway.
303 303 fdroots = list(unfi.set('roots(%ln + %ln::)',
304 304 outgoing.missing, droots))
305 305 fdroots = [f.node() for f in fdroots]
306 306 future = list(unfi.set(revset, fdroots, pushop.futureheads))
307 307 pushop.outdatedphases = future
308 308 pushop.fallbackoutdatedphases = fallback
309 309
310 310 @pushdiscovery('obsmarker')
311 311 def _pushdiscoveryobsmarkers(pushop):
312 312 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
313 313 and pushop.repo.obsstore
314 314 and 'obsolete' in pushop.remote.listkeys('namespaces')):
315 315 repo = pushop.repo
316 316 # very naive computation, that can be quite expensive on big repo.
317 317 # However: evolution is currently slow on them anyway.
318 318 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
319 319 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
320 320
321 321 @pushdiscovery('bookmarks')
322 322 def _pushdiscoverybookmarks(pushop):
323 323 ui = pushop.ui
324 324 repo = pushop.repo.unfiltered()
325 325 remote = pushop.remote
326 326 ui.debug("checking for updated bookmarks\n")
327 327 ancestors = ()
328 328 if pushop.revs:
329 329 revnums = map(repo.changelog.rev, pushop.revs)
330 330 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
331 331 remotebookmark = remote.listkeys('bookmarks')
332 332
333 333 explicit = set(pushop.bookmarks)
334 334
335 335 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
336 336 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
337 337 for b, scid, dcid in advsrc:
338 338 if b in explicit:
339 339 explicit.remove(b)
340 340 if not ancestors or repo[scid].rev() in ancestors:
341 341 pushop.outbookmarks.append((b, dcid, scid))
342 342 # search added bookmark
343 343 for b, scid, dcid in addsrc:
344 344 if b in explicit:
345 345 explicit.remove(b)
346 346 pushop.outbookmarks.append((b, '', scid))
347 347 # search for overwritten bookmark
348 348 for b, scid, dcid in advdst + diverge + differ:
349 349 if b in explicit:
350 350 explicit.remove(b)
351 351 pushop.outbookmarks.append((b, dcid, scid))
352 352 # search for bookmark to delete
353 353 for b, scid, dcid in adddst:
354 354 if b in explicit:
355 355 explicit.remove(b)
356 356 # treat as "deleted locally"
357 357 pushop.outbookmarks.append((b, dcid, ''))
358 358 # identical bookmarks shouldn't get reported
359 359 for b, scid, dcid in same:
360 360 if b in explicit:
361 361 explicit.remove(b)
362 362
363 363 if explicit:
364 364 explicit = sorted(explicit)
365 365 # we should probably list all of them
366 366 ui.warn(_('bookmark %s does not exist on the local '
367 367 'or remote repository!\n') % explicit[0])
368 368 pushop.bkresult = 2
369 369
370 370 pushop.outbookmarks.sort()
371 371
372 372 def _pushcheckoutgoing(pushop):
373 373 outgoing = pushop.outgoing
374 374 unfi = pushop.repo.unfiltered()
375 375 if not outgoing.missing:
376 376 # nothing to push
377 377 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
378 378 return False
379 379 # something to push
380 380 if not pushop.force:
381 381 # if repo.obsstore == False --> no obsolete
382 382 # then, save the iteration
383 383 if unfi.obsstore:
384 384 # this message are here for 80 char limit reason
385 385 mso = _("push includes obsolete changeset: %s!")
386 386 mst = {"unstable": _("push includes unstable changeset: %s!"),
387 387 "bumped": _("push includes bumped changeset: %s!"),
388 388 "divergent": _("push includes divergent changeset: %s!")}
389 389 # If we are to push if there is at least one
390 390 # obsolete or unstable changeset in missing, at
391 391 # least one of the missinghead will be obsolete or
392 392 # unstable. So checking heads only is ok
393 393 for node in outgoing.missingheads:
394 394 ctx = unfi[node]
395 395 if ctx.obsolete():
396 396 raise util.Abort(mso % ctx)
397 397 elif ctx.troubled():
398 398 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
399 399 newbm = pushop.ui.configlist('bookmarks', 'pushing')
400 400 discovery.checkheads(unfi, pushop.remote, outgoing,
401 401 pushop.remoteheads,
402 402 pushop.newbranch,
403 403 bool(pushop.incoming),
404 404 newbm)
405 405 return True
406 406
407 407 # List of names of steps to perform for an outgoing bundle2, order matters.
408 408 b2partsgenorder = []
409 409
410 410 # Mapping between step name and function
411 411 #
412 412 # This exists to help extensions wrap steps if necessary
413 413 b2partsgenmapping = {}
414 414
415 415 def b2partsgenerator(stepname):
416 416 """decorator for function generating bundle2 part
417 417
418 418 The function is added to the step -> function mapping and appended to the
419 419 list of steps. Beware that decorated functions will be added in order
420 420 (this may matter).
421 421
422 422 You can only use this decorator for new steps, if you want to wrap a step
423 423 from an extension, attack the b2partsgenmapping dictionary directly."""
424 424 def dec(func):
425 425 assert stepname not in b2partsgenmapping
426 426 b2partsgenmapping[stepname] = func
427 427 b2partsgenorder.append(stepname)
428 428 return func
429 429 return dec
430 430
431 431 @b2partsgenerator('changeset')
432 432 def _pushb2ctx(pushop, bundler):
433 433 """handle changegroup push through bundle2
434 434
435 435 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
436 436 """
437 437 if 'changesets' in pushop.stepsdone:
438 438 return
439 439 pushop.stepsdone.add('changesets')
440 440 # Send known heads to the server for race detection.
441 441 if not _pushcheckoutgoing(pushop):
442 442 return
443 443 pushop.repo.prepushoutgoinghooks(pushop.repo,
444 444 pushop.remote,
445 445 pushop.outgoing)
446 446 if not pushop.force:
447 447 bundler.newpart('B2X:CHECK:HEADS', data=iter(pushop.remoteheads))
448 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', pushop.outgoing)
449 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg.getchunks())
448 cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
449 pushop.outgoing)
450 cgpart = bundler.newpart('B2X:CHANGEGROUP', data=cg)
450 451 def handlereply(op):
451 452 """extract addchangegroup returns from server reply"""
452 453 cgreplies = op.records.getreplies(cgpart.id)
453 454 assert len(cgreplies['changegroup']) == 1
454 455 pushop.cgresult = cgreplies['changegroup'][0]['return']
455 456 return handlereply
456 457
457 458 @b2partsgenerator('phase')
458 459 def _pushb2phases(pushop, bundler):
459 460 """handle phase push through bundle2"""
460 461 if 'phases' in pushop.stepsdone:
461 462 return
462 463 b2caps = bundle2.bundle2caps(pushop.remote)
463 464 if not 'b2x:pushkey' in b2caps:
464 465 return
465 466 pushop.stepsdone.add('phases')
466 467 part2node = []
467 468 enc = pushkey.encode
468 469 for newremotehead in pushop.outdatedphases:
469 470 part = bundler.newpart('b2x:pushkey')
470 471 part.addparam('namespace', enc('phases'))
471 472 part.addparam('key', enc(newremotehead.hex()))
472 473 part.addparam('old', enc(str(phases.draft)))
473 474 part.addparam('new', enc(str(phases.public)))
474 475 part2node.append((part.id, newremotehead))
475 476 def handlereply(op):
476 477 for partid, node in part2node:
477 478 partrep = op.records.getreplies(partid)
478 479 results = partrep['pushkey']
479 480 assert len(results) <= 1
480 481 msg = None
481 482 if not results:
482 483 msg = _('server ignored update of %s to public!\n') % node
483 484 elif not int(results[0]['return']):
484 485 msg = _('updating %s to public failed!\n') % node
485 486 if msg is not None:
486 487 pushop.ui.warn(msg)
487 488 return handlereply
488 489
489 490 @b2partsgenerator('obsmarkers')
490 491 def _pushb2obsmarkers(pushop, bundler):
491 492 if 'obsmarkers' in pushop.stepsdone:
492 493 return
493 494 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
494 495 if obsolete.commonversion(remoteversions) is None:
495 496 return
496 497 pushop.stepsdone.add('obsmarkers')
497 498 if pushop.outobsmarkers:
498 499 buildobsmarkerspart(bundler, pushop.outobsmarkers)
499 500
500 501 @b2partsgenerator('bookmarks')
501 502 def _pushb2bookmarks(pushop, bundler):
502 503 """handle phase push through bundle2"""
503 504 if 'bookmarks' in pushop.stepsdone:
504 505 return
505 506 b2caps = bundle2.bundle2caps(pushop.remote)
506 507 if 'b2x:pushkey' not in b2caps:
507 508 return
508 509 pushop.stepsdone.add('bookmarks')
509 510 part2book = []
510 511 enc = pushkey.encode
511 512 for book, old, new in pushop.outbookmarks:
512 513 part = bundler.newpart('b2x:pushkey')
513 514 part.addparam('namespace', enc('bookmarks'))
514 515 part.addparam('key', enc(book))
515 516 part.addparam('old', enc(old))
516 517 part.addparam('new', enc(new))
517 518 action = 'update'
518 519 if not old:
519 520 action = 'export'
520 521 elif not new:
521 522 action = 'delete'
522 523 part2book.append((part.id, book, action))
523 524
524 525
525 526 def handlereply(op):
526 527 ui = pushop.ui
527 528 for partid, book, action in part2book:
528 529 partrep = op.records.getreplies(partid)
529 530 results = partrep['pushkey']
530 531 assert len(results) <= 1
531 532 if not results:
532 533 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
533 534 else:
534 535 ret = int(results[0]['return'])
535 536 if ret:
536 537 ui.status(bookmsgmap[action][0] % book)
537 538 else:
538 539 ui.warn(bookmsgmap[action][1] % book)
539 540 if pushop.bkresult is not None:
540 541 pushop.bkresult = 1
541 542 return handlereply
542 543
543 544
544 545 def _pushbundle2(pushop):
545 546 """push data to the remote using bundle2
546 547
547 548 The only currently supported type of data is changegroup but this will
548 549 evolve in the future."""
549 550 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
550 551 # create reply capability
551 552 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
552 553 bundler.newpart('b2x:replycaps', data=capsblob)
553 554 replyhandlers = []
554 555 for partgenname in b2partsgenorder:
555 556 partgen = b2partsgenmapping[partgenname]
556 557 ret = partgen(pushop, bundler)
557 558 if callable(ret):
558 559 replyhandlers.append(ret)
559 560 # do not push if nothing to push
560 561 if bundler.nbparts <= 1:
561 562 return
562 563 stream = util.chunkbuffer(bundler.getchunks())
563 564 try:
564 565 reply = pushop.remote.unbundle(stream, ['force'], 'push')
565 566 except error.BundleValueError, exc:
566 567 raise util.Abort('missing support for %s' % exc)
567 568 try:
568 569 op = bundle2.processbundle(pushop.repo, reply)
569 570 except error.BundleValueError, exc:
570 571 raise util.Abort('missing support for %s' % exc)
571 572 for rephand in replyhandlers:
572 573 rephand(op)
573 574
574 575 def _pushchangeset(pushop):
575 576 """Make the actual push of changeset bundle to remote repo"""
576 577 if 'changesets' in pushop.stepsdone:
577 578 return
578 579 pushop.stepsdone.add('changesets')
579 580 if not _pushcheckoutgoing(pushop):
580 581 return
581 582 pushop.repo.prepushoutgoinghooks(pushop.repo,
582 583 pushop.remote,
583 584 pushop.outgoing)
584 585 outgoing = pushop.outgoing
585 586 unbundle = pushop.remote.capable('unbundle')
586 587 # TODO: get bundlecaps from remote
587 588 bundlecaps = None
588 589 # create a changegroup from local
589 590 if pushop.revs is None and not (outgoing.excluded
590 591 or pushop.repo.changelog.filteredrevs):
591 592 # push everything,
592 593 # use the fast path, no race possible on push
593 594 bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
594 595 cg = changegroup.getsubset(pushop.repo,
595 596 outgoing,
596 597 bundler,
597 598 'push',
598 599 fastpath=True)
599 600 else:
600 601 cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
601 602 bundlecaps)
602 603
603 604 # apply changegroup to remote
604 605 if unbundle:
605 606 # local repo finds heads on server, finds out what
606 607 # revs it must push. once revs transferred, if server
607 608 # finds it has different heads (someone else won
608 609 # commit/push race), server aborts.
609 610 if pushop.force:
610 611 remoteheads = ['force']
611 612 else:
612 613 remoteheads = pushop.remoteheads
613 614 # ssh: return remote's addchangegroup()
614 615 # http: return remote's addchangegroup() or 0 for error
615 616 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
616 617 pushop.repo.url())
617 618 else:
618 619 # we return an integer indicating remote head count
619 620 # change
620 621 pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
621 622 pushop.repo.url())
622 623
623 624 def _pushsyncphase(pushop):
624 625 """synchronise phase information locally and remotely"""
625 626 cheads = pushop.commonheads
626 627 # even when we don't push, exchanging phase data is useful
627 628 remotephases = pushop.remote.listkeys('phases')
628 629 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
629 630 and remotephases # server supports phases
630 631 and pushop.cgresult is None # nothing was pushed
631 632 and remotephases.get('publishing', False)):
632 633 # When:
633 634 # - this is a subrepo push
634 635 # - and remote support phase
635 636 # - and no changeset was pushed
636 637 # - and remote is publishing
637 638 # We may be in issue 3871 case!
638 639 # We drop the possible phase synchronisation done by
639 640 # courtesy to publish changesets possibly locally draft
640 641 # on the remote.
641 642 remotephases = {'publishing': 'True'}
642 643 if not remotephases: # old server or public only reply from non-publishing
643 644 _localphasemove(pushop, cheads)
644 645 # don't push any phase data as there is nothing to push
645 646 else:
646 647 ana = phases.analyzeremotephases(pushop.repo, cheads,
647 648 remotephases)
648 649 pheads, droots = ana
649 650 ### Apply remote phase on local
650 651 if remotephases.get('publishing', False):
651 652 _localphasemove(pushop, cheads)
652 653 else: # publish = False
653 654 _localphasemove(pushop, pheads)
654 655 _localphasemove(pushop, cheads, phases.draft)
655 656 ### Apply local phase on remote
656 657
657 658 if pushop.cgresult:
658 659 if 'phases' in pushop.stepsdone:
659 660 # phases already pushed though bundle2
660 661 return
661 662 outdated = pushop.outdatedphases
662 663 else:
663 664 outdated = pushop.fallbackoutdatedphases
664 665
665 666 pushop.stepsdone.add('phases')
666 667
667 668 # filter heads already turned public by the push
668 669 outdated = [c for c in outdated if c.node() not in pheads]
669 670 b2caps = bundle2.bundle2caps(pushop.remote)
670 671 if 'b2x:pushkey' in b2caps:
671 672 # server supports bundle2, let's do a batched push through it
672 673 #
673 674 # This will eventually be unified with the changesets bundle2 push
674 675 bundler = bundle2.bundle20(pushop.ui, b2caps)
675 676 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo))
676 677 bundler.newpart('b2x:replycaps', data=capsblob)
677 678 part2node = []
678 679 enc = pushkey.encode
679 680 for newremotehead in outdated:
680 681 part = bundler.newpart('b2x:pushkey')
681 682 part.addparam('namespace', enc('phases'))
682 683 part.addparam('key', enc(newremotehead.hex()))
683 684 part.addparam('old', enc(str(phases.draft)))
684 685 part.addparam('new', enc(str(phases.public)))
685 686 part2node.append((part.id, newremotehead))
686 687 stream = util.chunkbuffer(bundler.getchunks())
687 688 try:
688 689 reply = pushop.remote.unbundle(stream, ['force'], 'push')
689 690 op = bundle2.processbundle(pushop.repo, reply)
690 691 except error.BundleValueError, exc:
691 692 raise util.Abort('missing support for %s' % exc)
692 693 for partid, node in part2node:
693 694 partrep = op.records.getreplies(partid)
694 695 results = partrep['pushkey']
695 696 assert len(results) <= 1
696 697 msg = None
697 698 if not results:
698 699 msg = _('server ignored update of %s to public!\n') % node
699 700 elif not int(results[0]['return']):
700 701 msg = _('updating %s to public failed!\n') % node
701 702 if msg is not None:
702 703 pushop.ui.warn(msg)
703 704
704 705 else:
705 706 # fallback to independent pushkey command
706 707 for newremotehead in outdated:
707 708 r = pushop.remote.pushkey('phases',
708 709 newremotehead.hex(),
709 710 str(phases.draft),
710 711 str(phases.public))
711 712 if not r:
712 713 pushop.ui.warn(_('updating %s to public failed!\n')
713 714 % newremotehead)
714 715
715 716 def _localphasemove(pushop, nodes, phase=phases.public):
716 717 """move <nodes> to <phase> in the local source repo"""
717 718 if pushop.locallocked:
718 719 tr = pushop.repo.transaction('push-phase-sync')
719 720 try:
720 721 phases.advanceboundary(pushop.repo, tr, phase, nodes)
721 722 tr.close()
722 723 finally:
723 724 tr.release()
724 725 else:
725 726 # repo is not locked, do not change any phases!
726 727 # Informs the user that phases should have been moved when
727 728 # applicable.
728 729 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
729 730 phasestr = phases.phasenames[phase]
730 731 if actualmoves:
731 732 pushop.ui.status(_('cannot lock source repo, skipping '
732 733 'local %s phase update\n') % phasestr)
733 734
734 735 def _pushobsolete(pushop):
735 736 """utility function to push obsolete markers to a remote"""
736 737 if 'obsmarkers' in pushop.stepsdone:
737 738 return
738 739 pushop.ui.debug('try to push obsolete markers to remote\n')
739 740 repo = pushop.repo
740 741 remote = pushop.remote
741 742 pushop.stepsdone.add('obsmarkers')
742 743 if pushop.outobsmarkers:
743 744 rslts = []
744 745 remotedata = obsolete._pushkeyescape(pushop.outobsmarkers)
745 746 for key in sorted(remotedata, reverse=True):
746 747 # reverse sort to ensure we end with dump0
747 748 data = remotedata[key]
748 749 rslts.append(remote.pushkey('obsolete', key, '', data))
749 750 if [r for r in rslts if not r]:
750 751 msg = _('failed to push some obsolete markers!\n')
751 752 repo.ui.warn(msg)
752 753
753 754 def _pushbookmark(pushop):
754 755 """Update bookmark position on remote"""
755 756 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
756 757 return
757 758 pushop.stepsdone.add('bookmarks')
758 759 ui = pushop.ui
759 760 remote = pushop.remote
760 761
761 762 for b, old, new in pushop.outbookmarks:
762 763 action = 'update'
763 764 if not old:
764 765 action = 'export'
765 766 elif not new:
766 767 action = 'delete'
767 768 if remote.pushkey('bookmarks', b, old, new):
768 769 ui.status(bookmsgmap[action][0] % b)
769 770 else:
770 771 ui.warn(bookmsgmap[action][1] % b)
771 772 # discovery can have set the value form invalid entry
772 773 if pushop.bkresult is not None:
773 774 pushop.bkresult = 1
774 775
775 776 class pulloperation(object):
776 777 """A object that represent a single pull operation
777 778
778 779 It purpose is to carry push related state and very common operation.
779 780
780 781 A new should be created at the beginning of each pull and discarded
781 782 afterward.
782 783 """
783 784
784 785 def __init__(self, repo, remote, heads=None, force=False, bookmarks=()):
785 786 # repo we pull into
786 787 self.repo = repo
787 788 # repo we pull from
788 789 self.remote = remote
789 790 # revision we try to pull (None is "all")
790 791 self.heads = heads
791 792 # bookmark pulled explicitly
792 793 self.explicitbookmarks = bookmarks
793 794 # do we force pull?
794 795 self.force = force
795 796 # the name the pull transaction
796 797 self._trname = 'pull\n' + util.hidepassword(remote.url())
797 798 # hold the transaction once created
798 799 self._tr = None
799 800 # set of common changeset between local and remote before pull
800 801 self.common = None
801 802 # set of pulled head
802 803 self.rheads = None
803 804 # list of missing changeset to fetch remotely
804 805 self.fetch = None
805 806 # remote bookmarks data
806 807 self.remotebookmarks = None
807 808 # result of changegroup pulling (used as return code by pull)
808 809 self.cgresult = None
809 810 # list of step already done
810 811 self.stepsdone = set()
811 812
812 813 @util.propertycache
813 814 def pulledsubset(self):
814 815 """heads of the set of changeset target by the pull"""
815 816 # compute target subset
816 817 if self.heads is None:
817 818 # We pulled every thing possible
818 819 # sync on everything common
819 820 c = set(self.common)
820 821 ret = list(self.common)
821 822 for n in self.rheads:
822 823 if n not in c:
823 824 ret.append(n)
824 825 return ret
825 826 else:
826 827 # We pulled a specific subset
827 828 # sync on this subset
828 829 return self.heads
829 830
830 831 def gettransaction(self):
831 832 """get appropriate pull transaction, creating it if needed"""
832 833 if self._tr is None:
833 834 self._tr = self.repo.transaction(self._trname)
834 835 self._tr.hookargs['source'] = 'pull'
835 836 self._tr.hookargs['url'] = self.remote.url()
836 837 return self._tr
837 838
838 839 def closetransaction(self):
839 840 """close transaction if created"""
840 841 if self._tr is not None:
841 842 repo = self.repo
842 843 cl = repo.unfiltered().changelog
843 844 p = cl.writepending() and repo.root or ""
844 845 p = cl.writepending() and repo.root or ""
845 846 repo.hook('b2x-pretransactionclose', throw=True, pending=p,
846 847 **self._tr.hookargs)
847 848 self._tr.close()
848 849 hookargs = dict(self._tr.hookargs)
849 850 def runhooks():
850 851 repo.hook('b2x-transactionclose', **hookargs)
851 852 repo._afterlock(runhooks)
852 853
853 854 def releasetransaction(self):
854 855 """release transaction if created"""
855 856 if self._tr is not None:
856 857 self._tr.release()
857 858
858 859 def pull(repo, remote, heads=None, force=False, bookmarks=()):
859 860 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks)
860 861 if pullop.remote.local():
861 862 missing = set(pullop.remote.requirements) - pullop.repo.supported
862 863 if missing:
863 864 msg = _("required features are not"
864 865 " supported in the destination:"
865 866 " %s") % (', '.join(sorted(missing)))
866 867 raise util.Abort(msg)
867 868
868 869 pullop.remotebookmarks = remote.listkeys('bookmarks')
869 870 lock = pullop.repo.lock()
870 871 try:
871 872 _pulldiscovery(pullop)
872 873 if (pullop.repo.ui.configbool('experimental', 'bundle2-exp', False)
873 874 and pullop.remote.capable('bundle2-exp')):
874 875 _pullbundle2(pullop)
875 876 _pullchangeset(pullop)
876 877 _pullphase(pullop)
877 878 _pullbookmarks(pullop)
878 879 _pullobsolete(pullop)
879 880 pullop.closetransaction()
880 881 finally:
881 882 pullop.releasetransaction()
882 883 lock.release()
883 884
884 885 return pullop
885 886
886 887 # list of steps to perform discovery before pull
887 888 pulldiscoveryorder = []
888 889
889 890 # Mapping between step name and function
890 891 #
891 892 # This exists to help extensions wrap steps if necessary
892 893 pulldiscoverymapping = {}
893 894
894 895 def pulldiscovery(stepname):
895 896 """decorator for function performing discovery before pull
896 897
897 898 The function is added to the step -> function mapping and appended to the
898 899 list of steps. Beware that decorated function will be added in order (this
899 900 may matter).
900 901
901 902 You can only use this decorator for a new step, if you want to wrap a step
902 903 from an extension, change the pulldiscovery dictionary directly."""
903 904 def dec(func):
904 905 assert stepname not in pulldiscoverymapping
905 906 pulldiscoverymapping[stepname] = func
906 907 pulldiscoveryorder.append(stepname)
907 908 return func
908 909 return dec
909 910
910 911 def _pulldiscovery(pullop):
911 912 """Run all discovery steps"""
912 913 for stepname in pulldiscoveryorder:
913 914 step = pulldiscoverymapping[stepname]
914 915 step(pullop)
915 916
916 917 @pulldiscovery('changegroup')
917 918 def _pulldiscoverychangegroup(pullop):
918 919 """discovery phase for the pull
919 920
920 921 Current handle changeset discovery only, will change handle all discovery
921 922 at some point."""
922 923 tmp = discovery.findcommonincoming(pullop.repo.unfiltered(),
923 924 pullop.remote,
924 925 heads=pullop.heads,
925 926 force=pullop.force)
926 927 pullop.common, pullop.fetch, pullop.rheads = tmp
927 928
928 929 def _pullbundle2(pullop):
929 930 """pull data using bundle2
930 931
931 932 For now, the only supported data are changegroup."""
932 933 remotecaps = bundle2.bundle2caps(pullop.remote)
933 934 kwargs = {'bundlecaps': caps20to10(pullop.repo)}
934 935 # pulling changegroup
935 936 pullop.stepsdone.add('changegroup')
936 937
937 938 kwargs['common'] = pullop.common
938 939 kwargs['heads'] = pullop.heads or pullop.rheads
939 940 kwargs['cg'] = pullop.fetch
940 941 if 'b2x:listkeys' in remotecaps:
941 942 kwargs['listkeys'] = ['phase', 'bookmarks']
942 943 if not pullop.fetch:
943 944 pullop.repo.ui.status(_("no changes found\n"))
944 945 pullop.cgresult = 0
945 946 else:
946 947 if pullop.heads is None and list(pullop.common) == [nullid]:
947 948 pullop.repo.ui.status(_("requesting all changes\n"))
948 949 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
949 950 remoteversions = bundle2.obsmarkersversion(remotecaps)
950 951 if obsolete.commonversion(remoteversions) is not None:
951 952 kwargs['obsmarkers'] = True
952 953 pullop.stepsdone.add('obsmarkers')
953 954 _pullbundle2extraprepare(pullop, kwargs)
954 955 if kwargs.keys() == ['format']:
955 956 return # nothing to pull
956 957 bundle = pullop.remote.getbundle('pull', **kwargs)
957 958 try:
958 959 op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
959 960 except error.BundleValueError, exc:
960 961 raise util.Abort('missing support for %s' % exc)
961 962
962 963 if pullop.fetch:
963 964 changedheads = 0
964 965 pullop.cgresult = 1
965 966 for cg in op.records['changegroup']:
966 967 ret = cg['return']
967 968 # If any changegroup result is 0, return 0
968 969 if ret == 0:
969 970 pullop.cgresult = 0
970 971 break
971 972 if ret < -1:
972 973 changedheads += ret + 1
973 974 elif ret > 1:
974 975 changedheads += ret - 1
975 976 if changedheads > 0:
976 977 pullop.cgresult = 1 + changedheads
977 978 elif changedheads < 0:
978 979 pullop.cgresult = -1 + changedheads
979 980
980 981 # processing phases change
981 982 for namespace, value in op.records['listkeys']:
982 983 if namespace == 'phases':
983 984 _pullapplyphases(pullop, value)
984 985
985 986 # processing bookmark update
986 987 for namespace, value in op.records['listkeys']:
987 988 if namespace == 'bookmarks':
988 989 pullop.remotebookmarks = value
989 990 _pullbookmarks(pullop)
990 991
991 992 def _pullbundle2extraprepare(pullop, kwargs):
992 993 """hook function so that extensions can extend the getbundle call"""
993 994 pass
994 995
995 996 def _pullchangeset(pullop):
996 997 """pull changeset from unbundle into the local repo"""
997 998 # We delay the open of the transaction as late as possible so we
998 999 # don't open transaction for nothing or you break future useful
999 1000 # rollback call
1000 1001 if 'changegroup' in pullop.stepsdone:
1001 1002 return
1002 1003 pullop.stepsdone.add('changegroup')
1003 1004 if not pullop.fetch:
1004 1005 pullop.repo.ui.status(_("no changes found\n"))
1005 1006 pullop.cgresult = 0
1006 1007 return
1007 1008 pullop.gettransaction()
1008 1009 if pullop.heads is None and list(pullop.common) == [nullid]:
1009 1010 pullop.repo.ui.status(_("requesting all changes\n"))
1010 1011 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1011 1012 # issue1320, avoid a race if remote changed after discovery
1012 1013 pullop.heads = pullop.rheads
1013 1014
1014 1015 if pullop.remote.capable('getbundle'):
1015 1016 # TODO: get bundlecaps from remote
1016 1017 cg = pullop.remote.getbundle('pull', common=pullop.common,
1017 1018 heads=pullop.heads or pullop.rheads)
1018 1019 elif pullop.heads is None:
1019 1020 cg = pullop.remote.changegroup(pullop.fetch, 'pull')
1020 1021 elif not pullop.remote.capable('changegroupsubset'):
1021 1022 raise util.Abort(_("partial pull cannot be done because "
1022 1023 "other repository doesn't support "
1023 1024 "changegroupsubset."))
1024 1025 else:
1025 1026 cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
1026 1027 pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
1027 1028 pullop.remote.url())
1028 1029
1029 1030 def _pullphase(pullop):
1030 1031 # Get remote phases data from remote
1031 1032 if 'phases' in pullop.stepsdone:
1032 1033 return
1033 1034 remotephases = pullop.remote.listkeys('phases')
1034 1035 _pullapplyphases(pullop, remotephases)
1035 1036
1036 1037 def _pullapplyphases(pullop, remotephases):
1037 1038 """apply phase movement from observed remote state"""
1038 1039 if 'phases' in pullop.stepsdone:
1039 1040 return
1040 1041 pullop.stepsdone.add('phases')
1041 1042 publishing = bool(remotephases.get('publishing', False))
1042 1043 if remotephases and not publishing:
1043 1044 # remote is new and unpublishing
1044 1045 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1045 1046 pullop.pulledsubset,
1046 1047 remotephases)
1047 1048 dheads = pullop.pulledsubset
1048 1049 else:
1049 1050 # Remote is old or publishing all common changesets
1050 1051 # should be seen as public
1051 1052 pheads = pullop.pulledsubset
1052 1053 dheads = []
1053 1054 unfi = pullop.repo.unfiltered()
1054 1055 phase = unfi._phasecache.phase
1055 1056 rev = unfi.changelog.nodemap.get
1056 1057 public = phases.public
1057 1058 draft = phases.draft
1058 1059
1059 1060 # exclude changesets already public locally and update the others
1060 1061 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1061 1062 if pheads:
1062 1063 tr = pullop.gettransaction()
1063 1064 phases.advanceboundary(pullop.repo, tr, public, pheads)
1064 1065
1065 1066 # exclude changesets already draft locally and update the others
1066 1067 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1067 1068 if dheads:
1068 1069 tr = pullop.gettransaction()
1069 1070 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1070 1071
1071 1072 def _pullbookmarks(pullop):
1072 1073 """process the remote bookmark information to update the local one"""
1073 1074 if 'bookmarks' in pullop.stepsdone:
1074 1075 return
1075 1076 pullop.stepsdone.add('bookmarks')
1076 1077 repo = pullop.repo
1077 1078 remotebookmarks = pullop.remotebookmarks
1078 1079 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1079 1080 pullop.remote.url(),
1080 1081 pullop.gettransaction,
1081 1082 explicit=pullop.explicitbookmarks)
1082 1083
1083 1084 def _pullobsolete(pullop):
1084 1085 """utility function to pull obsolete markers from a remote
1085 1086
1086 1087 The `gettransaction` is function that return the pull transaction, creating
1087 1088 one if necessary. We return the transaction to inform the calling code that
1088 1089 a new transaction have been created (when applicable).
1089 1090
1090 1091 Exists mostly to allow overriding for experimentation purpose"""
1091 1092 if 'obsmarkers' in pullop.stepsdone:
1092 1093 return
1093 1094 pullop.stepsdone.add('obsmarkers')
1094 1095 tr = None
1095 1096 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1096 1097 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1097 1098 remoteobs = pullop.remote.listkeys('obsolete')
1098 1099 if 'dump0' in remoteobs:
1099 1100 tr = pullop.gettransaction()
1100 1101 for key in sorted(remoteobs, reverse=True):
1101 1102 if key.startswith('dump'):
1102 1103 data = base85.b85decode(remoteobs[key])
1103 1104 pullop.repo.obsstore.mergemarkers(tr, data)
1104 1105 pullop.repo.invalidatevolatilesets()
1105 1106 return tr
1106 1107
1107 1108 def caps20to10(repo):
1108 1109 """return a set with appropriate options to use bundle20 during getbundle"""
1109 1110 caps = set(['HG2Y'])
1110 1111 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
1111 1112 caps.add('bundle2=' + urllib.quote(capsblob))
1112 1113 return caps
1113 1114
1114 1115 # List of names of steps to perform for a bundle2 for getbundle, order matters.
1115 1116 getbundle2partsorder = []
1116 1117
1117 1118 # Mapping between step name and function
1118 1119 #
1119 1120 # This exists to help extensions wrap steps if necessary
1120 1121 getbundle2partsmapping = {}
1121 1122
1122 1123 def getbundle2partsgenerator(stepname):
1123 1124 """decorator for function generating bundle2 part for getbundle
1124 1125
1125 1126 The function is added to the step -> function mapping and appended to the
1126 1127 list of steps. Beware that decorated functions will be added in order
1127 1128 (this may matter).
1128 1129
1129 1130 You can only use this decorator for new steps, if you want to wrap a step
1130 1131 from an extension, attack the getbundle2partsmapping dictionary directly."""
1131 1132 def dec(func):
1132 1133 assert stepname not in getbundle2partsmapping
1133 1134 getbundle2partsmapping[stepname] = func
1134 1135 getbundle2partsorder.append(stepname)
1135 1136 return func
1136 1137 return dec
1137 1138
1138 1139 def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
1139 1140 **kwargs):
1140 1141 """return a full bundle (with potentially multiple kind of parts)
1141 1142
1142 1143 Could be a bundle HG10 or a bundle HG2Y depending on bundlecaps
1143 1144 passed. For now, the bundle can contain only changegroup, but this will
1144 1145 changes when more part type will be available for bundle2.
1145 1146
1146 1147 This is different from changegroup.getchangegroup that only returns an HG10
1147 1148 changegroup bundle. They may eventually get reunited in the future when we
1148 1149 have a clearer idea of the API we what to query different data.
1149 1150
1150 1151 The implementation is at a very early stage and will get massive rework
1151 1152 when the API of bundle is refined.
1152 1153 """
1153 1154 # bundle10 case
1154 1155 if bundlecaps is None or 'HG2Y' not in bundlecaps:
1155 1156 if bundlecaps and not kwargs.get('cg', True):
1156 1157 raise ValueError(_('request for bundle10 must include changegroup'))
1157 1158
1158 1159 if kwargs:
1159 1160 raise ValueError(_('unsupported getbundle arguments: %s')
1160 1161 % ', '.join(sorted(kwargs.keys())))
1161 1162 return changegroup.getchangegroup(repo, source, heads=heads,
1162 1163 common=common, bundlecaps=bundlecaps)
1163 1164
1164 1165 # bundle20 case
1165 1166 b2caps = {}
1166 1167 for bcaps in bundlecaps:
1167 1168 if bcaps.startswith('bundle2='):
1168 1169 blob = urllib.unquote(bcaps[len('bundle2='):])
1169 1170 b2caps.update(bundle2.decodecaps(blob))
1170 1171 bundler = bundle2.bundle20(repo.ui, b2caps)
1171 1172
1172 1173 for name in getbundle2partsorder:
1173 1174 func = getbundle2partsmapping[name]
1174 1175 kwargs['heads'] = heads
1175 1176 kwargs['common'] = common
1176 1177 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
1177 1178 **kwargs)
1178 1179
1179 1180 return util.chunkbuffer(bundler.getchunks())
1180 1181
1181 1182 @getbundle2partsgenerator('changegroup')
1182 1183 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
1183 1184 b2caps=None, heads=None, common=None, **kwargs):
1184 1185 """add a changegroup part to the requested bundle"""
1185 1186 cg = None
1186 1187 if kwargs.get('cg', True):
1187 1188 # build changegroup bundle here.
1188 cg = changegroup.getchangegroup(repo, source, heads=heads,
1189 cg = changegroup.getchangegroupraw(repo, source, heads=heads,
1189 1190 common=common, bundlecaps=bundlecaps)
1190 1191
1191 1192 if cg:
1192 bundler.newpart('b2x:changegroup', data=cg.getchunks())
1193 bundler.newpart('b2x:changegroup', data=cg)
1193 1194
1194 1195 @getbundle2partsgenerator('listkeys')
1195 1196 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
1196 1197 b2caps=None, **kwargs):
1197 1198 """add parts containing listkeys namespaces to the requested bundle"""
1198 1199 listkeys = kwargs.get('listkeys', ())
1199 1200 for namespace in listkeys:
1200 1201 part = bundler.newpart('b2x:listkeys')
1201 1202 part.addparam('namespace', namespace)
1202 1203 keys = repo.listkeys(namespace).items()
1203 1204 part.data = pushkey.encodekeys(keys)
1204 1205
1205 1206 @getbundle2partsgenerator('obsmarkers')
1206 1207 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
1207 1208 b2caps=None, heads=None, **kwargs):
1208 1209 """add an obsolescence markers part to the requested bundle"""
1209 1210 if kwargs.get('obsmarkers', False):
1210 1211 if heads is None:
1211 1212 heads = repo.heads()
1212 1213 subset = [c.node() for c in repo.set('::%ln', heads)]
1213 1214 markers = repo.obsstore.relevantmarkers(subset)
1214 1215 buildobsmarkerspart(bundler, markers)
1215 1216
1216 1217 def check_heads(repo, their_heads, context):
1217 1218 """check if the heads of a repo have been modified
1218 1219
1219 1220 Used by peer for unbundling.
1220 1221 """
1221 1222 heads = repo.heads()
1222 1223 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1223 1224 if not (their_heads == ['force'] or their_heads == heads or
1224 1225 their_heads == ['hashed', heads_hash]):
1225 1226 # someone else committed/pushed/unbundled while we
1226 1227 # were transferring data
1227 1228 raise error.PushRaced('repository changed while %s - '
1228 1229 'please try again' % context)
1229 1230
1230 1231 def unbundle(repo, cg, heads, source, url):
1231 1232 """Apply a bundle to a repo.
1232 1233
1233 1234 this function makes sure the repo is locked during the application and have
1234 1235 mechanism to check that no push race occurred between the creation of the
1235 1236 bundle and its application.
1236 1237
1237 1238 If the push was raced as PushRaced exception is raised."""
1238 1239 r = 0
1239 1240 # need a transaction when processing a bundle2 stream
1240 1241 tr = None
1241 1242 lock = repo.lock()
1242 1243 try:
1243 1244 check_heads(repo, heads, 'uploading changes')
1244 1245 # push can proceed
1245 1246 if util.safehasattr(cg, 'params'):
1246 1247 try:
1247 1248 tr = repo.transaction('unbundle')
1248 1249 tr.hookargs['source'] = source
1249 1250 tr.hookargs['url'] = url
1250 1251 tr.hookargs['bundle2-exp'] = '1'
1251 1252 r = bundle2.processbundle(repo, cg, lambda: tr).reply
1252 1253 cl = repo.unfiltered().changelog
1253 1254 p = cl.writepending() and repo.root or ""
1254 1255 repo.hook('b2x-pretransactionclose', throw=True, pending=p,
1255 1256 **tr.hookargs)
1256 1257 tr.close()
1257 1258 hookargs = dict(tr.hookargs)
1258 1259 def runhooks():
1259 1260 repo.hook('b2x-transactionclose', **hookargs)
1260 1261 repo._afterlock(runhooks)
1261 1262 except Exception, exc:
1262 1263 exc.duringunbundle2 = True
1263 1264 raise
1264 1265 else:
1265 1266 r = changegroup.addchangegroup(repo, cg, source, url)
1266 1267 finally:
1267 1268 if tr is not None:
1268 1269 tr.release()
1269 1270 lock.release()
1270 1271 return r
General Comments 0
You need to be logged in to leave comments. Login now