bundle2: don't use seekable bundle2 parts by default (issue5691)...
Gregory Szorc
r35113:da91e730 default
@@ -1,1620 +1,1625
1 1 # perf.py - performance test routines
2 2 '''helper extension to measure performance'''
3 3
4 4 # "historical portability" policy of perf.py:
5 5 #
6 6 # We have to do:
7 7 # - make perf.py "loadable" with as wide a range of Mercurial versions as possible
8 8 # This doesn't mean that perf commands work correctly with that Mercurial.
9 9 # BTW, perf.py itself has been available since 1.1 (or eb240755386d).
10 10 # - make historical perf commands work correctly with as wide a range of
11 11 # Mercurial versions as possible
12 12 #
13 13 # We have to do, if possible at reasonable cost:
14 14 # - make recent perf commands for historical features work correctly
15 15 # with early Mercurial
16 16 #
17 17 # We don't have to do:
18 18 # - make perf commands for recent features work correctly with early
19 19 # Mercurial
20 20
21 21 from __future__ import absolute_import
22 22 import functools
23 23 import gc
24 24 import os
25 25 import random
26 26 import struct
27 27 import sys
28 28 import time
29 29 from mercurial import (
30 30 changegroup,
31 31 cmdutil,
32 32 commands,
33 33 copies,
34 34 error,
35 35 extensions,
36 36 mdiff,
37 37 merge,
38 38 revlog,
39 39 util,
40 40 )
41 41
42 42 # for "historical portability":
43 43 # try to import modules separately (in dict order), and ignore
44 44 # failure, because these aren't available with early Mercurial
45 45 try:
46 46 from mercurial import branchmap # since 2.5 (or bcee63733aad)
47 47 except ImportError:
48 48 pass
49 49 try:
50 50 from mercurial import obsolete # since 2.3 (or ad0d6c2b3279)
51 51 except ImportError:
52 52 pass
53 53 try:
54 54 from mercurial import registrar # since 3.7 (or 37d50250b696)
55 55 dir(registrar) # forcibly load it
56 56 except ImportError:
57 57 registrar = None
58 58 try:
59 59 from mercurial import repoview # since 2.5 (or 3a6ddacb7198)
60 60 except ImportError:
61 61 pass
62 62 try:
63 63 from mercurial import scmutil # since 1.9 (or 8b252e826c68)
64 64 except ImportError:
65 65 pass
66 66
67 67 # for "historical portability":
68 68 # define util.safehasattr forcibly, because util.safehasattr has been
69 69 # available since 1.9.3 (or 94b200a11cf7)
70 70 _undefined = object()
71 71 def safehasattr(thing, attr):
72 72 return getattr(thing, attr, _undefined) is not _undefined
73 73 setattr(util, 'safehasattr', safehasattr)
74 74
75 75 # for "historical portability":
76 76 # define util.timer forcibly, because util.timer has been available
77 77 # since ae5d60bb70c9
78 78 if safehasattr(time, 'perf_counter'):
79 79 util.timer = time.perf_counter
80 80 elif os.name == 'nt':
81 81 util.timer = time.clock
82 82 else:
83 83 util.timer = time.time
84 84
85 85 # for "historical portability":
86 86 # use locally defined empty option list, if formatteropts isn't
87 87 # available, because commands.formatteropts has been available since
88 88 # 3.2 (or 7a7eed5176a4), even though formatting itself has been
89 89 # available since 2.2 (or ae5f92e154d3)
90 90 formatteropts = getattr(cmdutil, "formatteropts",
91 91 getattr(commands, "formatteropts", []))
92 92
93 93 # for "historical portability":
94 94 # use locally defined option list, if debugrevlogopts isn't available,
95 95 # because commands.debugrevlogopts has been available since 3.7 (or
96 96 # 5606f7d0d063), even though cmdutil.openrevlog() has been available
97 97 # since 1.9 (or a79fea6b3e77).
98 98 revlogopts = getattr(cmdutil, "debugrevlogopts",
99 99 getattr(commands, "debugrevlogopts", [
100 100 ('c', 'changelog', False, ('open changelog')),
101 101 ('m', 'manifest', False, ('open manifest')),
102 102 ('', 'dir', False, ('open directory manifest')),
103 103 ]))
104 104
105 105 cmdtable = {}
106 106
107 107 # for "historical portability":
108 108 # define parsealiases locally, because cmdutil.parsealiases has been
109 109 # available since 1.5 (or 6252852b4332)
110 110 def parsealiases(cmd):
111 111 return cmd.lstrip("^").split("|")
112 112
113 113 if safehasattr(registrar, 'command'):
114 114 command = registrar.command(cmdtable)
115 115 elif safehasattr(cmdutil, 'command'):
116 116 import inspect
117 117 command = cmdutil.command(cmdtable)
118 118 if 'norepo' not in inspect.getargspec(command)[0]:
119 119 # for "historical portability":
120 120 # wrap original cmdutil.command, because "norepo" option has
121 121 # been available since 3.1 (or 75a96326cecb)
122 122 _command = command
123 123 def command(name, options=(), synopsis=None, norepo=False):
124 124 if norepo:
125 125 commands.norepo += ' %s' % ' '.join(parsealiases(name))
126 126 return _command(name, list(options), synopsis)
127 127 else:
128 128 # for "historical portability":
129 129 # define "@command" annotation locally, because cmdutil.command
130 130 # has been available since 1.9 (or 2daa5179e73f)
131 131 def command(name, options=(), synopsis=None, norepo=False):
132 132 def decorator(func):
133 133 if synopsis:
134 134 cmdtable[name] = func, list(options), synopsis
135 135 else:
136 136 cmdtable[name] = func, list(options)
137 137 if norepo:
138 138 commands.norepo += ' %s' % ' '.join(parsealiases(name))
139 139 return func
140 140 return decorator
141 141
142 142 try:
143 143 import mercurial.registrar
144 144 import mercurial.configitems
145 145 configtable = {}
146 146 configitem = mercurial.registrar.configitem(configtable)
147 147 configitem('perf', 'presleep',
148 148 default=mercurial.configitems.dynamicdefault,
149 149 )
150 150 configitem('perf', 'stub',
151 151 default=mercurial.configitems.dynamicdefault,
152 152 )
153 153 configitem('perf', 'parentscount',
154 154 default=mercurial.configitems.dynamicdefault,
155 155 )
156 156 except (ImportError, AttributeError):
157 157 pass
158 158
159 159 def getlen(ui):
160 160 if ui.configbool("perf", "stub", False):
161 161 return lambda x: 1
162 162 return len
163 163
164 164 def gettimer(ui, opts=None):
165 165 """return a timer function and formatter: (timer, formatter)
166 166
167 167 This function exists to gather the creation of the formatter in a single
168 168 place instead of duplicating it in all performance commands."""
169 169
170 170 # enforce an idle period before execution to counteract power management
171 171 # experimental config: perf.presleep
172 172 time.sleep(getint(ui, "perf", "presleep", 1))
173 173
174 174 if opts is None:
175 175 opts = {}
176 176 # redirect all to stderr unless buffer api is in use
177 177 if not ui._buffers:
178 178 ui = ui.copy()
179 179 uifout = safeattrsetter(ui, 'fout', ignoremissing=True)
180 180 if uifout:
181 181 # for "historical portability":
182 182 # ui.fout/ferr have been available since 1.9 (or 4e1ccd4c2b6d)
183 183 uifout.set(ui.ferr)
184 184
185 185 # get a formatter
186 186 uiformatter = getattr(ui, 'formatter', None)
187 187 if uiformatter:
188 188 fm = uiformatter('perf', opts)
189 189 else:
190 190 # for "historical portability":
191 191 # define formatter locally, because ui.formatter has been
192 192 # available since 2.2 (or ae5f92e154d3)
193 193 from mercurial import node
194 194 class defaultformatter(object):
195 195 """Minimized composition of baseformatter and plainformatter
196 196 """
197 197 def __init__(self, ui, topic, opts):
198 198 self._ui = ui
199 199 if ui.debugflag:
200 200 self.hexfunc = node.hex
201 201 else:
202 202 self.hexfunc = node.short
203 203 def __nonzero__(self):
204 204 return False
205 205 __bool__ = __nonzero__
206 206 def startitem(self):
207 207 pass
208 208 def data(self, **data):
209 209 pass
210 210 def write(self, fields, deftext, *fielddata, **opts):
211 211 self._ui.write(deftext % fielddata, **opts)
212 212 def condwrite(self, cond, fields, deftext, *fielddata, **opts):
213 213 if cond:
214 214 self._ui.write(deftext % fielddata, **opts)
215 215 def plain(self, text, **opts):
216 216 self._ui.write(text, **opts)
217 217 def end(self):
218 218 pass
219 219 fm = defaultformatter(ui, 'perf', opts)
220 220
221 221 # stub function, runs code only once instead of in a loop
222 222 # experimental config: perf.stub
223 223 if ui.configbool("perf", "stub", False):
224 224 return functools.partial(stub_timer, fm), fm
225 225 return functools.partial(_timer, fm), fm
226 226
227 227 def stub_timer(fm, func, title=None):
228 228 func()
229 229
230 230 def _timer(fm, func, title=None):
231 231 gc.collect()
232 232 results = []
233 233 begin = util.timer()
234 234 count = 0
235 235 while True:
236 236 ostart = os.times()
237 237 cstart = util.timer()
238 238 r = func()
239 239 cstop = util.timer()
240 240 ostop = os.times()
241 241 count += 1
242 242 a, b = ostart, ostop
243 243 results.append((cstop - cstart, b[0] - a[0], b[1]-a[1]))
244 244 if cstop - begin > 3 and count >= 100:
245 245 break
246 246 if cstop - begin > 10 and count >= 3:
247 247 break
248 248
249 249 fm.startitem()
250 250
251 251 if title:
252 252 fm.write('title', '! %s\n', title)
253 253 if r:
254 254 fm.write('result', '! result: %s\n', r)
255 255 m = min(results)
256 256 fm.plain('!')
257 257 fm.write('wall', ' wall %f', m[0])
258 258 fm.write('comb', ' comb %f', m[1] + m[2])
259 259 fm.write('user', ' user %f', m[1])
260 260 fm.write('sys', ' sys %f', m[2])
261 261 fm.write('count', ' (best of %d)', count)
262 262 fm.plain('\n')
263 263
264 264 # utilities for historical portability
265 265
266 266 def getint(ui, section, name, default):
267 267 # for "historical portability":
268 268 # ui.configint has been available since 1.9 (or fa2b596db182)
269 269 v = ui.config(section, name, None)
270 270 if v is None:
271 271 return default
272 272 try:
273 273 return int(v)
274 274 except ValueError:
275 275 raise error.ConfigError(("%s.%s is not an integer ('%s')")
276 276 % (section, name, v))
277 277
278 278 def safeattrsetter(obj, name, ignoremissing=False):
279 279 """Ensure that 'obj' has 'name' attribute before subsequent setattr
280 280
281 281 This function aborts if 'obj' doesn't have the 'name' attribute at
282 282 runtime. This avoids overlooking a future removal of the attribute,
283 283 which would silently break the assumptions of a performance measurement.
284 284
285 285 This function returns an object that can (1) assign a new value to
286 286 the attribute and (2) restore its original value.
287 287
288 288 If 'ignoremissing' is true, a missing 'name' attribute doesn't cause
289 289 an abort; this function returns None instead. This is useful for
290 290 examining an attribute that isn't guaranteed to exist in all
291 291 Mercurial versions.
292 292 """
293 293 if not util.safehasattr(obj, name):
294 294 if ignoremissing:
295 295 return None
296 296 raise error.Abort(("missing attribute %s of %s might break assumption"
297 297 " of performance measurement") % (name, obj))
298 298
299 299 origvalue = getattr(obj, name)
300 300 class attrutil(object):
301 301 def set(self, newvalue):
302 302 setattr(obj, name, newvalue)
303 303 def restore(self):
304 304 setattr(obj, name, origvalue)
305 305
306 306 return attrutil()
307 307
308 308 # utilities to examine internal API changes
309 309
310 310 def getbranchmapsubsettable():
311 311 # for "historical portability":
312 312 # subsettable is defined in:
313 313 # - branchmap since 2.9 (or 175c6fd8cacc)
314 314 # - repoview since 2.5 (or 59a9f18d4587)
315 315 for mod in (branchmap, repoview):
316 316 subsettable = getattr(mod, 'subsettable', None)
317 317 if subsettable:
318 318 return subsettable
319 319
320 320 # bisecting in bcee63733aad::59a9f18d4587 can reach here (both
321 321 # branchmap and repoview modules exist, but subsettable attribute
322 322 # doesn't)
323 323 raise error.Abort(("perfbranchmap not available with this Mercurial"),
324 324 hint="use 2.5 or later")
325 325
326 326 def getsvfs(repo):
327 327 """Return appropriate object to access files under .hg/store
328 328 """
329 329 # for "historical portability":
330 330 # repo.svfs has been available since 2.3 (or 7034365089bf)
331 331 svfs = getattr(repo, 'svfs', None)
332 332 if svfs:
333 333 return svfs
334 334 else:
335 335 return getattr(repo, 'sopener')
336 336
337 337 def getvfs(repo):
338 338 """Return appropriate object to access files under .hg
339 339 """
340 340 # for "historical portability":
341 341 # repo.vfs has been available since 2.3 (or 7034365089bf)
342 342 vfs = getattr(repo, 'vfs', None)
343 343 if vfs:
344 344 return vfs
345 345 else:
346 346 return getattr(repo, 'opener')
347 347
348 348 def repocleartagscachefunc(repo):
349 349 """Return the function to clear tags cache according to repo internal API
350 350 """
351 351 if util.safehasattr(repo, '_tagscache'): # since 2.0 (or 9dca7653b525)
352 352 # in this case, setattr(repo, '_tagscache', None) or so isn't
353 353 # correct way to clear tags cache, because existing code paths
354 354 # expect _tagscache to be a structured object.
355 355 def clearcache():
356 356 # _tagscache has been filteredpropertycache since 2.5 (or
357 357 # 98c867ac1330), and delattr() can't work in such case
358 358 if '_tagscache' in vars(repo):
359 359 del repo.__dict__['_tagscache']
360 360 return clearcache
361 361
362 362 repotags = safeattrsetter(repo, '_tags', ignoremissing=True)
363 363 if repotags: # since 1.4 (or 5614a628d173)
364 364 return lambda : repotags.set(None)
365 365
366 366 repotagscache = safeattrsetter(repo, 'tagscache', ignoremissing=True)
367 367 if repotagscache: # since 0.6 (or d7df759d0e97)
368 368 return lambda : repotagscache.set(None)
369 369
370 370 # Mercurial earlier than 0.6 (or d7df759d0e97) logically reaches
371 371 # this point, but it isn't so problematic, because:
372 372 # - repo.tags of such Mercurial isn't "callable", and repo.tags()
373 373 # in perftags() causes failure soon
374 374 # - perf.py itself has been available since 1.1 (or eb240755386d)
375 375 raise error.Abort(("tags API of this hg command is unknown"))
376 376
377 377 # utilities to clear cache
378 378
379 379 def clearfilecache(repo, attrname):
380 380 unfi = repo.unfiltered()
381 381 if attrname in vars(unfi):
382 382 delattr(unfi, attrname)
383 383 unfi._filecache.pop(attrname, None)
384 384
385 385 # perf commands
386 386
387 387 @command('perfwalk', formatteropts)
388 388 def perfwalk(ui, repo, *pats, **opts):
389 389 timer, fm = gettimer(ui, opts)
390 390 m = scmutil.match(repo[None], pats, {})
391 391 timer(lambda: len(list(repo.dirstate.walk(m, subrepos=[], unknown=True,
392 392 ignored=False))))
393 393 fm.end()
394 394
395 395 @command('perfannotate', formatteropts)
396 396 def perfannotate(ui, repo, f, **opts):
397 397 timer, fm = gettimer(ui, opts)
398 398 fc = repo['.'][f]
399 399 timer(lambda: len(fc.annotate(True)))
400 400 fm.end()
401 401
402 402 @command('perfstatus',
403 403 [('u', 'unknown', False,
404 404 'ask status to look for unknown files')] + formatteropts)
405 405 def perfstatus(ui, repo, **opts):
406 406 #m = match.always(repo.root, repo.getcwd())
407 407 #timer(lambda: sum(map(len, repo.dirstate.status(m, [], False, False,
408 408 # False))))
409 409 timer, fm = gettimer(ui, opts)
410 410 timer(lambda: sum(map(len, repo.status(unknown=opts['unknown']))))
411 411 fm.end()
412 412
413 413 @command('perfaddremove', formatteropts)
414 414 def perfaddremove(ui, repo, **opts):
415 415 timer, fm = gettimer(ui, opts)
416 416 try:
417 417 oldquiet = repo.ui.quiet
418 418 repo.ui.quiet = True
419 419 matcher = scmutil.match(repo[None])
420 420 timer(lambda: scmutil.addremove(repo, matcher, "", dry_run=True))
421 421 finally:
422 422 repo.ui.quiet = oldquiet
423 423 fm.end()
424 424
425 425 def clearcaches(cl):
426 426 # behave somewhat consistently across internal API changes
427 427 if util.safehasattr(cl, 'clearcaches'):
428 428 cl.clearcaches()
429 429 elif util.safehasattr(cl, '_nodecache'):
430 430 from mercurial.node import nullid, nullrev
431 431 cl._nodecache = {nullid: nullrev}
432 432 cl._nodepos = None
433 433
434 434 @command('perfheads', formatteropts)
435 435 def perfheads(ui, repo, **opts):
436 436 timer, fm = gettimer(ui, opts)
437 437 cl = repo.changelog
438 438 def d():
439 439 len(cl.headrevs())
440 440 clearcaches(cl)
441 441 timer(d)
442 442 fm.end()
443 443
444 444 @command('perftags', formatteropts)
445 445 def perftags(ui, repo, **opts):
446 446 import mercurial.changelog
447 447 import mercurial.manifest
448 448 timer, fm = gettimer(ui, opts)
449 449 svfs = getsvfs(repo)
450 450 repocleartagscache = repocleartagscachefunc(repo)
451 451 def t():
452 452 repo.changelog = mercurial.changelog.changelog(svfs)
453 453 repo.manifestlog = mercurial.manifest.manifestlog(svfs, repo)
454 454 repocleartagscache()
455 455 return len(repo.tags())
456 456 timer(t)
457 457 fm.end()
458 458
459 459 @command('perfancestors', formatteropts)
460 460 def perfancestors(ui, repo, **opts):
461 461 timer, fm = gettimer(ui, opts)
462 462 heads = repo.changelog.headrevs()
463 463 def d():
464 464 for a in repo.changelog.ancestors(heads):
465 465 pass
466 466 timer(d)
467 467 fm.end()
468 468
469 469 @command('perfancestorset', formatteropts)
470 470 def perfancestorset(ui, repo, revset, **opts):
471 471 timer, fm = gettimer(ui, opts)
472 472 revs = repo.revs(revset)
473 473 heads = repo.changelog.headrevs()
474 474 def d():
475 475 s = repo.changelog.ancestors(heads)
476 476 for rev in revs:
477 477 rev in s
478 478 timer(d)
479 479 fm.end()
480 480
481 481 @command('perfbookmarks', formatteropts)
482 482 def perfbookmarks(ui, repo, **opts):
483 483 """benchmark parsing bookmarks from disk to memory"""
484 484 timer, fm = gettimer(ui, opts)
485 485 def d():
486 486 clearfilecache(repo, '_bookmarks')
487 487 repo._bookmarks
488 488 timer(d)
489 489 fm.end()
490 490
491 491 @command('perfbundleread', formatteropts, 'BUNDLE')
492 492 def perfbundleread(ui, repo, bundlepath, **opts):
493 493 """Benchmark reading of bundle files.
494 494
495 495 This command is meant to isolate the I/O part of bundle reading as
496 496 much as possible.
497 497 """
498 498 from mercurial import (
499 499 bundle2,
500 500 exchange,
501 501 streamclone,
502 502 )
503 503
504 504 def makebench(fn):
505 505 def run():
506 506 with open(bundlepath, 'rb') as fh:
507 507 bundle = exchange.readbundle(ui, fh, bundlepath)
508 508 fn(bundle)
509 509
510 510 return run
511 511
512 512 def makereadnbytes(size):
513 513 def run():
514 514 with open(bundlepath, 'rb') as fh:
515 515 bundle = exchange.readbundle(ui, fh, bundlepath)
516 516 while bundle.read(size):
517 517 pass
518 518
519 519 return run
520 520
521 521 def makestdioread(size):
522 522 def run():
523 523 with open(bundlepath, 'rb') as fh:
524 524 while fh.read(size):
525 525 pass
526 526
527 527 return run
528 528
529 529 # bundle1
530 530
531 531 def deltaiter(bundle):
532 532 for delta in bundle.deltaiter():
533 533 pass
534 534
535 535 def iterchunks(bundle):
536 536 for chunk in bundle.getchunks():
537 537 pass
538 538
539 539 # bundle2
540 540
541 541 def forwardchunks(bundle):
542 542 for chunk in bundle._forwardchunks():
543 543 pass
544 544
545 545 def iterparts(bundle):
546 546 for part in bundle.iterparts():
547 547 pass
548 548
549 def iterpartsseekable(bundle):
550 for part in bundle.iterparts(seekable=True):
551 pass
552
549 553 def seek(bundle):
550 for part in bundle.iterparts():
554 for part in bundle.iterparts(seekable=True):
551 555 part.seek(0, os.SEEK_END)
552 556
553 557 def makepartreadnbytes(size):
554 558 def run():
555 559 with open(bundlepath, 'rb') as fh:
556 560 bundle = exchange.readbundle(ui, fh, bundlepath)
557 561 for part in bundle.iterparts():
558 562 while part.read(size):
559 563 pass
560 564
561 565 return run
562 566
563 567 benches = [
564 568 (makestdioread(8192), 'read(8k)'),
565 569 (makestdioread(16384), 'read(16k)'),
566 570 (makestdioread(32768), 'read(32k)'),
567 571 (makestdioread(131072), 'read(128k)'),
568 572 ]
569 573
570 574 with open(bundlepath, 'rb') as fh:
571 575 bundle = exchange.readbundle(ui, fh, bundlepath)
572 576
573 577 if isinstance(bundle, changegroup.cg1unpacker):
574 578 benches.extend([
575 579 (makebench(deltaiter), 'cg1 deltaiter()'),
576 580 (makebench(iterchunks), 'cg1 getchunks()'),
577 581 (makereadnbytes(8192), 'cg1 read(8k)'),
578 582 (makereadnbytes(16384), 'cg1 read(16k)'),
579 583 (makereadnbytes(32768), 'cg1 read(32k)'),
580 584 (makereadnbytes(131072), 'cg1 read(128k)'),
581 585 ])
582 586 elif isinstance(bundle, bundle2.unbundle20):
583 587 benches.extend([
584 588 (makebench(forwardchunks), 'bundle2 forwardchunks()'),
585 589 (makebench(iterparts), 'bundle2 iterparts()'),
590 (makebench(iterpartsseekable), 'bundle2 iterparts() seekable'),
586 591 (makebench(seek), 'bundle2 part seek()'),
587 592 (makepartreadnbytes(8192), 'bundle2 part read(8k)'),
588 593 (makepartreadnbytes(16384), 'bundle2 part read(16k)'),
589 594 (makepartreadnbytes(32768), 'bundle2 part read(32k)'),
590 595 (makepartreadnbytes(131072), 'bundle2 part read(128k)'),
591 596 ])
592 597 elif isinstance(bundle, streamclone.streamcloneapplier):
593 598 raise error.Abort('stream clone bundles not supported')
594 599 else:
595 600 raise error.Abort('unhandled bundle type: %s' % type(bundle))
596 601
597 602 for fn, title in benches:
598 603 timer, fm = gettimer(ui, opts)
599 604 timer(fn, title=title)
600 605 fm.end()
601 606
602 607 @command('perfchangegroupchangelog', formatteropts +
603 608 [('', 'version', '02', 'changegroup version'),
604 609 ('r', 'rev', '', 'revisions to add to changegroup')])
605 610 def perfchangegroupchangelog(ui, repo, version='02', rev=None, **opts):
606 611 """Benchmark producing a changelog group for a changegroup.
607 612
608 613 This measures the time spent processing the changelog during a
609 614 bundle operation. This occurs during `hg bundle` and on a server
610 615 processing a `getbundle` wire protocol request (handles clones
611 616 and pull requests).
612 617
613 618 By default, all revisions are added to the changegroup.
614 619 """
615 620 cl = repo.changelog
616 621 revs = [cl.lookup(r) for r in repo.revs(rev or 'all()')]
617 622 bundler = changegroup.getbundler(version, repo)
618 623
619 624 def lookup(node):
620 625 # The real bundler reads the revision in order to access the
621 626 # manifest node and files list. Do that here.
622 627 cl.read(node)
623 628 return node
624 629
625 630 def d():
626 631 for chunk in bundler.group(revs, cl, lookup):
627 632 pass
628 633
629 634 timer, fm = gettimer(ui, opts)
630 635 timer(d)
631 636 fm.end()
632 637
633 638 @command('perfdirs', formatteropts)
634 639 def perfdirs(ui, repo, **opts):
635 640 timer, fm = gettimer(ui, opts)
636 641 dirstate = repo.dirstate
637 642 'a' in dirstate
638 643 def d():
639 644 dirstate.hasdir('a')
640 645 del dirstate._map._dirs
641 646 timer(d)
642 647 fm.end()
643 648
644 649 @command('perfdirstate', formatteropts)
645 650 def perfdirstate(ui, repo, **opts):
646 651 timer, fm = gettimer(ui, opts)
647 652 "a" in repo.dirstate
648 653 def d():
649 654 repo.dirstate.invalidate()
650 655 "a" in repo.dirstate
651 656 timer(d)
652 657 fm.end()
653 658
654 659 @command('perfdirstatedirs', formatteropts)
655 660 def perfdirstatedirs(ui, repo, **opts):
656 661 timer, fm = gettimer(ui, opts)
657 662 "a" in repo.dirstate
658 663 def d():
659 664 repo.dirstate.hasdir("a")
660 665 del repo.dirstate._map._dirs
661 666 timer(d)
662 667 fm.end()
663 668
664 669 @command('perfdirstatefoldmap', formatteropts)
665 670 def perfdirstatefoldmap(ui, repo, **opts):
666 671 timer, fm = gettimer(ui, opts)
667 672 dirstate = repo.dirstate
668 673 'a' in dirstate
669 674 def d():
670 675 dirstate._map.filefoldmap.get('a')
671 676 del dirstate._map.filefoldmap
672 677 timer(d)
673 678 fm.end()
674 679
675 680 @command('perfdirfoldmap', formatteropts)
676 681 def perfdirfoldmap(ui, repo, **opts):
677 682 timer, fm = gettimer(ui, opts)
678 683 dirstate = repo.dirstate
679 684 'a' in dirstate
680 685 def d():
681 686 dirstate._map.dirfoldmap.get('a')
682 687 del dirstate._map.dirfoldmap
683 688 del dirstate._map._dirs
684 689 timer(d)
685 690 fm.end()
686 691
687 692 @command('perfdirstatewrite', formatteropts)
688 693 def perfdirstatewrite(ui, repo, **opts):
689 694 timer, fm = gettimer(ui, opts)
690 695 ds = repo.dirstate
691 696 "a" in ds
692 697 def d():
693 698 ds._dirty = True
694 699 ds.write(repo.currenttransaction())
695 700 timer(d)
696 701 fm.end()
697 702
698 703 @command('perfmergecalculate',
699 704 [('r', 'rev', '.', 'rev to merge against')] + formatteropts)
700 705 def perfmergecalculate(ui, repo, rev, **opts):
701 706 timer, fm = gettimer(ui, opts)
702 707 wctx = repo[None]
703 708 rctx = scmutil.revsingle(repo, rev, rev)
704 709 ancestor = wctx.ancestor(rctx)
705 710 # we don't want working dir files to be stat'd in the benchmark, so prime
706 711 # that cache
707 712 wctx.dirty()
708 713 def d():
709 714 # acceptremote is True because we don't want prompts in the middle of
710 715 # our benchmark
711 716 merge.calculateupdates(repo, wctx, rctx, [ancestor], False, False,
712 717 acceptremote=True, followcopies=True)
713 718 timer(d)
714 719 fm.end()
715 720
716 721 @command('perfpathcopies', [], "REV REV")
717 722 def perfpathcopies(ui, repo, rev1, rev2, **opts):
718 723 timer, fm = gettimer(ui, opts)
719 724 ctx1 = scmutil.revsingle(repo, rev1, rev1)
720 725 ctx2 = scmutil.revsingle(repo, rev2, rev2)
721 726 def d():
722 727 copies.pathcopies(ctx1, ctx2)
723 728 timer(d)
724 729 fm.end()
725 730
726 731 @command('perfphases',
727 732 [('', 'full', False, 'include file reading time too'),
728 733 ], "")
729 734 def perfphases(ui, repo, **opts):
730 735 """benchmark phasesets computation"""
731 736 timer, fm = gettimer(ui, opts)
732 737 _phases = repo._phasecache
733 738 full = opts.get('full')
734 739 def d():
735 740 phases = _phases
736 741 if full:
737 742 clearfilecache(repo, '_phasecache')
738 743 phases = repo._phasecache
739 744 phases.invalidate()
740 745 phases.loadphaserevs(repo)
741 746 timer(d)
742 747 fm.end()
743 748
744 749 @command('perfmanifest', [], 'REV')
745 750 def perfmanifest(ui, repo, rev, **opts):
746 751 timer, fm = gettimer(ui, opts)
747 752 ctx = scmutil.revsingle(repo, rev, rev)
748 753 t = ctx.manifestnode()
749 754 def d():
750 755 repo.manifestlog.clearcaches()
751 756 repo.manifestlog[t].read()
752 757 timer(d)
753 758 fm.end()
754 759
755 760 @command('perfchangeset', formatteropts)
756 761 def perfchangeset(ui, repo, rev, **opts):
757 762 timer, fm = gettimer(ui, opts)
758 763 n = repo[rev].node()
759 764 def d():
760 765 repo.changelog.read(n)
761 766 #repo.changelog._cache = None
762 767 timer(d)
763 768 fm.end()
764 769
765 770 @command('perfindex', formatteropts)
766 771 def perfindex(ui, repo, **opts):
767 772 import mercurial.revlog
768 773 timer, fm = gettimer(ui, opts)
769 774 mercurial.revlog._prereadsize = 2**24 # disable lazy parser in old hg
770 775 n = repo["tip"].node()
771 776 svfs = getsvfs(repo)
772 777 def d():
773 778 cl = mercurial.revlog.revlog(svfs, "00changelog.i")
774 779 cl.rev(n)
775 780 timer(d)
776 781 fm.end()
777 782
778 783 @command('perfstartup', formatteropts)
779 784 def perfstartup(ui, repo, **opts):
780 785 timer, fm = gettimer(ui, opts)
781 786 cmd = sys.argv[0]
782 787 def d():
783 788 if os.name != 'nt':
784 789 os.system("HGRCPATH= %s version -q > /dev/null" % cmd)
785 790 else:
786 791 os.environ['HGRCPATH'] = ' '
787 792 os.system("%s version -q > NUL" % cmd)
788 793 timer(d)
789 794 fm.end()
790 795
791 796 @command('perfparents', formatteropts)
792 797 def perfparents(ui, repo, **opts):
793 798 timer, fm = gettimer(ui, opts)
794 799 # control the number of commits perfparents iterates over
795 800 # experimental config: perf.parentscount
796 801 count = getint(ui, "perf", "parentscount", 1000)
797 802 if len(repo.changelog) < count:
798 803 raise error.Abort("repo needs %d commits for this test" % count)
799 804 repo = repo.unfiltered()
800 805 nl = [repo.changelog.node(i) for i in xrange(count)]
801 806 def d():
802 807 for n in nl:
803 808 repo.changelog.parents(n)
804 809 timer(d)
805 810 fm.end()
806 811
807 812 @command('perfctxfiles', formatteropts)
808 813 def perfctxfiles(ui, repo, x, **opts):
809 814 x = int(x)
810 815 timer, fm = gettimer(ui, opts)
811 816 def d():
812 817 len(repo[x].files())
813 818 timer(d)
814 819 fm.end()
815 820
816 821 @command('perfrawfiles', formatteropts)
817 822 def perfrawfiles(ui, repo, x, **opts):
818 823 x = int(x)
819 824 timer, fm = gettimer(ui, opts)
820 825 cl = repo.changelog
821 826 def d():
822 827 len(cl.read(x)[3])
823 828 timer(d)
824 829 fm.end()
825 830
826 831 @command('perflookup', formatteropts)
827 832 def perflookup(ui, repo, rev, **opts):
828 833 timer, fm = gettimer(ui, opts)
829 834 timer(lambda: len(repo.lookup(rev)))
830 835 fm.end()
831 836
832 837 @command('perfrevrange', formatteropts)
833 838 def perfrevrange(ui, repo, *specs, **opts):
834 839 timer, fm = gettimer(ui, opts)
835 840 revrange = scmutil.revrange
836 841 timer(lambda: len(revrange(repo, specs)))
837 842 fm.end()
838 843
839 844 @command('perfnodelookup', formatteropts)
840 845 def perfnodelookup(ui, repo, rev, **opts):
841 846 timer, fm = gettimer(ui, opts)
842 847 import mercurial.revlog
843 848 mercurial.revlog._prereadsize = 2**24 # disable lazy parser in old hg
844 849 n = repo[rev].node()
845 850 cl = mercurial.revlog.revlog(getsvfs(repo), "00changelog.i")
846 851 def d():
847 852 cl.rev(n)
848 853 clearcaches(cl)
849 854 timer(d)
850 855 fm.end()
851 856
852 857 @command('perflog',
853 858 [('', 'rename', False, 'ask log to follow renames')] + formatteropts)
854 859 def perflog(ui, repo, rev=None, **opts):
855 860 if rev is None:
856 861 rev=[]
857 862 timer, fm = gettimer(ui, opts)
858 863 ui.pushbuffer()
859 864 timer(lambda: commands.log(ui, repo, rev=rev, date='', user='',
860 865 copies=opts.get('rename')))
861 866 ui.popbuffer()
862 867 fm.end()
863 868
864 869 @command('perfmoonwalk', formatteropts)
865 870 def perfmoonwalk(ui, repo, **opts):
866 871 """benchmark walking the changelog backwards
867 872
868 873 This also loads the changelog data for each revision in the changelog.
869 874 """
870 875 timer, fm = gettimer(ui, opts)
871 876 def moonwalk():
872 877 for i in xrange(len(repo), -1, -1):
873 878 ctx = repo[i]
874 879 ctx.branch() # read changelog data (in addition to the index)
875 880 timer(moonwalk)
876 881 fm.end()
877 882
878 883 @command('perftemplating', formatteropts)
879 884 def perftemplating(ui, repo, rev=None, **opts):
880 885 if rev is None:
881 886 rev=[]
882 887 timer, fm = gettimer(ui, opts)
883 888 ui.pushbuffer()
884 889 timer(lambda: commands.log(ui, repo, rev=rev, date='', user='',
885 890 template='{date|shortdate} [{rev}:{node|short}]'
886 891 ' {author|person}: {desc|firstline}\n'))
887 892 ui.popbuffer()
888 893 fm.end()
889 894
890 895 @command('perfcca', formatteropts)
891 896 def perfcca(ui, repo, **opts):
892 897 timer, fm = gettimer(ui, opts)
893 898 timer(lambda: scmutil.casecollisionauditor(ui, False, repo.dirstate))
894 899 fm.end()
895 900
896 901 @command('perffncacheload', formatteropts)
897 902 def perffncacheload(ui, repo, **opts):
898 903 timer, fm = gettimer(ui, opts)
899 904 s = repo.store
900 905 def d():
901 906 s.fncache._load()
902 907 timer(d)
903 908 fm.end()
904 909
905 910 @command('perffncachewrite', formatteropts)
906 911 def perffncachewrite(ui, repo, **opts):
907 912 timer, fm = gettimer(ui, opts)
908 913 s = repo.store
909 914 s.fncache._load()
910 915 lock = repo.lock()
911 916 tr = repo.transaction('perffncachewrite')
912 917 def d():
913 918 s.fncache._dirty = True
914 919 s.fncache.write(tr)
915 920 timer(d)
916 921 tr.close()
917 922 lock.release()
918 923 fm.end()
919 924
920 925 @command('perffncacheencode', formatteropts)
921 926 def perffncacheencode(ui, repo, **opts):
922 927 timer, fm = gettimer(ui, opts)
923 928 s = repo.store
924 929 s.fncache._load()
925 930 def d():
926 931 for p in s.fncache.entries:
927 932 s.encode(p)
928 933 timer(d)
929 934 fm.end()
930 935
931 936 @command('perfbdiff', revlogopts + formatteropts + [
932 937 ('', 'count', 1, 'number of revisions to test (when using --startrev)'),
933 938 ('', 'alldata', False, 'test bdiffs for all associated revisions')],
934 939 '-c|-m|FILE REV')
935 940 def perfbdiff(ui, repo, file_, rev=None, count=None, **opts):
936 941 """benchmark a bdiff between revisions
937 942
938 943 By default, benchmark a bdiff between its delta parent and itself.
939 944
940 945 With ``--count``, benchmark bdiffs between delta parents and self for N
941 946 revisions starting at the specified revision.
942 947
943 948 With ``--alldata``, assume the requested revision is a changeset and
944 949 measure bdiffs for all changes related to that changeset (manifest
945 950 and filelogs).
946 951 """
947 952 if opts['alldata']:
948 953 opts['changelog'] = True
949 954
950 955 if opts.get('changelog') or opts.get('manifest'):
951 956 file_, rev = None, file_
952 957 elif rev is None:
953 958 raise error.CommandError('perfbdiff', 'invalid arguments')
954 959
955 960 textpairs = []
956 961
957 962 r = cmdutil.openrevlog(repo, 'perfbdiff', file_, opts)
958 963
959 964 startrev = r.rev(r.lookup(rev))
960 965 for rev in range(startrev, min(startrev + count, len(r) - 1)):
961 966 if opts['alldata']:
962 967 # Load revisions associated with changeset.
963 968 ctx = repo[rev]
964 969 mtext = repo.manifestlog._revlog.revision(ctx.manifestnode())
965 970 for pctx in ctx.parents():
966 971 pman = repo.manifestlog._revlog.revision(pctx.manifestnode())
967 972 textpairs.append((pman, mtext))
968 973
969 974 # Load filelog revisions by iterating manifest delta.
970 975 man = ctx.manifest()
971 976 pman = ctx.p1().manifest()
972 977 for filename, change in pman.diff(man).items():
973 978 fctx = repo.file(filename)
974 979 f1 = fctx.revision(change[0][0] or -1)
975 980 f2 = fctx.revision(change[1][0] or -1)
976 981 textpairs.append((f1, f2))
977 982 else:
978 983 dp = r.deltaparent(rev)
979 984 textpairs.append((r.revision(dp), r.revision(rev)))
980 985
981 986 def d():
982 987 for pair in textpairs:
983 988 mdiff.textdiff(*pair)
984 989
985 990 timer, fm = gettimer(ui, opts)
986 991 timer(d)
987 992 fm.end()
988 993
989 994 @command('perfdiffwd', formatteropts)
990 995 def perfdiffwd(ui, repo, **opts):
991 996 """Profile diff of working directory changes"""
992 997 timer, fm = gettimer(ui, opts)
993 998 options = {
994 999 'w': 'ignore_all_space',
995 1000 'b': 'ignore_space_change',
996 1001 'B': 'ignore_blank_lines',
997 1002 }
998 1003
999 1004 for diffopt in ('', 'w', 'b', 'B', 'wB'):
1000 1005 opts = dict((options[c], '1') for c in diffopt)
1001 1006 def d():
1002 1007 ui.pushbuffer()
1003 1008 commands.diff(ui, repo, **opts)
1004 1009 ui.popbuffer()
1005 1010 title = 'diffopts: %s' % (diffopt and ('-' + diffopt) or 'none')
1006 1011 timer(d, title)
1007 1012 fm.end()
1008 1013
1009 1014 @command('perfrevlogindex', revlogopts + formatteropts,
1010 1015 '-c|-m|FILE')
1011 1016 def perfrevlogindex(ui, repo, file_=None, **opts):
1012 1017 """Benchmark operations against a revlog index.
1013 1018
1014 1019 This tests constructing a revlog instance, reading index data,
1015 1020 parsing index data, and performing various operations related to
1016 1021 index data.
1017 1022 """
1018 1023
1019 1024 rl = cmdutil.openrevlog(repo, 'perfrevlogindex', file_, opts)
1020 1025
1021 1026 opener = getattr(rl, 'opener') # trick linter
1022 1027 indexfile = rl.indexfile
1023 1028 data = opener.read(indexfile)
1024 1029
1025 1030 header = struct.unpack('>I', data[0:4])[0]
1026 1031 version = header & 0xFFFF
1027 1032 if version == 1:
1028 1033 revlogio = revlog.revlogio()
1029 1034 inline = header & (1 << 16)
1030 1035 else:
1031 1036 raise error.Abort(('unsupported revlog version: %d') % version)
1032 1037
1033 1038 rllen = len(rl)
1034 1039
1035 1040 node0 = rl.node(0)
1036 1041 node25 = rl.node(rllen // 4)
1037 1042 node50 = rl.node(rllen // 2)
1038 1043 node75 = rl.node(rllen // 4 * 3)
1039 1044 node100 = rl.node(rllen - 1)
1040 1045
1041 1046 allrevs = range(rllen)
1042 1047 allrevsrev = list(reversed(allrevs))
1043 1048 allnodes = [rl.node(rev) for rev in range(rllen)]
1044 1049 allnodesrev = list(reversed(allnodes))
1045 1050
1046 1051 def constructor():
1047 1052 revlog.revlog(opener, indexfile)
1048 1053
1049 1054 def read():
1050 1055 with opener(indexfile) as fh:
1051 1056 fh.read()
1052 1057
1053 1058 def parseindex():
1054 1059 revlogio.parseindex(data, inline)
1055 1060
1056 1061 def getentry(revornode):
1057 1062 index = revlogio.parseindex(data, inline)[0]
1058 1063 index[revornode]
1059 1064
1060 1065 def getentries(revs, count=1):
1061 1066 index = revlogio.parseindex(data, inline)[0]
1062 1067
1063 1068 for i in range(count):
1064 1069 for rev in revs:
1065 1070 index[rev]
1066 1071
1067 1072 def resolvenode(node):
1068 1073 nodemap = revlogio.parseindex(data, inline)[1]
1069 1074 # This only works for the C code.
1070 1075 if nodemap is None:
1071 1076 return
1072 1077
1073 1078 try:
1074 1079 nodemap[node]
1075 1080 except error.RevlogError:
1076 1081 pass
1077 1082
1078 1083 def resolvenodes(nodes, count=1):
1079 1084 nodemap = revlogio.parseindex(data, inline)[1]
1080 1085 if nodemap is None:
1081 1086 return
1082 1087
1083 1088 for i in range(count):
1084 1089 for node in nodes:
1085 1090 try:
1086 1091 nodemap[node]
1087 1092 except error.RevlogError:
1088 1093 pass
1089 1094
1090 1095 benches = [
1091 1096 (constructor, 'revlog constructor'),
1092 1097 (read, 'read'),
1093 1098 (parseindex, 'create index object'),
1094 1099 (lambda: getentry(0), 'retrieve index entry for rev 0'),
1095 1100 (lambda: resolvenode('a' * 20), 'look up missing node'),
1096 1101 (lambda: resolvenode(node0), 'look up node at rev 0'),
1097 1102 (lambda: resolvenode(node25), 'look up node at 1/4 len'),
1098 1103 (lambda: resolvenode(node50), 'look up node at 1/2 len'),
1099 1104 (lambda: resolvenode(node75), 'look up node at 3/4 len'),
1100 1105 (lambda: resolvenode(node100), 'look up node at tip'),
1101 1106 # 2x variation is to measure caching impact.
1102 1107 (lambda: resolvenodes(allnodes),
1103 1108 'look up all nodes (forward)'),
1104 1109 (lambda: resolvenodes(allnodes, 2),
1105 1110 'look up all nodes 2x (forward)'),
1106 1111 (lambda: resolvenodes(allnodesrev),
1107 1112 'look up all nodes (reverse)'),
1108 1113 (lambda: resolvenodes(allnodesrev, 2),
1109 1114 'look up all nodes 2x (reverse)'),
1110 1115 (lambda: getentries(allrevs),
1111 1116 'retrieve all index entries (forward)'),
1112 1117 (lambda: getentries(allrevs, 2),
1113 1118 'retrieve all index entries 2x (forward)'),
1114 1119 (lambda: getentries(allrevsrev),
1115 1120 'retrieve all index entries (reverse)'),
1116 1121 (lambda: getentries(allrevsrev, 2),
1117 1122 'retrieve all index entries 2x (reverse)'),
1118 1123 ]
1119 1124
1120 1125 for fn, title in benches:
1121 1126 timer, fm = gettimer(ui, opts)
1122 1127 timer(fn, title=title)
1123 1128 fm.end()
1124 1129
1125 1130 @command('perfrevlogrevisions', revlogopts + formatteropts +
1126 1131 [('d', 'dist', 100, 'distance between the revisions'),
1127 1132 ('s', 'startrev', 0, 'revision to start reading at'),
1128 1133 ('', 'reverse', False, 'read in reverse')],
1129 1134 '-c|-m|FILE')
1130 1135 def perfrevlogrevisions(ui, repo, file_=None, startrev=0, reverse=False,
1131 1136 **opts):
1132 1137 """Benchmark reading a series of revisions from a revlog.
1133 1138
1134 1139 By default, we read every ``-d/--dist`` revision from 0 to tip of
1135 1140 the specified revlog.
1136 1141
1137 1142 The start revision can be defined via ``-s/--startrev``.
1138 1143 """
1139 1144 rl = cmdutil.openrevlog(repo, 'perfrevlogrevisions', file_, opts)
1140 1145 rllen = getlen(ui)(rl)
1141 1146
1142 1147 def d():
1143 1148 rl.clearcaches()
1144 1149
1145 1150 beginrev = startrev
1146 1151 endrev = rllen
1147 1152 dist = opts['dist']
1148 1153
1149 1154 if reverse:
1150 1155 beginrev, endrev = endrev, beginrev
1151 1156 dist = -1 * dist
1152 1157
1153 1158 for x in xrange(beginrev, endrev, dist):
1154 1159 # Old revisions don't support passing int.
1155 1160 n = rl.node(x)
1156 1161 rl.revision(n)
1157 1162
1158 1163 timer, fm = gettimer(ui, opts)
1159 1164 timer(d)
1160 1165 fm.end()
1161 1166
1162 1167 @command('perfrevlogchunks', revlogopts + formatteropts +
1163 1168 [('e', 'engines', '', 'compression engines to use'),
1164 1169 ('s', 'startrev', 0, 'revision to start at')],
1165 1170 '-c|-m|FILE')
1166 1171 def perfrevlogchunks(ui, repo, file_=None, engines=None, startrev=0, **opts):
1167 1172 """Benchmark operations on revlog chunks.
1168 1173
1169 1174 Logically, each revlog is a collection of fulltext revisions. However,
1170 1175 stored within each revlog are "chunks" of possibly compressed data. This
1171 1176 data needs to be read and decompressed or compressed and written.
1172 1177
1173 1178 This command measures the time it takes to read+decompress and recompress
1174 1179 chunks in a revlog. It effectively isolates I/O and compression performance.
1175 1180 For measurements of higher-level operations like resolving revisions,
1176 1181 see ``perfrevlogrevisions`` and ``perfrevlogrevision``.
1177 1182 """
1178 1183 rl = cmdutil.openrevlog(repo, 'perfrevlogchunks', file_, opts)
1179 1184
1180 1185 # _chunkraw was renamed to _getsegmentforrevs.
1181 1186 try:
1182 1187 segmentforrevs = rl._getsegmentforrevs
1183 1188 except AttributeError:
1184 1189 segmentforrevs = rl._chunkraw
1185 1190
1186 1191 # Verify engines argument.
1187 1192 if engines:
1188 1193 engines = set(e.strip() for e in engines.split(','))
1189 1194 for engine in engines:
1190 1195 try:
1191 1196 util.compressionengines[engine]
1192 1197 except KeyError:
1193 1198 raise error.Abort('unknown compression engine: %s' % engine)
1194 1199 else:
1195 1200 engines = []
1196 1201 for e in util.compengines:
1197 1202 engine = util.compengines[e]
1198 1203 try:
1199 1204 if engine.available():
1200 1205 engine.revlogcompressor().compress('dummy')
1201 1206 engines.append(e)
1202 1207 except NotImplementedError:
1203 1208 pass
1204 1209
1205 1210 revs = list(rl.revs(startrev, len(rl) - 1))
1206 1211
1207 1212 def rlfh(rl):
1208 1213 if rl._inline:
1209 1214 return getsvfs(repo)(rl.indexfile)
1210 1215 else:
1211 1216 return getsvfs(repo)(rl.datafile)
1212 1217
1213 1218 def doread():
1214 1219 rl.clearcaches()
1215 1220 for rev in revs:
1216 1221 segmentforrevs(rev, rev)
1217 1222
1218 1223 def doreadcachedfh():
1219 1224 rl.clearcaches()
1220 1225 fh = rlfh(rl)
1221 1226 for rev in revs:
1222 1227 segmentforrevs(rev, rev, df=fh)
1223 1228
1224 1229 def doreadbatch():
1225 1230 rl.clearcaches()
1226 1231 segmentforrevs(revs[0], revs[-1])
1227 1232
1228 1233 def doreadbatchcachedfh():
1229 1234 rl.clearcaches()
1230 1235 fh = rlfh(rl)
1231 1236 segmentforrevs(revs[0], revs[-1], df=fh)
1232 1237
1233 1238 def dochunk():
1234 1239 rl.clearcaches()
1235 1240 fh = rlfh(rl)
1236 1241 for rev in revs:
1237 1242 rl._chunk(rev, df=fh)
1238 1243
1239 1244 chunks = [None]
1240 1245
1241 1246 def dochunkbatch():
1242 1247 rl.clearcaches()
1243 1248 fh = rlfh(rl)
1244 1249 # Save chunks as a side-effect.
1245 1250 chunks[0] = rl._chunks(revs, df=fh)
1246 1251
1247 1252 def docompress(compressor):
1248 1253 rl.clearcaches()
1249 1254
1250 1255 try:
1251 1256 # Swap in the requested compression engine.
1252 1257 oldcompressor = rl._compressor
1253 1258 rl._compressor = compressor
1254 1259 for chunk in chunks[0]:
1255 1260 rl.compress(chunk)
1256 1261 finally:
1257 1262 rl._compressor = oldcompressor
1258 1263
1259 1264 benches = [
1260 1265 (lambda: doread(), 'read'),
1261 1266 (lambda: doreadcachedfh(), 'read w/ reused fd'),
1262 1267 (lambda: doreadbatch(), 'read batch'),
1263 1268 (lambda: doreadbatchcachedfh(), 'read batch w/ reused fd'),
1264 1269 (lambda: dochunk(), 'chunk'),
1265 1270 (lambda: dochunkbatch(), 'chunk batch'),
1266 1271 ]
1267 1272
1268 1273 for engine in sorted(engines):
1269 1274 compressor = util.compengines[engine].revlogcompressor()
1270 1275 benches.append((functools.partial(docompress, compressor),
1271 1276 'compress w/ %s' % engine))
1272 1277
1273 1278 for fn, title in benches:
1274 1279 timer, fm = gettimer(ui, opts)
1275 1280 timer(fn, title=title)
1276 1281 fm.end()
1277 1282
1278 1283 @command('perfrevlogrevision', revlogopts + formatteropts +
1279 1284 [('', 'cache', False, 'use caches instead of clearing')],
1280 1285 '-c|-m|FILE REV')
1281 1286 def perfrevlogrevision(ui, repo, file_, rev=None, cache=None, **opts):
1282 1287 """Benchmark obtaining a revlog revision.
1283 1288
1284 1289 Obtaining a revlog revision consists of roughly the following steps:
1285 1290
1286 1291 1. Compute the delta chain
1287 1292 2. Obtain the raw chunks for that delta chain
1288 1293 3. Decompress each raw chunk
1289 1294 4. Apply binary patches to obtain fulltext
1290 1295 5. Verify hash of fulltext
1291 1296
1292 1297 This command measures the time spent in each of these phases.
1293 1298 """
1294 1299 if opts.get('changelog') or opts.get('manifest'):
1295 1300 file_, rev = None, file_
1296 1301 elif rev is None:
1297 1302 raise error.CommandError('perfrevlogrevision', 'invalid arguments')
1298 1303
1299 1304 r = cmdutil.openrevlog(repo, 'perfrevlogrevision', file_, opts)
1300 1305
1301 1306 # _chunkraw was renamed to _getsegmentforrevs.
1302 1307 try:
1303 1308 segmentforrevs = r._getsegmentforrevs
1304 1309 except AttributeError:
1305 1310 segmentforrevs = r._chunkraw
1306 1311
1307 1312 node = r.lookup(rev)
1308 1313 rev = r.rev(node)
1309 1314
1310 1315 def getrawchunks(data, chain):
1311 1316 start = r.start
1312 1317 length = r.length
1313 1318 inline = r._inline
1314 1319 iosize = r._io.size
1315 1320 buffer = util.buffer
1316 1321 offset = start(chain[0])
1317 1322
1318 1323 chunks = []
1319 1324 ladd = chunks.append
1320 1325
1321 1326 for rev in chain:
1322 1327 chunkstart = start(rev)
1323 1328 if inline:
1324 1329 chunkstart += (rev + 1) * iosize
1325 1330 chunklength = length(rev)
1326 1331 ladd(buffer(data, chunkstart - offset, chunklength))
1327 1332
1328 1333 return chunks
1329 1334
1330 1335 def dodeltachain(rev):
1331 1336 if not cache:
1332 1337 r.clearcaches()
1333 1338 r._deltachain(rev)
1334 1339
1335 1340 def doread(chain):
1336 1341 if not cache:
1337 1342 r.clearcaches()
1338 1343 segmentforrevs(chain[0], chain[-1])
1339 1344
1340 1345 def dorawchunks(data, chain):
1341 1346 if not cache:
1342 1347 r.clearcaches()
1343 1348 getrawchunks(data, chain)
1344 1349
1345 1350 def dodecompress(chunks):
1346 1351 decomp = r.decompress
1347 1352 for chunk in chunks:
1348 1353 decomp(chunk)
1349 1354
1350 1355 def dopatch(text, bins):
1351 1356 if not cache:
1352 1357 r.clearcaches()
1353 1358 mdiff.patches(text, bins)
1354 1359
1355 1360 def dohash(text):
1356 1361 if not cache:
1357 1362 r.clearcaches()
1358 1363 r.checkhash(text, node, rev=rev)
1359 1364
1360 1365 def dorevision():
1361 1366 if not cache:
1362 1367 r.clearcaches()
1363 1368 r.revision(node)
1364 1369
1365 1370 chain = r._deltachain(rev)[0]
1366 1371 data = segmentforrevs(chain[0], chain[-1])[1]
1367 1372 rawchunks = getrawchunks(data, chain)
1368 1373 bins = r._chunks(chain)
1369 1374 text = str(bins[0])
1370 1375 bins = bins[1:]
1371 1376 text = mdiff.patches(text, bins)
1372 1377
1373 1378 benches = [
1374 1379 (lambda: dorevision(), 'full'),
1375 1380 (lambda: dodeltachain(rev), 'deltachain'),
1376 1381 (lambda: doread(chain), 'read'),
1377 1382 (lambda: dorawchunks(data, chain), 'rawchunks'),
1378 1383 (lambda: dodecompress(rawchunks), 'decompress'),
1379 1384 (lambda: dopatch(text, bins), 'patch'),
1380 1385 (lambda: dohash(text), 'hash'),
1381 1386 ]
1382 1387
1383 1388 for fn, title in benches:
1384 1389 timer, fm = gettimer(ui, opts)
1385 1390 timer(fn, title=title)
1386 1391 fm.end()
1387 1392
1388 1393 @command('perfrevset',
1389 1394 [('C', 'clear', False, 'clear volatile cache between each call.'),
1390 1395 ('', 'contexts', False, 'obtain changectx for each revision')]
1391 1396 + formatteropts, "REVSET")
1392 1397 def perfrevset(ui, repo, expr, clear=False, contexts=False, **opts):
1393 1398 """benchmark the execution time of a revset
1394 1399
1395 1400 Use the --clear option if you need to evaluate the impact of building the
1396 1401 volatile revision set caches on revset execution. The volatile caches hold
1397 1402 data related to filtered and obsolete revisions."""
1398 1403 timer, fm = gettimer(ui, opts)
1399 1404 def d():
1400 1405 if clear:
1401 1406 repo.invalidatevolatilesets()
1402 1407 if contexts:
1403 1408 for ctx in repo.set(expr): pass
1404 1409 else:
1405 1410 for r in repo.revs(expr): pass
1406 1411 timer(d)
1407 1412 fm.end()
1408 1413
1409 1414 @command('perfvolatilesets',
1410 1415 [('', 'clear-obsstore', False, 'drop obsstore between each call.'),
1411 1416 ] + formatteropts)
1412 1417 def perfvolatilesets(ui, repo, *names, **opts):
1413 1418 """benchmark the computation of various volatile set
1414 1419
1415 1420 Volatile set computes element related to filtering and obsolescence."""
1416 1421 timer, fm = gettimer(ui, opts)
1417 1422 repo = repo.unfiltered()
1418 1423
1419 1424 def getobs(name):
1420 1425 def d():
1421 1426 repo.invalidatevolatilesets()
1422 1427 if opts['clear_obsstore']:
1423 1428 clearfilecache(repo, 'obsstore')
1424 1429 obsolete.getrevs(repo, name)
1425 1430 return d
1426 1431
1427 1432 allobs = sorted(obsolete.cachefuncs)
1428 1433 if names:
1429 1434 allobs = [n for n in allobs if n in names]
1430 1435
1431 1436 for name in allobs:
1432 1437 timer(getobs(name), title=name)
1433 1438
1434 1439 def getfiltered(name):
1435 1440 def d():
1436 1441 repo.invalidatevolatilesets()
1437 1442 if opts['clear_obsstore']:
1438 1443 clearfilecache(repo, 'obsstore')
1439 1444 repoview.filterrevs(repo, name)
1440 1445 return d
1441 1446
1442 1447 allfilter = sorted(repoview.filtertable)
1443 1448 if names:
1444 1449 allfilter = [n for n in allfilter if n in names]
1445 1450
1446 1451 for name in allfilter:
1447 1452 timer(getfiltered(name), title=name)
1448 1453 fm.end()
1449 1454
1450 1455 @command('perfbranchmap',
1451 1456 [('f', 'full', False,
1452 1457 'Includes build time of subset'),
1453 1458 ('', 'clear-revbranch', False,
1454 1459 'purge the revbranch cache between computation'),
1455 1460 ] + formatteropts)
1456 1461 def perfbranchmap(ui, repo, full=False, clear_revbranch=False, **opts):
1457 1462 """benchmark the update of a branchmap
1458 1463
1459 1464 This benchmarks the full repo.branchmap() call with read and write disabled
1460 1465 """
1461 1466 timer, fm = gettimer(ui, opts)
1462 1467 def getbranchmap(filtername):
1463 1468 """generate a benchmark function for the filtername"""
1464 1469 if filtername is None:
1465 1470 view = repo
1466 1471 else:
1467 1472 view = repo.filtered(filtername)
1468 1473 def d():
1469 1474 if clear_revbranch:
1470 1475 repo.revbranchcache()._clear()
1471 1476 if full:
1472 1477 view._branchcaches.clear()
1473 1478 else:
1474 1479 view._branchcaches.pop(filtername, None)
1475 1480 view.branchmap()
1476 1481 return d
477 482 # order filters from smaller subsets to bigger subsets
1478 1483 possiblefilters = set(repoview.filtertable)
1479 1484 subsettable = getbranchmapsubsettable()
1480 1485 allfilters = []
1481 1486 while possiblefilters:
1482 1487 for name in possiblefilters:
1483 1488 subset = subsettable.get(name)
1484 1489 if subset not in possiblefilters:
1485 1490 break
1486 1491 else:
1487 1492 assert False, 'subset cycle %s!' % possiblefilters
1488 1493 allfilters.append(name)
1489 1494 possiblefilters.remove(name)
1490 1495
1491 1496 # warm the cache
1492 1497 if not full:
1493 1498 for name in allfilters:
1494 1499 repo.filtered(name).branchmap()
1495 1500 # add unfiltered
1496 1501 allfilters.append(None)
1497 1502
1498 1503 branchcacheread = safeattrsetter(branchmap, 'read')
1499 1504 branchcachewrite = safeattrsetter(branchmap.branchcache, 'write')
1500 1505 branchcacheread.set(lambda repo: None)
1501 1506 branchcachewrite.set(lambda bc, repo: None)
1502 1507 try:
1503 1508 for name in allfilters:
1504 1509 timer(getbranchmap(name), title=str(name))
1505 1510 finally:
1506 1511 branchcacheread.restore()
1507 1512 branchcachewrite.restore()
1508 1513 fm.end()
1509 1514
1510 1515 @command('perfloadmarkers')
1511 1516 def perfloadmarkers(ui, repo):
1512 1517 """benchmark the time to parse the on-disk markers for a repo
1513 1518
1514 1519 Result is the number of markers in the repo."""
1515 1520 timer, fm = gettimer(ui)
1516 1521 svfs = getsvfs(repo)
1517 1522 timer(lambda: len(obsolete.obsstore(svfs)))
1518 1523 fm.end()
1519 1524
1520 1525 @command('perflrucachedict', formatteropts +
1521 1526 [('', 'size', 4, 'size of cache'),
1522 1527 ('', 'gets', 10000, 'number of key lookups'),
1523 1528 ('', 'sets', 10000, 'number of key sets'),
1524 1529 ('', 'mixed', 10000, 'number of mixed mode operations'),
1525 1530 ('', 'mixedgetfreq', 50, 'frequency of get vs set ops in mixed mode')],
1526 1531 norepo=True)
1527 1532 def perflrucache(ui, size=4, gets=10000, sets=10000, mixed=10000,
1528 1533 mixedgetfreq=50, **opts):
1529 1534 def doinit():
1530 1535 for i in xrange(10000):
1531 1536 util.lrucachedict(size)
1532 1537
1533 1538 values = []
1534 1539 for i in xrange(size):
1535 1540 values.append(random.randint(0, sys.maxint))
1536 1541
1537 1542 # Get mode fills the cache and tests raw lookup performance with no
1538 1543 # eviction.
1539 1544 getseq = []
1540 1545 for i in xrange(gets):
1541 1546 getseq.append(random.choice(values))
1542 1547
1543 1548 def dogets():
1544 1549 d = util.lrucachedict(size)
1545 1550 for v in values:
1546 1551 d[v] = v
1547 1552 for key in getseq:
1548 1553 value = d[key]
1549 1554 value # silence pyflakes warning
1550 1555
1551 1556 # Set mode tests insertion speed with cache eviction.
1552 1557 setseq = []
1553 1558 for i in xrange(sets):
1554 1559 setseq.append(random.randint(0, sys.maxint))
1555 1560
1556 1561 def dosets():
1557 1562 d = util.lrucachedict(size)
1558 1563 for v in setseq:
1559 1564 d[v] = v
1560 1565
1561 1566 # Mixed mode randomly performs gets and sets with eviction.
1562 1567 mixedops = []
1563 1568 for i in xrange(mixed):
1564 1569 r = random.randint(0, 100)
1565 1570 if r < mixedgetfreq:
1566 1571 op = 0
1567 1572 else:
1568 1573 op = 1
1569 1574
1570 1575 mixedops.append((op, random.randint(0, size * 2)))
1571 1576
1572 1577 def domixed():
1573 1578 d = util.lrucachedict(size)
1574 1579
1575 1580 for op, v in mixedops:
1576 1581 if op == 0:
1577 1582 try:
1578 1583 d[v]
1579 1584 except KeyError:
1580 1585 pass
1581 1586 else:
1582 1587 d[v] = v
1583 1588
1584 1589 benches = [
1585 1590 (doinit, 'init'),
1586 1591 (dogets, 'gets'),
1587 1592 (dosets, 'sets'),
1588 1593 (domixed, 'mixed')
1589 1594 ]
1590 1595
1591 1596 for fn, title in benches:
1592 1597 timer, fm = gettimer(ui, opts)
1593 1598 timer(fn, title=title)
1594 1599 fm.end()
1595 1600
1596 1601 @command('perfwrite', formatteropts)
1597 1602 def perfwrite(ui, repo, **opts):
1598 1603 """microbenchmark ui.write
1599 1604 """
1600 1605 timer, fm = gettimer(ui, opts)
1601 1606 def write():
1602 1607 for i in range(100000):
1603 1608 ui.write(('Testing write performance\n'))
1604 1609 timer(write)
1605 1610 fm.end()
1606 1611
1607 1612 def uisetup(ui):
1608 1613 if (util.safehasattr(cmdutil, 'openrevlog') and
1609 1614 not util.safehasattr(commands, 'debugrevlogopts')):
1610 1615 # for "historical portability":
1611 1616 # In this case, Mercurial should be 1.9 (or a79fea6b3e77) -
1612 1617 # 3.7 (or 5606f7d0d063). Therefore, '--dir' option for
1613 1618 # openrevlog() should cause failure, because it has been
1614 1619 # available since 3.5 (or 49c583ca48c4).
1615 1620 def openrevlog(orig, repo, cmd, file_, opts):
1616 1621 if opts.get('dir') and not util.safehasattr(repo, 'dirlog'):
1617 1622 raise error.Abort("This version doesn't support --dir option",
1618 1623 hint="use 3.5 or later")
1619 1624 return orig(repo, cmd, file_, opts)
1620 1625 extensions.wrapfunction(cmdutil, 'openrevlog', openrevlog)
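For context on the change shown above: the new `iterpartsseekable` benchmark and the updated `seek` benchmark exercise the `seekable=True` flag this commit introduces, since bundle2 parts are no longer seekable by default (issue5691). A minimal sketch of what the seek benchmark measures, assuming a bundle2 file at the hypothetical path 'all.hg2':

    import os
    from mercurial import ui as uimod, exchange

    ui = uimod.ui.load()
    with open('all.hg2', 'rb') as fh:
        bundle = exchange.readbundle(ui, fh, 'all.hg2')
        # seekable=True opts back into the old behavior; with the new
        # default (seekable=False), parts do not support seek()
        for part in bundle.iterparts(seekable=True):
            part.seek(0, os.SEEK_END)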
@@ -1,1999 +1,2000
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is upper case, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
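As a worked example (a sketch; 'Compression' is the real parameter emitted
by this module's `setcompression`), a bundle compressed with BZ serializes
its stream level parameters as::

    blob = 'Compression=BZ'
    chunk = _pack('>i', len(blob)) + blob   # '\x00\x00\x00\x0eCompression=BZ'
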
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
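For example (an illustrative sketch), a part carrying the single mandatory
parameter ``version=02``, as the changegroup part does, encodes its
parameter section as::

    \x01\x00      1 mandatory parameter, 0 advisory parameters
    \x07\x02      len('version'), len('02')
    version02     keys and values, concatenated
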
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
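For instance (illustrative), an 11 byte payload sent as a single chunk is
framed as::

    \x00\x00\x00\x0b    chunksize: 11
    hello world         chunkdata
    \x00\x00\x00\x00    zero size chunk: end of payload
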
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import os
152 152 import re
153 153 import string
154 154 import struct
155 155 import sys
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 node as nodemod,
162 162 obsolete,
163 163 phases,
164 164 pushkey,
165 165 pycompat,
166 166 tags,
167 167 url,
168 168 util,
169 169 )
170 170
171 171 urlerr = util.urlerr
172 172 urlreq = util.urlreq
173 173
174 174 _pack = struct.pack
175 175 _unpack = struct.unpack
176 176
177 177 _fstreamparamsize = '>i'
178 178 _fpartheadersize = '>i'
179 179 _fparttypesize = '>B'
180 180 _fpartid = '>I'
181 181 _fpayloadsize = '>i'
182 182 _fpartparamcount = '>BB'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that register a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during and unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`, where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
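
A minimal usage sketch (the 'changegroup' category is one this module
actually records; the payload dict is illustrative)::

    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records['changegroup']   # -> ({'return': 1},)
    list(records)            # -> [('changegroup', {'return': 1})]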
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content for now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357 self.current = None
358 358
359 359 def __enter__(self):
360 360 def func():
361 361 itr = enumerate(self.unbundler.iterparts())
362 362 for count, p in itr:
363 363 self.count = count
364 364 self.current = p
365 365 yield p
366 366 p.consume()
367 367 self.current = None
368 368 self.iterator = func()
369 369 return self.iterator
370 370
371 371 def __exit__(self, type, exc, tb):
372 372 if not self.iterator:
373 373 return
374 374
375 375 # Only gracefully abort in a normal exception situation. User aborts
376 376 # like Ctrl+C throw a KeyboardInterrupt which is not a base Exception,
377 377 # and should not gracefully cleanup.
378 378 if isinstance(exc, Exception):
379 379 # Any exceptions seeking to the end of the bundle at this point are
380 380 # almost certainly related to the underlying stream being bad.
381 381 # And, chances are that the exception we're handling is related to
382 382 # getting in that bad state. So, we swallow the seeking error and
383 383 # re-raise the original error.
384 384 seekerror = False
385 385 try:
386 386 if self.current:
387 387 # consume the part content to not corrupt the stream.
388 388 self.current.consume()
389 389
390 390 for part in self.iterator:
391 391 # consume the bundle content
392 392 part.consume()
393 393 except Exception:
394 394 seekerror = True
395 395
396 396 # Small hack to let caller code distinguish exceptions raised during
397 397 # bundle2 processing from old-format processing. This is mostly needed
398 398 # to handle different return codes to unbundle according to the type
399 399 # of bundle. We should probably clean up or drop this return code
400 400 # craziness in a future version.
401 401 exc.duringunbundle2 = True
402 402 salvaged = []
403 403 replycaps = None
404 404 if self.op.reply is not None:
405 405 salvaged = self.op.reply.salvageoutput()
406 406 replycaps = self.op.reply.capabilities
407 407 exc._replycaps = replycaps
408 408 exc._bundle2salvagedoutput = salvaged
409 409
410 410 # Re-raising from a variable loses the original stack. So only use
411 411 # that form if we need to.
412 412 if seekerror:
413 413 raise exc
414 414
415 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 416 self.count)
417 417
418 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 419 """This function process a bundle, apply effect to/from a repo
420 420
421 421 It iterates over each part then searches for and uses the proper handling
422 422 code to process the part. Parts are processed in order.
423 423
424 424 An unknown mandatory part will abort the process.
425 425
426 426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 427 function. This is used to ensure output is properly propagated in case of
428 428 an error during the unbundling. This output capturing part will likely be
429 429 reworked and this ability will probably go away in the process.
430 430 """
431 431 if op is None:
432 432 if transactiongetter is None:
433 433 transactiongetter = _notransaction
434 434 op = bundleoperation(repo, transactiongetter)
435 435 # todo:
436 436 # - replace this with an init function soon.
437 437 # - exception catching
438 438 unbundler.params
439 439 if repo.ui.debugflag:
440 440 msg = ['bundle2-input-bundle:']
441 441 if unbundler.params:
442 442 msg.append(' %i params' % len(unbundler.params))
443 443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 444 msg.append(' no-transaction')
445 445 else:
446 446 msg.append(' with-transaction')
447 447 msg.append('\n')
448 448 repo.ui.debug(''.join(msg))
449 449
450 450 processparts(repo, op, unbundler)
451 451
452 452 return op
453 453
454 454 def processparts(repo, op, unbundler):
455 455 with partiterator(repo, op, unbundler) as parts:
456 456 for part in parts:
457 457 _processpart(op, part)
458 458
459 459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
460 460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
461 461 op.records.add('changegroup', {
462 462 'return': ret,
463 463 })
464 464 return ret
465 465
466 466 def _gethandler(op, part):
467 467 status = 'unknown' # used by debug output
468 468 try:
469 469 handler = parthandlermapping.get(part.type)
470 470 if handler is None:
471 471 status = 'unsupported-type'
472 472 raise error.BundleUnknownFeatureError(parttype=part.type)
473 473 indebug(op.ui, 'found a handler for part %s' % part.type)
474 474 unknownparams = part.mandatorykeys - handler.params
475 475 if unknownparams:
476 476 unknownparams = list(unknownparams)
477 477 unknownparams.sort()
478 478 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
479 479 raise error.BundleUnknownFeatureError(parttype=part.type,
480 480 params=unknownparams)
481 481 status = 'supported'
482 482 except error.BundleUnknownFeatureError as exc:
483 483 if part.mandatory: # mandatory parts
484 484 raise
485 485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
486 486 return # skip to part processing
487 487 finally:
488 488 if op.ui.debugflag:
489 489 msg = ['bundle2-input-part: "%s"' % part.type]
490 490 if not part.mandatory:
491 491 msg.append(' (advisory)')
492 492 nbmp = len(part.mandatorykeys)
493 493 nbap = len(part.params) - nbmp
494 494 if nbmp or nbap:
495 495 msg.append(' (params:')
496 496 if nbmp:
497 497 msg.append(' %i mandatory' % nbmp)
498 498 if nbap:
499 499 msg.append(' %i advisory' % nbap)
500 500 msg.append(')')
501 501 msg.append(' %s\n' % status)
502 502 op.ui.debug(''.join(msg))
503 503
504 504 return handler
505 505
506 506 def _processpart(op, part):
507 507 """process a single part from a bundle
508 508
509 509 The part is guaranteed to have been fully consumed when the function exits
510 510 (even if an exception is raised)."""
511 511 handler = _gethandler(op, part)
512 512 if handler is None:
513 513 return
514 514
515 515 # handler is called outside the above try block so that we don't
516 516 # risk catching KeyErrors from anything other than the
517 517 # parthandlermapping lookup (any KeyError raised by handler()
518 518 # itself represents a defect of a different variety).
519 519 output = None
520 520 if op.captureoutput and op.reply is not None:
521 521 op.ui.pushbuffer(error=True, subproc=True)
522 522 output = ''
523 523 try:
524 524 handler(op, part)
525 525 finally:
526 526 if output is not None:
527 527 output = op.ui.popbuffer()
528 528 if output:
529 529 outpart = op.reply.newpart('output', data=output,
530 530 mandatory=False)
531 531 outpart.addparam(
532 532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
533 533
534 534 def decodecaps(blob):
535 535 """decode a bundle2 caps bytes blob into a dictionary
536 536
537 537 The blob is a list of capabilities (one per line)
538 538 Capabilities may have values using a line of the form::
539 539
540 540 capability=value1,value2,value3
541 541
542 542 The values are always a list."""
543 543 caps = {}
544 544 for line in blob.splitlines():
545 545 if not line:
546 546 continue
547 547 if '=' not in line:
548 548 key, vals = line, ()
549 549 else:
550 550 key, vals = line.split('=', 1)
551 551 vals = vals.split(',')
552 552 key = urlreq.unquote(key)
553 553 vals = [urlreq.unquote(v) for v in vals]
554 554 caps[key] = vals
555 555 return caps
556 556
557 557 def encodecaps(caps):
558 558 """encode a bundle2 caps dictionary into a bytes blob"""
559 559 chunks = []
560 560 for ca in sorted(caps):
561 561 vals = caps[ca]
562 562 ca = urlreq.quote(ca)
563 563 vals = [urlreq.quote(v) for v in vals]
564 564 if vals:
565 565 ca = "%s=%s" % (ca, ','.join(vals))
566 566 chunks.append(ca)
567 567 return '\n'.join(chunks)
568 568
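# A round-trip sketch of the caps wire format (the 'digests' capability is
# real; the value list is illustrative):
#
#   encodecaps({'digests': ['md5', 'sha1']}) == 'digests=md5,sha1'
#   decodecaps('digests=md5,sha1') == {'digests': ['md5', 'sha1']}
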
569 569 bundletypes = {
570 570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
571 571 # since the unification ssh accepts a header but there
572 572 # is no capability signaling it.
573 573 "HG20": (), # special-cased below
574 574 "HG10UN": ("HG10UN", 'UN'),
575 575 "HG10BZ": ("HG10", 'BZ'),
576 576 "HG10GZ": ("HG10GZ", 'GZ'),
577 577 }
578 578
579 579 # hgweb uses this list to communicate its preferred type
580 580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
581 581
582 582 class bundle20(object):
583 583 """represent an outgoing bundle2 container
584 584
585 585 Use the `addparam` method to add a stream level parameter, and `newpart` to
586 586 populate it. Then call `getchunks` to retrieve all the binary chunks of
587 587 data that compose the bundle2 container."""
588 588
589 589 _magicstring = 'HG20'
590 590
591 591 def __init__(self, ui, capabilities=()):
592 592 self.ui = ui
593 593 self._params = []
594 594 self._parts = []
595 595 self.capabilities = dict(capabilities)
596 596 self._compengine = util.compengines.forbundletype('UN')
597 597 self._compopts = None
598 598
599 599 def setcompression(self, alg, compopts=None):
600 600 """setup core part compression to <alg>"""
601 601 if alg in (None, 'UN'):
602 602 return
603 603 assert not any(n.lower() == 'compression' for n, v in self._params)
604 604 self.addparam('Compression', alg)
605 605 self._compengine = util.compengines.forbundletype(alg)
606 606 self._compopts = compopts
607 607
608 608 @property
609 609 def nbparts(self):
610 610 """total number of parts added to the bundler"""
611 611 return len(self._parts)
612 612
613 613 # methods used to define the bundle2 content
614 614 def addparam(self, name, value=None):
615 615 """add a stream level parameter"""
616 616 if not name:
617 617 raise ValueError(r'empty parameter name')
618 618 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
619 619 raise ValueError(r'non letter first character: %s' % name)
620 620 self._params.append((name, value))
621 621
622 622 def addpart(self, part):
623 623 """add a new part to the bundle2 container
624 624
625 625 Parts contain the actual applicative payload."""
626 626 assert part.id is None
627 627 part.id = len(self._parts) # very cheap counter
628 628 self._parts.append(part)
629 629
630 630 def newpart(self, typeid, *args, **kwargs):
631 631 """create a new part and add it to the containers
632 632
633 633 As the part is directly added to the containers. For now, this means
634 634 that any failure to properly initialize the part after calling
635 635 ``newpart`` should result in a failure of the whole bundling process.
636 636
637 637 You can still fall back to manually create and add if you need better
638 638 control."""
639 639 part = bundlepart(typeid, *args, **kwargs)
640 640 self.addpart(part)
641 641 return part
642 642
643 643 # methods used to generate the bundle2 stream
644 644 def getchunks(self):
645 645 if self.ui.debugflag:
646 646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
647 647 if self._params:
648 648 msg.append(' (%i params)' % len(self._params))
649 649 msg.append(' %i parts total\n' % len(self._parts))
650 650 self.ui.debug(''.join(msg))
651 651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
652 652 yield self._magicstring
653 653 param = self._paramchunk()
654 654 outdebug(self.ui, 'bundle parameter: %s' % param)
655 655 yield _pack(_fstreamparamsize, len(param))
656 656 if param:
657 657 yield param
658 658 for chunk in self._compengine.compressstream(self._getcorechunk(),
659 659 self._compopts):
660 660 yield chunk
661 661
662 662 def _paramchunk(self):
663 663 """return a encoded version of all stream parameters"""
664 664 blocks = []
665 665 for par, value in self._params:
666 666 par = urlreq.quote(par)
667 667 if value is not None:
668 668 value = urlreq.quote(value)
669 669 par = '%s=%s' % (par, value)
670 670 blocks.append(par)
671 671 return ' '.join(blocks)
672 672
673 673 def _getcorechunk(self):
674 674 """yield chunk for the core part of the bundle
675 675
676 676 (all but headers and parameters)"""
677 677 outdebug(self.ui, 'start of parts')
678 678 for part in self._parts:
679 679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
680 680 for chunk in part.getchunks(ui=self.ui):
681 681 yield chunk
682 682 outdebug(self.ui, 'end of bundle')
683 683 yield _pack(_fpartheadersize, 0)
684 684
685 685
686 686 def salvageoutput(self):
687 687 """return a list with a copy of all output parts in the bundle
688 688
689 689 This is meant to be used during error handling to make sure we preserve
690 690 server output"""
691 691 salvaged = []
692 692 for part in self._parts:
693 693 if part.type.startswith('output'):
694 694 salvaged.append(part.copy())
695 695 return salvaged
696 696
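# A minimal emission sketch for bundle20 (the 'output' part type and the
# 'BZ' compression name are real; the ui object is assumed to come from
# the caller):
#
#   bundler = bundle20(ui)
#   bundler.setcompression('BZ')
#   bundler.newpart('output', data='hello', mandatory=False)
#   raw = ''.join(bundler.getchunks())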
697 697
698 698 class unpackermixin(object):
699 699 """A mixin to extract bytes and struct data from a stream"""
700 700
701 701 def __init__(self, fp):
702 702 self._fp = fp
703 703
704 704 def _unpack(self, format):
705 705 """unpack this struct format from the stream
706 706
707 707 This method is meant for internal usage by the bundle2 protocol only.
708 708 It directly manipulates the low level stream, including bundle2 level
709 709 instructions.
710 710
711 711 Do not use it to implement higher-level logic or methods."""
712 712 data = self._readexact(struct.calcsize(format))
713 713 return _unpack(format, data)
714 714
715 715 def _readexact(self, size):
716 716 """read exactly <size> bytes from the stream
717 717
718 718 This method is meant for internal usage by the bundle2 protocol only.
719 719 It directly manipulates the low level stream, including bundle2 level
720 720 instructions.
721 721
722 722 Do not use it to implement higher-level logic or methods."""
723 723 return changegroup.readexactly(self._fp, size)
724 724
725 725 def getunbundler(ui, fp, magicstring=None):
726 726 """return a valid unbundler object for a given magicstring"""
727 727 if magicstring is None:
728 728 magicstring = changegroup.readexactly(fp, 4)
729 729 magic, version = magicstring[0:2], magicstring[2:4]
730 730 if magic != 'HG':
731 731 ui.debug(
732 732 "error: invalid magic: %r (version %r), should be 'HG'\n"
733 733 % (magic, version))
734 734 raise error.Abort(_('not a Mercurial bundle'))
735 735 unbundlerclass = formatmap.get(version)
736 736 if unbundlerclass is None:
737 737 raise error.Abort(_('unknown bundle version %s') % version)
738 738 unbundler = unbundlerclass(ui, fp)
739 739 indebug(ui, 'start processing of %s stream' % magicstring)
740 740 return unbundler
741 741
742 742 class unbundle20(unpackermixin):
743 743 """interpret a bundle2 stream
744 744
745 745 This class is fed with a binary stream and yields parts through its
746 746 `iterparts` method."""
747 747
748 748 _magicstring = 'HG20'
749 749
750 750 def __init__(self, ui, fp):
751 751 """If header is specified, we do not read it out of the stream."""
752 752 self.ui = ui
753 753 self._compengine = util.compengines.forbundletype('UN')
754 754 self._compressed = None
755 755 super(unbundle20, self).__init__(fp)
756 756
757 757 @util.propertycache
758 758 def params(self):
759 759 """dictionary of stream level parameters"""
760 760 indebug(self.ui, 'reading bundle2 stream parameters')
761 761 params = {}
762 762 paramssize = self._unpack(_fstreamparamsize)[0]
763 763 if paramssize < 0:
764 764 raise error.BundleValueError('negative bundle param size: %i'
765 765 % paramssize)
766 766 if paramssize:
767 767 params = self._readexact(paramssize)
768 768 params = self._processallparams(params)
769 769 return params
770 770
771 771 def _processallparams(self, paramsblock):
772 772 """"""
773 773 params = util.sortdict()
774 774 for p in paramsblock.split(' '):
775 775 p = p.split('=', 1)
776 776 p = [urlreq.unquote(i) for i in p]
777 777 if len(p) < 2:
778 778 p.append(None)
779 779 self._processparam(*p)
780 780 params[p[0]] = p[1]
781 781 return params
782 782
783 783
784 784 def _processparam(self, name, value):
785 785 """process a parameter, applying its effect if needed
786 786
787 787 Parameters starting with a lower case letter are advisory and will be
788 788 ignored when unknown. Those starting with an upper case letter are
789 789 mandatory, and this function will raise a KeyError when they are unknown.
790 790
791 791 Note: no options are currently supported. Any input will either be
792 792 ignored or fail.
793 793 """
794 794 if not name:
795 795 raise ValueError(r'empty parameter name')
796 796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
797 797 raise ValueError(r'non letter first character: %s' % name)
798 798 try:
799 799 handler = b2streamparamsmap[name.lower()]
800 800 except KeyError:
801 801 if name[0:1].islower():
802 802 indebug(self.ui, "ignoring unknown parameter %s" % name)
803 803 else:
804 804 raise error.BundleUnknownFeatureError(params=(name,))
805 805 else:
806 806 handler(self, name, value)
807 807
808 808 def _forwardchunks(self):
809 809 """utility to transfer a bundle2 as binary
810 810
811 811 This is made necessary by the fact that the 'getbundle' command over 'ssh'
812 812 has no way to know when the reply ends, relying on the bundle being
813 813 interpreted to know its end. This is terrible and we are sorry, but we
814 814 needed to move forward to get general delta enabled.
815 815 """
816 816 yield self._magicstring
817 817 assert 'params' not in vars(self)
818 818 paramssize = self._unpack(_fstreamparamsize)[0]
819 819 if paramssize < 0:
820 820 raise error.BundleValueError('negative bundle param size: %i'
821 821 % paramssize)
822 822 yield _pack(_fstreamparamsize, paramssize)
823 823 if paramssize:
824 824 params = self._readexact(paramssize)
825 825 self._processallparams(params)
826 826 yield params
827 827 assert self._compengine.bundletype == 'UN'
828 828 # From there, payload might need to be decompressed
829 829 self._fp = self._compengine.decompressorreader(self._fp)
830 830 emptycount = 0
831 831 while emptycount < 2:
832 832 # so we can brainlessly loop
833 833 assert _fpartheadersize == _fpayloadsize
834 834 size = self._unpack(_fpartheadersize)[0]
835 835 yield _pack(_fpartheadersize, size)
836 836 if size:
837 837 emptycount = 0
838 838 else:
839 839 emptycount += 1
840 840 continue
841 841 if size == flaginterrupt:
842 842 continue
843 843 elif size < 0:
844 844 raise error.BundleValueError('negative chunk size: %i' % size)
845 845 yield self._readexact(size)
846 846
847 847
848 def iterparts(self):
848 def iterparts(self, seekable=False):
849 849 """yield all parts contained in the stream"""
850 cls = seekableunbundlepart if seekable else unbundlepart
850 851 # make sure param have been loaded
851 852 self.params
852 853 # From there, payload need to be decompressed
853 854 self._fp = self._compengine.decompressorreader(self._fp)
854 855 indebug(self.ui, 'start extraction of bundle2 parts')
855 856 headerblock = self._readpartheader()
856 857 while headerblock is not None:
857 part = seekableunbundlepart(self.ui, headerblock, self._fp)
858 part = cls(self.ui, headerblock, self._fp)
858 859 yield part
859 860 # Ensure part is fully consumed so we can start reading the next
860 861 # part.
861 862 part.consume()
862 863
863 864 headerblock = self._readpartheader()
864 865 indebug(self.ui, 'end of bundle2 stream')
865 866
866 867 def _readpartheader(self):
867 868 """reads a part header size and return the bytes blob
868 869
869 870 returns None if empty"""
870 871 headersize = self._unpack(_fpartheadersize)[0]
871 872 if headersize < 0:
872 873 raise error.BundleValueError('negative part header size: %i'
873 874 % headersize)
874 875 indebug(self.ui, 'part header size: %i' % headersize)
875 876 if headersize:
876 877 return self._readexact(headersize)
877 878 return None
878 879
879 880 def compressed(self):
880 881 self.params # load params
881 882 return self._compressed
882 883
883 884 def close(self):
884 885 """close underlying file"""
885 886 if util.safehasattr(self._fp, 'close'):
886 887 return self._fp.close()
887 888
888 889 formatmap = {'20': unbundle20}
889 890
890 891 b2streamparamsmap = {}
891 892
892 893 def b2streamparamhandler(name):
893 894 """register a handler for a stream level parameter"""
894 895 def decorator(func):
895 896 assert name not in formatmap
896 897 b2streamparamsmap[name] = func
897 898 return func
898 899 return decorator
899 900
900 901 @b2streamparamhandler('compression')
901 902 def processcompression(unbundler, param, value):
902 903 """read compression parameter and install payload decompression"""
903 904 if value not in util.compengines.supportedbundletypes:
904 905 raise error.BundleUnknownFeatureError(params=(param,),
905 906 values=(value,))
906 907 unbundler._compengine = util.compengines.forbundletype(value)
907 908 if value is not None:
908 909 unbundler._compressed = True
909 910
910 911 class bundlepart(object):
911 912 """A bundle2 part contains application level payload
912 913
913 914 The part `type` is used to route the part to the application level
914 915 handler.
915 916
916 917 The part payload is contained in ``part.data``. It could be raw bytes or a
917 918 generator of byte chunks.
918 919
919 920 You can add parameters to the part using the ``addparam`` method.
920 921 Parameters can be either mandatory (default) or advisory. Remote side
921 922 should be able to safely ignore the advisory ones.
922 923
923 924 Neither data nor parameters can be modified after generation has begun.
924 925 """
925 926
926 927 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
927 928 data='', mandatory=True):
928 929 validateparttype(parttype)
929 930 self.id = None
930 931 self.type = parttype
931 932 self._data = data
932 933 self._mandatoryparams = list(mandatoryparams)
933 934 self._advisoryparams = list(advisoryparams)
934 935 # checking for duplicated entries
935 936 self._seenparams = set()
936 937 for pname, __ in self._mandatoryparams + self._advisoryparams:
937 938 if pname in self._seenparams:
938 939 raise error.ProgrammingError('duplicated params: %s' % pname)
939 940 self._seenparams.add(pname)
940 941 # status of the part's generation:
941 942 # - None: not started,
942 943 # - False: currently generated,
943 944 # - True: generation done.
944 945 self._generated = None
945 946 self.mandatory = mandatory
946 947
947 948 def __repr__(self):
948 949 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
949 950 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
950 951 % (cls, id(self), self.id, self.type, self.mandatory))
951 952
952 953 def copy(self):
953 954 """return a copy of the part
954 955
955 956 The new part has the very same content but no partid assigned yet.
956 957 Parts with generated data cannot be copied."""
957 958 assert not util.safehasattr(self.data, 'next')
958 959 return self.__class__(self.type, self._mandatoryparams,
959 960 self._advisoryparams, self._data, self.mandatory)
960 961
961 962 # methods used to define the part content
962 963 @property
963 964 def data(self):
964 965 return self._data
965 966
966 967 @data.setter
967 968 def data(self, data):
968 969 if self._generated is not None:
969 970 raise error.ReadOnlyPartError('part is being generated')
970 971 self._data = data
971 972
972 973 @property
973 974 def mandatoryparams(self):
974 975 # make it an immutable tuple to force people through ``addparam``
975 976 return tuple(self._mandatoryparams)
976 977
977 978 @property
978 979 def advisoryparams(self):
979 980 # make it an immutable tuple to force people through ``addparam``
980 981 return tuple(self._advisoryparams)
981 982
982 983 def addparam(self, name, value='', mandatory=True):
983 984 """add a parameter to the part
984 985
985 986 If 'mandatory' is set to True, the remote handler must claim support
986 987 for this parameter or the unbundling will be aborted.
987 988
988 989 The 'name' and 'value' cannot exceed 255 bytes each.
989 990 """
990 991 if self._generated is not None:
991 992 raise error.ReadOnlyPartError('part is being generated')
992 993 if name in self._seenparams:
993 994 raise ValueError('duplicated params: %s' % name)
994 995 self._seenparams.add(name)
995 996 params = self._advisoryparams
996 997 if mandatory:
997 998 params = self._mandatoryparams
998 999 params.append((name, value))
999 1000
1000 1001 # methods used to generate the bundle2 stream
1001 1002 def getchunks(self, ui):
1002 1003 if self._generated is not None:
1003 1004 raise error.ProgrammingError('part can only be consumed once')
1004 1005 self._generated = False
1005 1006
1006 1007 if ui.debugflag:
1007 1008 msg = ['bundle2-output-part: "%s"' % self.type]
1008 1009 if not self.mandatory:
1009 1010 msg.append(' (advisory)')
1010 1011 nbmp = len(self.mandatoryparams)
1011 1012 nbap = len(self.advisoryparams)
1012 1013 if nbmp or nbap:
1013 1014 msg.append(' (params:')
1014 1015 if nbmp:
1015 1016 msg.append(' %i mandatory' % nbmp)
1016 1017 if nbap:
1017 1018 msg.append(' %i advisory' % nbmp)
1018 1019 msg.append(')')
1019 1020 if not self.data:
1020 1021 msg.append(' empty payload')
1021 1022 elif (util.safehasattr(self.data, 'next')
1022 1023 or util.safehasattr(self.data, '__next__')):
1023 1024 msg.append(' streamed payload')
1024 1025 else:
1025 1026 msg.append(' %i bytes payload' % len(self.data))
1026 1027 msg.append('\n')
1027 1028 ui.debug(''.join(msg))
1028 1029
1029 1030 #### header
1030 1031 if self.mandatory:
1031 1032 parttype = self.type.upper()
1032 1033 else:
1033 1034 parttype = self.type.lower()
1034 1035 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1035 1036 ## parttype
1036 1037 header = [_pack(_fparttypesize, len(parttype)),
1037 1038 parttype, _pack(_fpartid, self.id),
1038 1039 ]
1039 1040 ## parameters
1040 1041 # count
1041 1042 manpar = self.mandatoryparams
1042 1043 advpar = self.advisoryparams
1043 1044 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1044 1045 # size
1045 1046 parsizes = []
1046 1047 for key, value in manpar:
1047 1048 parsizes.append(len(key))
1048 1049 parsizes.append(len(value))
1049 1050 for key, value in advpar:
1050 1051 parsizes.append(len(key))
1051 1052 parsizes.append(len(value))
1052 1053 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1053 1054 header.append(paramsizes)
1054 1055 # key, value
1055 1056 for key, value in manpar:
1056 1057 header.append(key)
1057 1058 header.append(value)
1058 1059 for key, value in advpar:
1059 1060 header.append(key)
1060 1061 header.append(value)
1061 1062 ## finalize header
1062 1063 try:
1063 1064 headerchunk = ''.join(header)
1064 1065 except TypeError:
1065 1066 raise TypeError(r'Found a non-bytes trying to '
1066 1067 r'build bundle part header: %r' % header)
1067 1068 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1068 1069 yield _pack(_fpartheadersize, len(headerchunk))
1069 1070 yield headerchunk
1070 1071 ## payload
1071 1072 try:
1072 1073 for chunk in self._payloadchunks():
1073 1074 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1074 1075 yield _pack(_fpayloadsize, len(chunk))
1075 1076 yield chunk
1076 1077 except GeneratorExit:
1077 1078 # GeneratorExit means that nobody is listening for our
1078 1079 # results anyway, so just bail quickly rather than trying
1079 1080 # to produce an error part.
1080 1081 ui.debug('bundle2-generatorexit\n')
1081 1082 raise
1082 1083 except BaseException as exc:
1083 1084 bexc = util.forcebytestr(exc)
1084 1085 # backup exception data for later
1085 1086 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1086 1087 % bexc)
1087 1088 tb = sys.exc_info()[2]
1088 1089 msg = 'unexpected error: %s' % bexc
1089 1090 interpart = bundlepart('error:abort', [('message', msg)],
1090 1091 mandatory=False)
1091 1092 interpart.id = 0
1092 1093 yield _pack(_fpayloadsize, -1)
1093 1094 for chunk in interpart.getchunks(ui=ui):
1094 1095 yield chunk
1095 1096 outdebug(ui, 'closing payload chunk')
1096 1097 # abort current part payload
1097 1098 yield _pack(_fpayloadsize, 0)
1098 1099 pycompat.raisewithtb(exc, tb)
1099 1100 # end of payload
1100 1101 outdebug(ui, 'closing payload chunk')
1101 1102 yield _pack(_fpayloadsize, 0)
1102 1103 self._generated = True
1103 1104
1104 1105 def _payloadchunks(self):
1105 1106 """yield chunks of a the part payload
1106 1107
1107 1108 Exists to handle the different methods to provide data to a part."""
1108 1109 # we only support fixed size data now.
1109 1110 # This will be improved in the future.
1110 1111 if (util.safehasattr(self.data, 'next')
1111 1112 or util.safehasattr(self.data, '__next__')):
1112 1113 buff = util.chunkbuffer(self.data)
1113 1114 chunk = buff.read(preferedchunksize)
1114 1115 while chunk:
1115 1116 yield chunk
1116 1117 chunk = buff.read(preferedchunksize)
1117 1118 elif len(self.data):
1118 1119 yield self.data
1119 1120
1120 1121
1121 1122 flaginterrupt = -1
1122 1123
1123 1124 class interrupthandler(unpackermixin):
1124 1125 """read one part and process it with restricted capability
1125 1126
1126 1127 This allows transmitting an exception raised on the producer side during
1127 1128 part iteration while the consumer is reading a part.
1128 1129
1129 1130 Parts processed in this manner only have access to a ui object."""
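# On the wire an interruption looks like this (an illustrative sketch):
#
#   \xff\xff\xff\xff    a chunksize equal to flaginterrupt (-1)
#   <header size><header><payload>    one out of band part, embedded inline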
1130 1131
1131 1132 def __init__(self, ui, fp):
1132 1133 super(interrupthandler, self).__init__(fp)
1133 1134 self.ui = ui
1134 1135
1135 1136 def _readpartheader(self):
1136 1137 """reads a part header size and return the bytes blob
1137 1138
1138 1139 returns None if empty"""
1139 1140 headersize = self._unpack(_fpartheadersize)[0]
1140 1141 if headersize < 0:
1141 1142 raise error.BundleValueError('negative part header size: %i'
1142 1143 % headersize)
1143 1144 indebug(self.ui, 'part header size: %i\n' % headersize)
1144 1145 if headersize:
1145 1146 return self._readexact(headersize)
1146 1147 return None
1147 1148
1148 1149 def __call__(self):
1149 1150
1150 1151 self.ui.debug('bundle2-input-stream-interrupt:'
1151 1152 ' opening out of band context\n')
1152 1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1153 1154 headerblock = self._readpartheader()
1154 1155 if headerblock is None:
1155 1156 indebug(self.ui, 'no part found during interruption.')
1156 1157 return
1157 part = seekableunbundlepart(self.ui, headerblock, self._fp)
1158 part = unbundlepart(self.ui, headerblock, self._fp)
1158 1159 op = interruptoperation(self.ui)
1159 1160 hardabort = False
1160 1161 try:
1161 1162 _processpart(op, part)
1162 1163 except (SystemExit, KeyboardInterrupt):
1163 1164 hardabort = True
1164 1165 raise
1165 1166 finally:
1166 1167 if not hardabort:
1167 1168 part.consume()
1168 1169 self.ui.debug('bundle2-input-stream-interrupt:'
1169 1170 ' closing out of band context\n')
1170 1171
1171 1172 class interruptoperation(object):
1172 1173 """A limited operation to be use by part handler during interruption
1173 1174
1174 1175 It only have access to an ui object.
1175 1176 """
1176 1177
1177 1178 def __init__(self, ui):
1178 1179 self.ui = ui
1179 1180 self.reply = None
1180 1181 self.captureoutput = False
1181 1182
1182 1183 @property
1183 1184 def repo(self):
1184 1185 raise error.ProgrammingError('no repo access from stream interruption')
1185 1186
1186 1187 def gettransaction(self):
1187 1188 raise TransactionUnavailable('no repo access from stream interruption')
1188 1189
1189 1190 def decodepayloadchunks(ui, fh):
1190 1191 """Reads bundle2 part payload data into chunks.
1191 1192
1192 1193 Part payload data consists of framed chunks. This function takes
1193 1194 a file handle and emits those chunks.
1194 1195 """
1195 1196 headersize = struct.calcsize(_fpayloadsize)
1196 1197 readexactly = changegroup.readexactly
1197 1198
1198 1199 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1199 1200 indebug(ui, 'payload chunk size: %i' % chunksize)
1200 1201
1201 1202 while chunksize:
1202 1203 if chunksize >= 0:
1203 1204 yield readexactly(fh, chunksize)
1204 1205 elif chunksize == flaginterrupt:
1205 1206 # Interrupt "signal" detected. The regular stream is interrupted
1206 1207 # and a bundle2 part follows. Consume it.
1207 1208 interrupthandler(ui, fh)()
1208 1209 else:
1209 1210 raise error.BundleValueError(
1210 1211 'negative payload chunk size: %s' % chunksize)
1211 1212
1212 1213 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1213 1214 indebug(ui, 'payload chunk size: %i' % chunksize)
1214 1215
1215 1216 class unbundlepart(unpackermixin):
1216 1217 """a bundle part read from a bundle"""
1217 1218
1218 1219 def __init__(self, ui, header, fp):
1219 1220 super(unbundlepart, self).__init__(fp)
1220 1221 self._seekable = (util.safehasattr(fp, 'seek') and
1221 1222 util.safehasattr(fp, 'tell'))
1222 1223 self.ui = ui
1223 1224 # unbundle state attr
1224 1225 self._headerdata = header
1225 1226 self._headeroffset = 0
1226 1227 self._initialized = False
1227 1228 self.consumed = False
1228 1229 # part data
1229 1230 self.id = None
1230 1231 self.type = None
1231 1232 self.mandatoryparams = None
1232 1233 self.advisoryparams = None
1233 1234 self.params = None
1234 1235 self.mandatorykeys = ()
1235 1236 self._readheader()
1236 1237 self._mandatory = None
1237 1238 self._pos = 0
1238 1239
1239 1240 def _fromheader(self, size):
1240 1241 """return the next <size> byte from the header"""
1241 1242 offset = self._headeroffset
1242 1243 data = self._headerdata[offset:(offset + size)]
1243 1244 self._headeroffset = offset + size
1244 1245 return data
1245 1246
1246 1247 def _unpackheader(self, format):
1247 1248 """read given format from header
1248 1249
1249 1250 This automatically computes the size of the format to read."""
1250 1251 data = self._fromheader(struct.calcsize(format))
1251 1252 return _unpack(format, data)
1252 1253
1253 1254 def _initparams(self, mandatoryparams, advisoryparams):
1254 1255 """internal function to setup all logic related parameters"""
1255 1256 # make it read only to prevent people touching it by mistake.
1256 1257 self.mandatoryparams = tuple(mandatoryparams)
1257 1258 self.advisoryparams = tuple(advisoryparams)
1258 1259 # user friendly UI
1259 1260 self.params = util.sortdict(self.mandatoryparams)
1260 1261 self.params.update(self.advisoryparams)
1261 1262 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1262 1263
1263 1264 def _readheader(self):
1264 1265 """read the header and setup the object"""
1265 1266 typesize = self._unpackheader(_fparttypesize)[0]
1266 1267 self.type = self._fromheader(typesize)
1267 1268 indebug(self.ui, 'part type: "%s"' % self.type)
1268 1269 self.id = self._unpackheader(_fpartid)[0]
1269 1270 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1270 1271 # extract mandatory bit from type
1271 1272 self.mandatory = (self.type != self.type.lower())
1272 1273 self.type = self.type.lower()
1273 1274 ## reading parameters
1274 1275 # param count
1275 1276 mancount, advcount = self._unpackheader(_fpartparamcount)
1276 1277 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1277 1278 # param size
1278 1279 fparamsizes = _makefpartparamsizes(mancount + advcount)
1279 1280 paramsizes = self._unpackheader(fparamsizes)
1280 1281 # make it a list of (key size, value size) pairs again
1281 1282 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1282 1283 # split mandatory from advisory
1283 1284 mansizes = paramsizes[:mancount]
1284 1285 advsizes = paramsizes[mancount:]
1285 1286 # retrieve param value
1286 1287 manparams = []
1287 1288 for key, value in mansizes:
1288 1289 manparams.append((self._fromheader(key), self._fromheader(value)))
1289 1290 advparams = []
1290 1291 for key, value in advsizes:
1291 1292 advparams.append((self._fromheader(key), self._fromheader(value)))
1292 1293 self._initparams(manparams, advparams)
1293 1294 ## part payload
1294 1295 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1295 1296 # the header data has been fully read; record that the part is ready
1296 1297 self._initialized = True
1297 1298
1298 1299 def _payloadchunks(self):
1299 1300 """Generator of decoded chunks in the payload."""
1300 1301 return decodepayloadchunks(self.ui, self._fp)
1301 1302
1302 1303 def consume(self):
1303 1304 """Read the part payload until completion.
1304 1305
1305 1306 By consuming the part data, the underlying stream read offset will
1306 1307 be advanced to the next part (or end of stream).
1307 1308 """
1308 1309 if self.consumed:
1309 1310 return
1310 1311
1311 1312 chunk = self.read(32768)
1312 1313 while chunk:
1313 1314 self._pos += len(chunk)
1314 1315 chunk = self.read(32768)
1315 1316
1316 1317 def read(self, size=None):
1317 1318 """read payload data"""
1318 1319 if not self._initialized:
1319 1320 self._readheader()
1320 1321 if size is None:
1321 1322 data = self._payloadstream.read()
1322 1323 else:
1323 1324 data = self._payloadstream.read(size)
1324 1325 self._pos += len(data)
1325 1326 if size is None or len(data) < size:
1326 1327 if not self.consumed and self._pos:
1327 1328 self.ui.debug('bundle2-input-part: total payload size %i\n'
1328 1329 % self._pos)
1329 1330 self.consumed = True
1330 1331 return data
1331 1332
1332 1333 class seekableunbundlepart(unbundlepart):
1333 1334 """A bundle2 part in a bundle that is seekable.
1334 1335
1335 1336 Regular ``unbundlepart`` instances can only be read once. This class
1336 1337 extends ``unbundlepart`` to enable bi-directional seeking within the
1337 1338 part.
1338 1339
1339 1340 Bundle2 part data consists of framed chunks. Offsets when seeking
1340 1341 refer to the decoded data, not the offsets in the underlying bundle2
1341 1342 stream.
1342 1343
1343 1344 To facilitate quickly seeking within the decoded data, instances of this
1344 1345 class maintain a mapping between offsets in the underlying stream and
1345 1346 the decoded payload. This mapping will consume memory in proportion
1346 1347 to the number of chunks within the payload (which almost certainly
1347 1348 increases in proportion with the size of the part).
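
A typical rewind sketch using only this class's own API::

    part.seek(0, os.SEEK_END)   # force full consumption, building the index
    size = part.tell()
    part.seek(0)                # rewind through the recorded chunk index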
1348 1349 """
1349 1350 def __init__(self, ui, header, fp):
1350 1351 # (payload, file) offsets for chunk starts.
1351 1352 self._chunkindex = []
1352 1353
1353 1354 super(seekableunbundlepart, self).__init__(ui, header, fp)
1354 1355
1355 1356 def _payloadchunks(self, chunknum=0):
1356 1357 '''seek to specified chunk and start yielding data'''
1357 1358 if len(self._chunkindex) == 0:
1358 1359 assert chunknum == 0, 'Must start with chunk 0'
1359 1360 self._chunkindex.append((0, self._tellfp()))
1360 1361 else:
1361 1362 assert chunknum < len(self._chunkindex), \
1362 1363 'Unknown chunk %d' % chunknum
1363 1364 self._seekfp(self._chunkindex[chunknum][1])
1364 1365
1365 1366 pos = self._chunkindex[chunknum][0]
1366 1367
1367 1368 for chunk in decodepayloadchunks(self.ui, self._fp):
1368 1369 chunknum += 1
1369 1370 pos += len(chunk)
1370 1371 if chunknum == len(self._chunkindex):
1371 1372 self._chunkindex.append((pos, self._tellfp()))
1372 1373
1373 1374 yield chunk
1374 1375
1375 1376 def _findchunk(self, pos):
1376 1377 '''for a given payload position, return a chunk number and offset'''
1377 1378 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1378 1379 if ppos == pos:
1379 1380 return chunk, 0
1380 1381 elif ppos > pos:
1381 1382 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1382 1383 raise ValueError('Unknown chunk')
1383 1384
1384 1385 def tell(self):
1385 1386 return self._pos
1386 1387
1387 1388 def seek(self, offset, whence=os.SEEK_SET):
1388 1389 if whence == os.SEEK_SET:
1389 1390 newpos = offset
1390 1391 elif whence == os.SEEK_CUR:
1391 1392 newpos = self._pos + offset
1392 1393 elif whence == os.SEEK_END:
1393 1394 if not self.consumed:
1394 1395 self.read()
1395 1396 newpos = self._chunkindex[-1][0] - offset
1396 1397 else:
1397 1398 raise ValueError('Unknown whence value: %r' % (whence,))
1398 1399
1399 1400 if newpos > self._chunkindex[-1][0] and not self.consumed:
1400 1401 self.read()
1401 1402 if not 0 <= newpos <= self._chunkindex[-1][0]:
1402 1403 raise ValueError('Offset out of range')
1403 1404
1404 1405 if self._pos != newpos:
1405 1406 chunk, internaloffset = self._findchunk(newpos)
1406 1407 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1407 1408 adjust = self.read(internaloffset)
1408 1409 if len(adjust) != internaloffset:
1409 1410 raise error.Abort(_('Seek failed\n'))
1410 1411 self._pos = newpos
1411 1412
1412 1413 def _seekfp(self, offset, whence=0):
1413 1414 """move the underlying file pointer
1414 1415
1415 1416 This method is meant for internal usage by the bundle2 protocol only.
1416 1417 It directly manipulates the low level stream, including bundle2 level
1417 1418 instructions.
1418 1419
1419 1420 Do not use it to implement higher-level logic or methods."""
1420 1421 if self._seekable:
1421 1422 return self._fp.seek(offset, whence)
1422 1423 else:
1423 1424 raise NotImplementedError(_('File pointer is not seekable'))
1424 1425
1425 1426 def _tellfp(self):
1426 1427 """return the file offset, or None if file is not seekable
1427 1428
1428 1429 This method is meant for internal usage by the bundle2 protocol only.
1429 1430 It directly manipulates the low level stream, including bundle2 level
1430 1431 instructions.
1431 1432
1432 1433 Do not use it to implement higher-level logic or methods."""
1433 1434 if self._seekable:
1434 1435 try:
1435 1436 return self._fp.tell()
1436 1437 except IOError as e:
1437 1438 if e.errno == errno.ESPIPE:
1438 1439 self._seekable = False
1439 1440 else:
1440 1441 raise
1441 1442 return None
1442 1443
1443 1444 # These are only the static capabilities.
1444 1445 # Check the 'getrepocaps' function for the rest.
1445 1446 capabilities = {'HG20': (),
1446 1447 'error': ('abort', 'unsupportedcontent', 'pushraced',
1447 1448 'pushkey'),
1448 1449 'listkeys': (),
1449 1450 'pushkey': (),
1450 1451 'digests': tuple(sorted(util.DIGESTS.keys())),
1451 1452 'remote-changegroup': ('http', 'https'),
1452 1453 'hgtagsfnodes': (),
1453 1454 'phases': ('heads',),
1454 1455 }
1455 1456
1456 1457 def getrepocaps(repo, allowpushback=False):
1457 1458 """return the bundle2 capabilities for a given repo
1458 1459
1459 1460 Exists to allow extensions (like evolution) to mutate the capabilities.
1460 1461 """
1461 1462 caps = capabilities.copy()
1462 1463 caps['changegroup'] = tuple(sorted(
1463 1464 changegroup.supportedincomingversions(repo)))
1464 1465 if obsolete.isenabled(repo, obsolete.exchangeopt):
1465 1466 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1466 1467 caps['obsmarkers'] = supportedformat
1467 1468 if allowpushback:
1468 1469 caps['pushback'] = ()
1469 1470 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1470 1471 if cpmode == 'check-related':
1471 1472 caps['checkheads'] = ('related',)
1472 1473 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1473 1474 caps.pop('phases')
1474 1475 return caps
1475 1476
1476 1477 def bundle2caps(remote):
1477 1478 """return the bundle capabilities of a peer as dict"""
1478 1479 raw = remote.capable('bundle2')
1479 1480 if not raw and raw != '':
1480 1481 return {}
1481 1482 capsblob = urlreq.unquote(remote.capable('bundle2'))
1482 1483 return decodecaps(capsblob)
1483 1484
1484 1485 def obsmarkersversion(caps):
1485 1486 """extract the list of supported obsmarkers versions from a bundle2caps dict
1486 1487 """
1487 1488 obscaps = caps.get('obsmarkers', ())
1488 1489 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1489 1490
1490 1491 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1491 1492 vfs=None, compression=None, compopts=None):
1492 1493 if bundletype.startswith('HG10'):
1493 1494 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1494 1495 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1495 1496 compression=compression, compopts=compopts)
1496 1497 elif not bundletype.startswith('HG20'):
1497 1498 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1498 1499
1499 1500 caps = {}
1500 1501 if 'obsolescence' in opts:
1501 1502 caps['obsmarkers'] = ('V1',)
1502 1503 bundle = bundle20(ui, caps)
1503 1504 bundle.setcompression(compression, compopts)
1504 1505 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1505 1506 chunkiter = bundle.getchunks()
1506 1507
1507 1508 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1508 1509
1509 1510 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1510 1511 # We should eventually reconcile this logic with the one behind
1511 1512 # 'exchange.getbundle2partsgenerator'.
1512 1513 #
1513 1514 # The type of input from 'getbundle' and 'writenewbundle' are a bit
1514 1515 # different right now. So we keep them separated for now for the sake of
1515 1516 # simplicity.
1516 1517
1517 1518 # we always want a changegroup in such bundle
1518 1519 cgversion = opts.get('cg.version')
1519 1520 if cgversion is None:
1520 1521 cgversion = changegroup.safeversion(repo)
1521 1522 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1522 1523 part = bundler.newpart('changegroup', data=cg.getchunks())
1523 1524 part.addparam('version', cg.version)
1524 1525 if 'clcount' in cg.extras:
1525 1526 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1526 1527 mandatory=False)
1527 1528 if opts.get('phases') and repo.revs('%ln and secret()',
1528 1529 outgoing.missingheads):
1529 1530 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1530 1531
1531 1532 addparttagsfnodescache(repo, bundler, outgoing)
1532 1533
1533 1534 if opts.get('obsolescence', False):
1534 1535 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1535 1536 buildobsmarkerspart(bundler, obsmarkers)
1536 1537
1537 1538 if opts.get('phases', False):
1538 1539 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1539 1540 phasedata = phases.binaryencode(headsbyphase)
1540 1541 bundler.newpart('phase-heads', data=phasedata)
1541 1542
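# Editor's sketch of how the opts dict consumed above is typically
# shaped when writing a v2 bundle; 'repo', 'outgoing' and the output
# filename are hypothetical:
#
#     opts = {'cg.version': '02', 'obsolescence': True, 'phases': True}
#     writenewbundle(ui, repo, 'bundle', 'out.hg', 'HG20', outgoing, opts)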
1542 1543 def addparttagsfnodescache(repo, bundler, outgoing):
1543 1544 # we include the tags fnode cache for the bundle changeset
1544 1545 # (as an optional part)
1545 1546 cache = tags.hgtagsfnodescache(repo.unfiltered())
1546 1547 chunks = []
1547 1548
1548 1549 # .hgtags fnodes are only relevant for head changesets. While we could
1549 1550 # transfer values for all known nodes, there will likely be little to
1550 1551 # no benefit.
1551 1552 #
1552 1553 # We don't bother using a generator to produce output data because
1553 1554 # a) we only have 40 bytes per head and even esoteric numbers of heads
1554 1555 # consume little memory (1M heads is 40MB) b) we don't want to send the
1555 1556 # part if we don't have entries and knowing if we have entries requires
1556 1557 # cache lookups.
1557 1558 for node in outgoing.missingheads:
1558 1559 # Don't compute missing, as this may slow down serving.
1559 1560 fnode = cache.getfnode(node, computemissing=False)
1560 1561 if fnode is not None:
1561 1562 chunks.extend([node, fnode])
1562 1563
1563 1564 if chunks:
1564 1565 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1565 1566
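# Editor's note on the payload built above: it is a flat byte string of
# concatenated (changeset node, .hgtags filenode) pairs, 40 bytes per
# head, which the 'hgtagsfnodes' part handler later reads back 20 bytes
# at a time:
#
#     data = node1 + fnode1 + node2 + fnode2   # each piece is 20 bytes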
1566 1567 def buildobsmarkerspart(bundler, markers):
1567 1568 """add an obsmarker part to the bundler with <markers>
1568 1569
1569 1570 No part is created if markers is empty.
1570 1571 Raises ValueError if the bundler doesn't support any known obsmarker format.
1571 1572 """
1572 1573 if not markers:
1573 1574 return None
1574 1575
1575 1576 remoteversions = obsmarkersversion(bundler.capabilities)
1576 1577 version = obsolete.commonversion(remoteversions)
1577 1578 if version is None:
1578 1579 raise ValueError('bundler does not support common obsmarker format')
1579 1580 stream = obsolete.encodemarkers(markers, True, version=version)
1580 1581 return bundler.newpart('obsmarkers', data=stream)
1581 1582
1582 1583 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1583 1584 compopts=None):
1584 1585 """Write a bundle file and return its filename.
1585 1586
1586 1587 Existing files will not be overwritten.
1587 1588 If no filename is specified, a temporary file is created.
1588 1589 bz2 compression can be turned off.
1589 1590 The bundle file will be deleted in case of errors.
1590 1591 """
1591 1592
1592 1593 if bundletype == "HG20":
1593 1594 bundle = bundle20(ui)
1594 1595 bundle.setcompression(compression, compopts)
1595 1596 part = bundle.newpart('changegroup', data=cg.getchunks())
1596 1597 part.addparam('version', cg.version)
1597 1598 if 'clcount' in cg.extras:
1598 1599 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1599 1600 mandatory=False)
1600 1601 chunkiter = bundle.getchunks()
1601 1602 else:
1602 1603 # compression argument is only for the bundle2 case
1603 1604 assert compression is None
1604 1605 if cg.version != '01':
1605 1606 raise error.Abort(_('old bundle types only support v1 '
1606 1607 'changegroups'))
1607 1608 header, comp = bundletypes[bundletype]
1608 1609 if comp not in util.compengines.supportedbundletypes:
1609 1610 raise error.Abort(_('unknown stream compression type: %s')
1610 1611 % comp)
1611 1612 compengine = util.compengines.forbundletype(comp)
1612 1613 def chunkiter():
1613 1614 yield header
1614 1615 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1615 1616 yield chunk
1616 1617 chunkiter = chunkiter()
1617 1618
1618 1619 # parse the changegroup data, otherwise we will block
1619 1620 # in case of sshrepo because we don't know the end of the stream
1620 1621 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1621 1622
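# Editor's sketch of the two call shapes accepted above; 'cg' is a
# changegroup produced elsewhere and the filenames are hypothetical:
#
#     writebundle(ui, cg, 'out.hg', 'HG20', compression='BZ')  # bundle2
#     writebundle(ui, cg, 'out.hg', 'HG10BZ')                  # legacy v1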
1622 1623 def combinechangegroupresults(op):
1623 1624 """logic to combine 0 or more addchangegroup results into one"""
1624 1625 results = [r.get('return', 0)
1625 1626 for r in op.records['changegroup']]
1626 1627 changedheads = 0
1627 1628 result = 1
1628 1629 for ret in results:
1629 1630 # If any changegroup result is 0, return 0
1630 1631 if ret == 0:
1631 1632 result = 0
1632 1633 break
1633 1634 if ret < -1:
1634 1635 changedheads += ret + 1
1635 1636 elif ret > 1:
1636 1637 changedheads += ret - 1
1637 1638 if changedheads > 0:
1638 1639 result = 1 + changedheads
1639 1640 elif changedheads < 0:
1640 1641 result = -1 + changedheads
1641 1642 return result
1642 1643
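# Editor's worked example of the combination above: a return of 1 means
# "success, no head change"; ret - 1 heads were added when ret > 1, and
# -1 - ret heads were removed when ret < -1. So:
#
#     [3, 2]  -> changedheads == 3  -> result 4   (three heads added)
#     [1, -3] -> changedheads == -2 -> result -3  (two heads removed)
#     [0, 5]  -> result 0                         (any failure wins)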
1643 1644 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1644 1645 'targetphase'))
1645 1646 def handlechangegroup(op, inpart):
1646 1647 """apply a changegroup part on the repo
1647 1648
1648 1649 This is a very early implementation that will be massively reworked
1649 1650 before being inflicted on any end-user.
1650 1651 """
1651 1652 tr = op.gettransaction()
1652 1653 unpackerversion = inpart.params.get('version', '01')
1653 1654 # We should raise an appropriate exception here
1654 1655 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1655 1656 # the source and url passed here are overwritten by the ones contained in
1656 1657 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1657 1658 nbchangesets = None
1658 1659 if 'nbchanges' in inpart.params:
1659 1660 nbchangesets = int(inpart.params.get('nbchanges'))
1660 1661 if ('treemanifest' in inpart.params and
1661 1662 'treemanifest' not in op.repo.requirements):
1662 1663 if len(op.repo.changelog) != 0:
1663 1664 raise error.Abort(_(
1664 1665 "bundle contains tree manifests, but local repo is "
1665 1666 "non-empty and does not use tree manifests"))
1666 1667 op.repo.requirements.add('treemanifest')
1667 1668 op.repo._applyopenerreqs()
1668 1669 op.repo._writerequirements()
1669 1670 extrakwargs = {}
1670 1671 targetphase = inpart.params.get('targetphase')
1671 1672 if targetphase is not None:
1672 1673 extrakwargs['targetphase'] = int(targetphase)
1673 1674 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1674 1675 expectedtotal=nbchangesets, **extrakwargs)
1675 1676 if op.reply is not None:
1676 1677 # This is definitely not the final form of this
1677 1678 # return. But one needs to start somewhere.
1678 1679 part = op.reply.newpart('reply:changegroup', mandatory=False)
1679 1680 part.addparam(
1680 1681 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1681 1682 part.addparam('return', '%i' % ret, mandatory=False)
1682 1683 assert not inpart.read()
1683 1684
1684 1685 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1685 1686 ['digest:%s' % k for k in util.DIGESTS.keys()])
1686 1687 @parthandler('remote-changegroup', _remotechangegroupparams)
1687 1688 def handleremotechangegroup(op, inpart):
1688 1689 """apply a bundle10 on the repo, given an url and validation information
1689 1690
1690 1691 All the information about the remote bundle to import is given as
1691 1692 parameters. The parameters include:
1692 1693 - url: the url to the bundle10.
1693 1694 - size: the bundle10 file size. It is used to validate that what was
1694 1695 retrieved by the client matches what the server knows about the bundle.
1695 1696 - digests: a space separated list of the digest types provided as
1696 1697 parameters.
1697 1698 - digest:<digest-type>: the hexadecimal representation of the digest with
1698 1699 that name. Like the size, it is used to validate that what was retrieved
1699 1700 by the client matches what the server knows about the bundle.
1700 1701
1701 1702 When multiple digest types are given, all of them are checked.
1702 1703 """
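# Editor's illustration (hypothetical values) of what inpart.params may
# look like for this part:
#
#     {'url': 'https://example.com/bundle.hg',
#      'size': '8123456',
#      'digests': 'sha1',
#      'digest:sha1': 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'}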
1703 1704 try:
1704 1705 raw_url = inpart.params['url']
1705 1706 except KeyError:
1706 1707 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1707 1708 parsed_url = util.url(raw_url)
1708 1709 if parsed_url.scheme not in capabilities['remote-changegroup']:
1709 1710 raise error.Abort(_('remote-changegroup does not support %s urls') %
1710 1711 parsed_url.scheme)
1711 1712
1712 1713 try:
1713 1714 size = int(inpart.params['size'])
1714 1715 except ValueError:
1715 1716 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1716 1717 % 'size')
1717 1718 except KeyError:
1718 1719 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1719 1720
1720 1721 digests = {}
1721 1722 for typ in inpart.params.get('digests', '').split():
1722 1723 param = 'digest:%s' % typ
1723 1724 try:
1724 1725 value = inpart.params[param]
1725 1726 except KeyError:
1726 1727 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1727 1728 param)
1728 1729 digests[typ] = value
1729 1730
1730 1731 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1731 1732
1732 1733 tr = op.gettransaction()
1733 1734 from . import exchange
1734 1735 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1735 1736 if not isinstance(cg, changegroup.cg1unpacker):
1736 1737 raise error.Abort(_('%s: not a bundle version 1.0') %
1737 1738 util.hidepassword(raw_url))
1738 1739 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1739 1740 if op.reply is not None:
1740 1741 # This is definitely not the final form of this
1741 1742 # return. But one needs to start somewhere.
1742 1743 part = op.reply.newpart('reply:changegroup')
1743 1744 part.addparam(
1744 1745 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1745 1746 part.addparam('return', '%i' % ret, mandatory=False)
1746 1747 try:
1747 1748 real_part.validate()
1748 1749 except error.Abort as e:
1749 1750 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1750 1751 (util.hidepassword(raw_url), str(e)))
1751 1752 assert not inpart.read()
1752 1753
1753 1754 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1754 1755 def handlereplychangegroup(op, inpart):
1755 1756 ret = int(inpart.params['return'])
1756 1757 replyto = int(inpart.params['in-reply-to'])
1757 1758 op.records.add('changegroup', {'return': ret}, replyto)
1758 1759
1759 1760 @parthandler('check:heads')
1760 1761 def handlecheckheads(op, inpart):
1761 1762 """check that head of the repo did not change
1762 1763
1763 1764 This is used to detect a push race when using unbundle.
1764 1765 This replaces the "heads" argument of unbundle."""
1765 1766 h = inpart.read(20)
1766 1767 heads = []
1767 1768 while len(h) == 20:
1768 1769 heads.append(h)
1769 1770 h = inpart.read(20)
1770 1771 assert not h
1771 1772 # Trigger a transaction so that we are guaranteed to have the lock now.
1772 1773 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1773 1774 op.gettransaction()
1774 1775 if sorted(heads) != sorted(op.repo.heads()):
1775 1776 raise error.PushRaced('repository changed while pushing - '
1776 1777 'please try again')
1777 1778
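# Editor's sketch of the sending side, which lives in exchange.py rather
# than here: the payload is just the remote heads concatenated, so a
# client can build the part as below ('bundler' and 'remoteheads' are
# assumed to exist):
#
#     bundler.newpart('check:heads', data=iter(remoteheads))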
1778 1779 @parthandler('check:updated-heads')
1779 1780 def handlecheckupdatedheads(op, inpart):
1780 1781 """check for race on the heads touched by a push
1781 1782
1782 1783 This is similar to 'check:heads' but focuses on the heads actually updated
1783 1784 during the push. If other activity happens on unrelated heads, it is
1784 1785 ignored.
1785 1786
1786 1787 This allows servers with high traffic to avoid push contention as long as
1787 1788 only unrelated parts of the graph are involved."""
1788 1789 h = inpart.read(20)
1789 1790 heads = []
1790 1791 while len(h) == 20:
1791 1792 heads.append(h)
1792 1793 h = inpart.read(20)
1793 1794 assert not h
1794 1795 # Trigger a transaction so that we are guaranteed to have the lock now.
1795 1796 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1796 1797 op.gettransaction()
1797 1798
1798 1799 currentheads = set()
1799 1800 for ls in op.repo.branchmap().itervalues():
1800 1801 currentheads.update(ls)
1801 1802
1802 1803 for h in heads:
1803 1804 if h not in currentheads:
1804 1805 raise error.PushRaced('repository changed while pushing - '
1805 1806 'please try again')
1806 1807
1807 1808 @parthandler('check:phases')
1808 1809 def handlecheckphases(op, inpart):
1809 1810 """check that phase boundaries of the repository did not change
1810 1811
1811 1812 This is used to detect a push race.
1812 1813 """
1813 1814 phasetonodes = phases.binarydecode(inpart)
1814 1815 unfi = op.repo.unfiltered()
1815 1816 cl = unfi.changelog
1816 1817 phasecache = unfi._phasecache
1817 1818 msg = ('repository changed while pushing - please try again '
1818 1819 '(%s is %s expected %s)')
1819 1820 for expectedphase, nodes in enumerate(phasetonodes):
1820 1821 for n in nodes:
1821 1822 actualphase = phasecache.phase(unfi, cl.rev(n))
1822 1823 if actualphase != expectedphase:
1823 1824 finalmsg = msg % (nodemod.short(n),
1824 1825 phases.phasenames[actualphase],
1825 1826 phases.phasenames[expectedphase])
1826 1827 raise error.PushRaced(finalmsg)
1827 1828
1828 1829 @parthandler('output')
1829 1830 def handleoutput(op, inpart):
1830 1831 """forward output captured on the server to the client"""
1831 1832 for line in inpart.read().splitlines():
1832 1833 op.ui.status(_('remote: %s\n') % line)
1833 1834
1834 1835 @parthandler('replycaps')
1835 1836 def handlereplycaps(op, inpart):
1836 1837 """Notify that a reply bundle should be created
1837 1838
1838 1839 The payload contains the capabilities information for the reply"""
1839 1840 caps = decodecaps(inpart.read())
1840 1841 if op.reply is None:
1841 1842 op.reply = bundle20(op.ui, caps)
1842 1843
1843 1844 class AbortFromPart(error.Abort):
1844 1845 """Sub-class of Abort that denotes an error from a bundle2 part."""
1845 1846
1846 1847 @parthandler('error:abort', ('message', 'hint'))
1847 1848 def handleerrorabort(op, inpart):
1848 1849 """Used to transmit abort error over the wire"""
1849 1850 raise AbortFromPart(inpart.params['message'],
1850 1851 hint=inpart.params.get('hint'))
1851 1852
1852 1853 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1853 1854 'in-reply-to'))
1854 1855 def handleerrorpushkey(op, inpart):
1855 1856 """Used to transmit failure of a mandatory pushkey over the wire"""
1856 1857 kwargs = {}
1857 1858 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1858 1859 value = inpart.params.get(name)
1859 1860 if value is not None:
1860 1861 kwargs[name] = value
1861 1862 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1862 1863
1863 1864 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1864 1865 def handleerrorunsupportedcontent(op, inpart):
1865 1866 """Used to transmit unknown content error over the wire"""
1866 1867 kwargs = {}
1867 1868 parttype = inpart.params.get('parttype')
1868 1869 if parttype is not None:
1869 1870 kwargs['parttype'] = parttype
1870 1871 params = inpart.params.get('params')
1871 1872 if params is not None:
1872 1873 kwargs['params'] = params.split('\0')
1873 1874
1874 1875 raise error.BundleUnknownFeatureError(**kwargs)
1875 1876
1876 1877 @parthandler('error:pushraced', ('message',))
1877 1878 def handleerrorpushraced(op, inpart):
1878 1879 """Used to transmit push race error over the wire"""
1879 1880 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1880 1881
1881 1882 @parthandler('listkeys', ('namespace',))
1882 1883 def handlelistkeys(op, inpart):
1883 1884 """retrieve pushkey namespace content stored in a bundle2"""
1884 1885 namespace = inpart.params['namespace']
1885 1886 r = pushkey.decodekeys(inpart.read())
1886 1887 op.records.add('listkeys', (namespace, r))
1887 1888
1888 1889 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1889 1890 def handlepushkey(op, inpart):
1890 1891 """process a pushkey request"""
1891 1892 dec = pushkey.decode
1892 1893 namespace = dec(inpart.params['namespace'])
1893 1894 key = dec(inpart.params['key'])
1894 1895 old = dec(inpart.params['old'])
1895 1896 new = dec(inpart.params['new'])
1896 1897 # Grab the transaction to ensure that we have the lock before performing the
1897 1898 # pushkey.
1898 1899 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1899 1900 op.gettransaction()
1900 1901 ret = op.repo.pushkey(namespace, key, old, new)
1901 1902 record = {'namespace': namespace,
1902 1903 'key': key,
1903 1904 'old': old,
1904 1905 'new': new}
1905 1906 op.records.add('pushkey', record)
1906 1907 if op.reply is not None:
1907 1908 rpart = op.reply.newpart('reply:pushkey')
1908 1909 rpart.addparam(
1909 1910 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1910 1911 rpart.addparam('return', '%i' % ret, mandatory=False)
1911 1912 if inpart.mandatory and not ret:
1912 1913 kwargs = {}
1913 1914 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1914 1915 if key in inpart.params:
1915 1916 kwargs[key] = inpart.params[key]
1916 1917 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1917 1918
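# Editor's illustration (hypothetical values): a bookmark move travels
# as a 'pushkey' part whose decoded params look roughly like
#
#     namespace = 'bookmarks'
#     key = 'mybook'
#     old = ''                # empty when the bookmark is new
#     new = '<40-char hex changeset node>'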
1918 1919 @parthandler('phase-heads')
1919 1920 def handlephases(op, inpart):
1920 1921 """apply phases from bundle part to repo"""
1921 1922 headsbyphase = phases.binarydecode(inpart)
1922 1923 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1923 1924
1924 1925 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1925 1926 def handlepushkeyreply(op, inpart):
1926 1927 """retrieve the result of a pushkey request"""
1927 1928 ret = int(inpart.params['return'])
1928 1929 partid = int(inpart.params['in-reply-to'])
1929 1930 op.records.add('pushkey', {'return': ret}, partid)
1930 1931
1931 1932 @parthandler('obsmarkers')
1932 1933 def handleobsmarker(op, inpart):
1933 1934 """add a stream of obsmarkers to the repo"""
1934 1935 tr = op.gettransaction()
1935 1936 markerdata = inpart.read()
1936 1937 if op.ui.configbool('experimental', 'obsmarkers-exchange-debug'):
1937 1938 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1938 1939 % len(markerdata))
1939 1940 # The mergemarkers call will crash if marker creation is not enabled.
1940 1941 # We want to avoid this if the part is advisory.
1941 1942 if not inpart.mandatory and op.repo.obsstore.readonly:
1942 1943 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1943 1944 return
1944 1945 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1945 1946 op.repo.invalidatevolatilesets()
1946 1947 if new:
1947 1948 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1948 1949 op.records.add('obsmarkers', {'new': new})
1949 1950 if op.reply is not None:
1950 1951 rpart = op.reply.newpart('reply:obsmarkers')
1951 1952 rpart.addparam(
1952 1953 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1953 1954 rpart.addparam('new', '%i' % new, mandatory=False)
1954 1955
1955 1956
1956 1957 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1957 1958 def handleobsmarkerreply(op, inpart):
1958 1959 """retrieve the result of a pushkey request"""
1959 1960 ret = int(inpart.params['new'])
1960 1961 partid = int(inpart.params['in-reply-to'])
1961 1962 op.records.add('obsmarkers', {'new': ret}, partid)
1962 1963
1963 1964 @parthandler('hgtagsfnodes')
1964 1965 def handlehgtagsfnodes(op, inpart):
1965 1966 """Applies .hgtags fnodes cache entries to the local repo.
1966 1967
1967 1968 Payload is pairs of 20 byte changeset nodes and filenodes.
1968 1969 """
1969 1970 # Grab the transaction so we ensure that we have the lock at this point.
1970 1971 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1971 1972 op.gettransaction()
1972 1973 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1973 1974
1974 1975 count = 0
1975 1976 while True:
1976 1977 node = inpart.read(20)
1977 1978 fnode = inpart.read(20)
1978 1979 if len(node) < 20 or len(fnode) < 20:
1979 1980 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1980 1981 break
1981 1982 cache.setfnode(node, fnode)
1982 1983 count += 1
1983 1984
1984 1985 cache.write()
1985 1986 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1986 1987
1987 1988 @parthandler('pushvars')
1988 1989 def bundle2getvars(op, part):
1989 1990 '''unbundle a bundle2 containing shellvars on the server'''
1990 1991 # An option to disable unbundling on server-side for security reasons
1991 1992 if op.ui.configbool('push', 'pushvars.server'):
1992 1993 hookargs = {}
1993 1994 for key, value in part.advisoryparams:
1994 1995 key = key.upper()
1995 1996 # We want pushed variables to have USERVAR_ prepended so we know
1996 1997 # they came from the --pushvar flag.
1997 1998 key = "USERVAR_" + key
1998 1999 hookargs[key] = value
1999 2000 op.addhookargs(hookargs)
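# Editor's illustration: with the hypothetical invocation
#
#     hg push --pushvars "DEBUG=1"
#
# the handler above (when push.pushvars.server is enabled) exposes
# USERVAR_DEBUG=1 to the server-side hooks for this push.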
@@ -1,598 +1,598
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18 import tempfile
19 19
20 20 from .i18n import _
21 21 from .node import nullid
22 22
23 23 from . import (
24 24 bundle2,
25 25 changegroup,
26 26 changelog,
27 27 cmdutil,
28 28 discovery,
29 29 error,
30 30 exchange,
31 31 filelog,
32 32 localrepo,
33 33 manifest,
34 34 mdiff,
35 35 node as nodemod,
36 36 pathutil,
37 37 phases,
38 38 pycompat,
39 39 revlog,
40 40 util,
41 41 vfs as vfsmod,
42 42 )
43 43
44 44 class bundlerevlog(revlog.revlog):
45 45 def __init__(self, opener, indexfile, cgunpacker, linkmapper):
46 46 # How it works:
47 47 # To retrieve a revision, we need to know the offset of the revision in
48 48 # the bundle (an unbundle object). We store this offset in the index
49 49 # (start). The base of the delta is stored in the base field.
50 50 #
51 51 # To differentiate a rev in the bundle from a rev in the revlog, we
52 52 # check revision against repotiprev.
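# Editor's example: with five revisions on disk, repotiprev == 4; bundle
# revisions are appended as revs 5, 6, ... so that
#
#     rev <= self.repotiprev  -> served from the on-disk revlog
#     rev >  self.repotiprev  -> seek to start(rev) in the bundle stream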
53 53 opener = vfsmod.readonlyvfs(opener)
54 54 revlog.revlog.__init__(self, opener, indexfile)
55 55 self.bundle = cgunpacker
56 56 n = len(self)
57 57 self.repotiprev = n - 1
58 58 self.bundlerevs = set() # used by 'bundle()' revset expression
59 59 for deltadata in cgunpacker.deltaiter():
60 60 node, p1, p2, cs, deltabase, delta, flags = deltadata
61 61
62 62 size = len(delta)
63 63 start = cgunpacker.tell() - size
64 64
65 65 link = linkmapper(cs)
66 66 if node in self.nodemap:
67 67 # this can happen if two branches make the same change
68 68 self.bundlerevs.add(self.nodemap[node])
69 69 continue
70 70
71 71 for p in (p1, p2):
72 72 if p not in self.nodemap:
73 73 raise error.LookupError(p, self.indexfile,
74 74 _("unknown parent"))
75 75
76 76 if deltabase not in self.nodemap:
77 77 raise LookupError(deltabase, self.indexfile,
78 78 _('unknown delta base'))
79 79
80 80 baserev = self.rev(deltabase)
81 81 # start, size, full unc. size, base (unused), link, p1, p2, node
82 82 e = (revlog.offset_type(start, flags), size, -1, baserev, link,
83 83 self.rev(p1), self.rev(p2), node)
84 84 self.index.insert(-1, e)
85 85 self.nodemap[node] = n
86 86 self.bundlerevs.add(n)
87 87 n += 1
88 88
89 89 def _chunk(self, rev, df=None):
90 90 # Warning: in the case of a bundle, the diff is against what we stored as
91 91 # delta base, not against rev - 1
92 92 # XXX: could use some caching
93 93 if rev <= self.repotiprev:
94 94 return revlog.revlog._chunk(self, rev)
95 95 self.bundle.seek(self.start(rev))
96 96 return self.bundle.read(self.length(rev))
97 97
98 98 def revdiff(self, rev1, rev2):
99 99 """return or calculate a delta between two revisions"""
100 100 if rev1 > self.repotiprev and rev2 > self.repotiprev:
101 101 # hot path for bundle
102 102 revb = self.index[rev2][3]
103 103 if revb == rev1:
104 104 return self._chunk(rev2)
105 105 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
106 106 return revlog.revlog.revdiff(self, rev1, rev2)
107 107
108 108 return mdiff.textdiff(self.revision(rev1, raw=True),
109 109 self.revision(rev2, raw=True))
110 110
111 111 def revision(self, nodeorrev, _df=None, raw=False):
112 112 """return an uncompressed revision of a given node or revision
113 113 number.
114 114 """
115 115 if isinstance(nodeorrev, int):
116 116 rev = nodeorrev
117 117 node = self.node(rev)
118 118 else:
119 119 node = nodeorrev
120 120 rev = self.rev(node)
121 121
122 122 if node == nullid:
123 123 return ""
124 124
125 125 rawtext = None
126 126 chain = []
127 127 iterrev = rev
128 128 # reconstruct the revision if it is from a changegroup
129 129 while iterrev > self.repotiprev:
130 130 if self._cache and self._cache[1] == iterrev:
131 131 rawtext = self._cache[2]
132 132 break
133 133 chain.append(iterrev)
134 134 iterrev = self.index[iterrev][3]
135 135 if rawtext is None:
136 136 rawtext = self.baserevision(iterrev)
137 137
138 138 while chain:
139 139 delta = self._chunk(chain.pop())
140 140 rawtext = mdiff.patches(rawtext, [delta])
141 141
142 142 text, validatehash = self._processflags(rawtext, self.flags(rev),
143 143 'read', raw=raw)
144 144 if validatehash:
145 145 self.checkhash(text, node, rev=rev)
146 146 self._cache = (node, rev, rawtext)
147 147 return text
148 148
149 149 def baserevision(self, nodeorrev):
150 150 # Revlog subclasses may override the 'revision' method to modify the format
151 151 # of content retrieved from the revlog. To use bundlerevlog with such a
152 152 # class, one needs to override 'baserevision' and make a more specific call here.
153 153 return revlog.revlog.revision(self, nodeorrev, raw=True)
154 154
155 155 def addrevision(self, *args, **kwargs):
156 156 raise NotImplementedError
157 157
158 158 def addgroup(self, *args, **kwargs):
159 159 raise NotImplementedError
160 160
161 161 def strip(self, *args, **kwargs):
162 162 raise NotImplementedError
163 163
164 164 def checksize(self):
165 165 raise NotImplementedError
166 166
167 167 class bundlechangelog(bundlerevlog, changelog.changelog):
168 168 def __init__(self, opener, cgunpacker):
169 169 changelog.changelog.__init__(self, opener)
170 170 linkmapper = lambda x: x
171 171 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
172 172 linkmapper)
173 173
174 174 def baserevision(self, nodeorrev):
175 175 # Although changelog doesn't override the 'revision' method, some extensions
176 176 # may replace this class with another that does. Same story with
177 177 # manifest and filelog classes.
178 178
179 179 # This bypasses filtering on changelog.node() and rev() because we need
180 180 # revision text of the bundle base even if it is hidden.
181 181 oldfilter = self.filteredrevs
182 182 try:
183 183 self.filteredrevs = ()
184 184 return changelog.changelog.revision(self, nodeorrev, raw=True)
185 185 finally:
186 186 self.filteredrevs = oldfilter
187 187
188 188 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
189 189 def __init__(self, opener, cgunpacker, linkmapper, dirlogstarts=None,
190 190 dir=''):
191 191 manifest.manifestrevlog.__init__(self, opener, dir=dir)
192 192 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
193 193 linkmapper)
194 194 if dirlogstarts is None:
195 195 dirlogstarts = {}
196 196 if self.bundle.version == "03":
197 197 dirlogstarts = _getfilestarts(self.bundle)
198 198 self._dirlogstarts = dirlogstarts
199 199 self._linkmapper = linkmapper
200 200
201 201 def baserevision(self, nodeorrev):
202 202 node = nodeorrev
203 203 if isinstance(node, int):
204 204 node = self.node(node)
205 205
206 206 if node in self.fulltextcache:
207 207 result = '%s' % self.fulltextcache[node]
208 208 else:
209 209 result = manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
210 210 return result
211 211
212 212 def dirlog(self, d):
213 213 if d in self._dirlogstarts:
214 214 self.bundle.seek(self._dirlogstarts[d])
215 215 return bundlemanifest(
216 216 self.opener, self.bundle, self._linkmapper,
217 217 self._dirlogstarts, dir=d)
218 218 return super(bundlemanifest, self).dirlog(d)
219 219
220 220 class bundlefilelog(bundlerevlog, filelog.filelog):
221 221 def __init__(self, opener, path, cgunpacker, linkmapper):
222 222 filelog.filelog.__init__(self, opener, path)
223 223 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
224 224 linkmapper)
225 225
226 226 def baserevision(self, nodeorrev):
227 227 return filelog.filelog.revision(self, nodeorrev, raw=True)
228 228
229 229 class bundlepeer(localrepo.localpeer):
230 230 def canpush(self):
231 231 return False
232 232
233 233 class bundlephasecache(phases.phasecache):
234 234 def __init__(self, *args, **kwargs):
235 235 super(bundlephasecache, self).__init__(*args, **kwargs)
236 236 if util.safehasattr(self, 'opener'):
237 237 self.opener = vfsmod.readonlyvfs(self.opener)
238 238
239 239 def write(self):
240 240 raise NotImplementedError
241 241
242 242 def _write(self, fp):
243 243 raise NotImplementedError
244 244
245 245 def _updateroots(self, phase, newroots, tr):
246 246 self.phaseroots[phase] = newroots
247 247 self.invalidate()
248 248 self.dirty = True
249 249
250 250 def _getfilestarts(cgunpacker):
251 251 filespos = {}
252 252 for chunkdata in iter(cgunpacker.filelogheader, {}):
253 253 fname = chunkdata['filename']
254 254 filespos[fname] = cgunpacker.tell()
255 255 for chunk in iter(lambda: cgunpacker.deltachunk(None), {}):
256 256 pass
257 257 return filespos
258 258
259 259 class bundlerepository(localrepo.localrepository):
260 260 """A repository instance that is a union of a local repo and a bundle.
261 261
262 262 Instances represent a read-only repository composed of a local repository
263 263 with the contents of a bundle file applied. The repository instance is
264 264 conceptually similar to the state of a repository after an
265 265 ``hg unbundle`` operation. However, the contents of the bundle are never
266 266 applied to the actual base repository.
267 267 """
268 268 def __init__(self, ui, repopath, bundlepath):
269 269 self._tempparent = None
270 270 try:
271 271 localrepo.localrepository.__init__(self, ui, repopath)
272 272 except error.RepoError:
273 273 self._tempparent = tempfile.mkdtemp()
274 274 localrepo.instance(ui, self._tempparent, 1)
275 275 localrepo.localrepository.__init__(self, ui, self._tempparent)
276 276 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
277 277
278 278 if repopath:
279 279 self._url = 'bundle:' + util.expandpath(repopath) + '+' + bundlepath
280 280 else:
281 281 self._url = 'bundle:' + bundlepath
282 282
283 283 self.tempfile = None
284 284 f = util.posixfile(bundlepath, "rb")
285 285 bundle = exchange.readbundle(ui, f, bundlepath)
286 286
287 287 if isinstance(bundle, bundle2.unbundle20):
288 288 self._bundlefile = bundle
289 289 self._cgunpacker = None
290 290
291 291 cgpart = None
292 for part in bundle.iterparts():
292 for part in bundle.iterparts(seekable=True):
293 293 if part.type == 'changegroup':
294 294 if cgpart:
295 295 raise NotImplementedError("can't process "
296 296 "multiple changegroups")
297 297 cgpart = part
298 298
299 299 self._handlebundle2part(bundle, part)
300 300
301 301 if not cgpart:
302 302 raise error.Abort(_("No changegroups found"))
303 303
304 304 # This is required to placate a later consumer, which expects
305 305 # the payload offset to be at the beginning of the changegroup.
306 306 # We need to do this after the iterparts() generator advances
307 307 # because iterparts() will seek to end of payload after the
308 308 # generator returns control to iterparts().
309 309 cgpart.seek(0, os.SEEK_SET)
310 310
311 311 elif isinstance(bundle, changegroup.cg1unpacker):
312 312 if bundle.compressed():
313 313 f = self._writetempbundle(bundle.read, '.hg10un',
314 314 header='HG10UN')
315 315 bundle = exchange.readbundle(ui, f, bundlepath, self.vfs)
316 316
317 317 self._bundlefile = bundle
318 318 self._cgunpacker = bundle
319 319 else:
320 320 raise error.Abort(_('bundle type %s cannot be read') %
321 321 type(bundle))
322 322
323 323 # dict with the mapping 'filename' -> position in the changegroup.
324 324 self._cgfilespos = {}
325 325
326 326 self.firstnewrev = self.changelog.repotiprev + 1
327 327 phases.retractboundary(self, None, phases.draft,
328 328 [ctx.node() for ctx in self[self.firstnewrev:]])
329 329
330 330 def _handlebundle2part(self, bundle, part):
331 331 if part.type != 'changegroup':
332 332 return
333 333
334 334 cgstream = part
335 335 version = part.params.get('version', '01')
336 336 legalcgvers = changegroup.supportedincomingversions(self)
337 337 if version not in legalcgvers:
338 338 msg = _('Unsupported changegroup version: %s')
339 339 raise error.Abort(msg % version)
340 340 if bundle.compressed():
341 341 cgstream = self._writetempbundle(part.read, '.cg%sun' % version)
342 342
343 343 self._cgunpacker = changegroup.getunbundler(version, cgstream, 'UN')
344 344
345 345 def _writetempbundle(self, readfn, suffix, header=''):
346 346 """Write a temporary file to disk
347 347 """
348 348 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
349 349 suffix=suffix)
350 350 self.tempfile = temp
351 351
352 352 with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
353 353 fptemp.write(header)
354 354 while True:
355 355 chunk = readfn(2**18)
356 356 if not chunk:
357 357 break
358 358 fptemp.write(chunk)
359 359
360 360 return self.vfs.open(self.tempfile, mode="rb")
361 361
362 362 @localrepo.unfilteredpropertycache
363 363 def _phasecache(self):
364 364 return bundlephasecache(self, self._phasedefaults)
365 365
366 366 @localrepo.unfilteredpropertycache
367 367 def changelog(self):
368 368 # consume the header if it exists
369 369 self._cgunpacker.changelogheader()
370 370 c = bundlechangelog(self.svfs, self._cgunpacker)
371 371 self.manstart = self._cgunpacker.tell()
372 372 return c
373 373
374 374 def _constructmanifest(self):
375 375 self._cgunpacker.seek(self.manstart)
376 376 # consume the header if it exists
377 377 self._cgunpacker.manifestheader()
378 378 linkmapper = self.unfiltered().changelog.rev
379 379 m = bundlemanifest(self.svfs, self._cgunpacker, linkmapper)
380 380 self.filestart = self._cgunpacker.tell()
381 381 return m
382 382
383 383 def _consumemanifest(self):
384 384 """Consumes the manifest portion of the bundle, setting filestart so the
385 385 file portion can be read."""
386 386 self._cgunpacker.seek(self.manstart)
387 387 self._cgunpacker.manifestheader()
388 388 for delta in self._cgunpacker.deltaiter():
389 389 pass
390 390 self.filestart = self._cgunpacker.tell()
391 391
392 392 @localrepo.unfilteredpropertycache
393 393 def manstart(self):
394 394 self.changelog
395 395 return self.manstart
396 396
397 397 @localrepo.unfilteredpropertycache
398 398 def filestart(self):
399 399 self.manifestlog
400 400
401 401 # If filestart was not set by self.manifestlog, that means the
402 402 # manifestlog implementation did not consume the manifests from the
403 403 # changegroup (ex: it might be consuming trees from a separate bundle2
404 404 # part instead). So we need to manually consume it.
405 405 if 'filestart' not in self.__dict__:
406 406 self._consumemanifest()
407 407
408 408 return self.filestart
409 409
410 410 def url(self):
411 411 return self._url
412 412
413 413 def file(self, f):
414 414 if not self._cgfilespos:
415 415 self._cgunpacker.seek(self.filestart)
416 416 self._cgfilespos = _getfilestarts(self._cgunpacker)
417 417
418 418 if f in self._cgfilespos:
419 419 self._cgunpacker.seek(self._cgfilespos[f])
420 420 linkmapper = self.unfiltered().changelog.rev
421 421 return bundlefilelog(self.svfs, f, self._cgunpacker, linkmapper)
422 422 else:
423 423 return filelog.filelog(self.svfs, f)
424 424
425 425 def close(self):
426 426 """Close assigned bundle file immediately."""
427 427 self._bundlefile.close()
428 428 if self.tempfile is not None:
429 429 self.vfs.unlink(self.tempfile)
430 430 if self._tempparent:
431 431 shutil.rmtree(self._tempparent, True)
432 432
433 433 def cancopy(self):
434 434 return False
435 435
436 436 def peer(self):
437 437 return bundlepeer(self)
438 438
439 439 def getcwd(self):
440 440 return pycompat.getcwd() # always outside the repo
441 441
442 442 # Check if parents exist in localrepo before setting
443 443 def setparents(self, p1, p2=nullid):
444 444 p1rev = self.changelog.rev(p1)
445 445 p2rev = self.changelog.rev(p2)
446 446 msg = _("setting parent to node %s that only exists in the bundle\n")
447 447 if self.changelog.repotiprev < p1rev:
448 448 self.ui.warn(msg % nodemod.hex(p1))
449 449 if self.changelog.repotiprev < p2rev:
450 450 self.ui.warn(msg % nodemod.hex(p2))
451 451 return super(bundlerepository, self).setparents(p1, p2)
452 452
453 453 def instance(ui, path, create):
454 454 if create:
455 455 raise error.Abort(_('cannot create new bundle repository'))
456 456 # internal config: bundle.mainreporoot
457 457 parentpath = ui.config("bundle", "mainreporoot")
458 458 if not parentpath:
459 459 # try to find the correct path to the working directory repo
460 460 parentpath = cmdutil.findrepo(pycompat.getcwd())
461 461 if parentpath is None:
462 462 parentpath = ''
463 463 if parentpath:
464 464 # Try to make the full path relative so we get a nice, short URL.
465 465 # In particular, we don't want temp dir names in test outputs.
466 466 cwd = pycompat.getcwd()
467 467 if parentpath == cwd:
468 468 parentpath = ''
469 469 else:
470 470 cwd = pathutil.normasprefix(cwd)
471 471 if parentpath.startswith(cwd):
472 472 parentpath = parentpath[len(cwd):]
473 473 u = util.url(path)
474 474 path = u.localpath()
475 475 if u.scheme == 'bundle':
476 476 s = path.split("+", 1)
477 477 if len(s) == 1:
478 478 repopath, bundlename = parentpath, s[0]
479 479 else:
480 480 repopath, bundlename = s
481 481 else:
482 482 repopath, bundlename = parentpath, path
483 483 return bundlerepository(ui, repopath, bundlename)
484 484
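# Editor's note: illustrative 'bundle:' URL forms accepted above (paths
# are hypothetical):
#
#     bundle:/tmp/incoming.hg            repo root found from cwd/config
#     bundle:/srv/repo+/tmp/incoming.hg  explicit repo + bundle pair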
485 485 class bundletransactionmanager(object):
486 486 def transaction(self):
487 487 return None
488 488
489 489 def close(self):
490 490 raise NotImplementedError
491 491
492 492 def release(self):
493 493 raise NotImplementedError
494 494
495 495 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
496 496 force=False):
497 497 '''obtains a bundle of changes incoming from other
498 498
499 499 "onlyheads" restricts the returned changes to those reachable from the
500 500 specified heads.
501 501 "bundlename", if given, stores the bundle to this file path permanently;
502 502 otherwise it's stored to a temp file and gets deleted again when you call
503 503 the returned "cleanupfn".
504 504 "force" indicates whether to proceed on unrelated repos.
505 505
506 506 Returns a tuple (local, csets, cleanupfn):
507 507
508 508 "local" is a local repo from which to obtain the actual incoming
509 509 changesets; it is a bundlerepo for the obtained bundle when the
510 510 original "other" is remote.
511 511 "csets" lists the incoming changeset node ids.
512 512 "cleanupfn" must be called without arguments when you're done processing
513 513 the changes; it closes both the original "other" and the one returned
514 514 here.
515 515 '''
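# Editor's sketch of typical use (names hypothetical): always invoke the
# returned cleanup function once done with the changesets.
#
#     local, csets, cleanup = getremotechanges(ui, repo, other)
#     try:
#         for node in csets:
#             ui.write('%s\n' % nodemod.hex(node))
#     finally:
#         cleanup()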
516 516 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
517 517 force=force)
518 518 common, incoming, rheads = tmp
519 519 if not incoming:
520 520 try:
521 521 if bundlename:
522 522 os.unlink(bundlename)
523 523 except OSError:
524 524 pass
525 525 return repo, [], other.close
526 526
527 527 commonset = set(common)
528 528 rheads = [x for x in rheads if x not in commonset]
529 529
530 530 bundle = None
531 531 bundlerepo = None
532 532 localrepo = other.local()
533 533 if bundlename or not localrepo:
534 534 # create a bundle (uncompressed if other repo is not local)
535 535
536 536 # developer config: devel.legacy.exchange
537 537 legexc = ui.configlist('devel', 'legacy.exchange')
538 538 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
539 539 canbundle2 = (not forcebundle1
540 540 and other.capable('getbundle')
541 541 and other.capable('bundle2'))
542 542 if canbundle2:
543 543 kwargs = {}
544 544 kwargs['common'] = common
545 545 kwargs['heads'] = rheads
546 546 kwargs['bundlecaps'] = exchange.caps20to10(repo)
547 547 kwargs['cg'] = True
548 548 b2 = other.getbundle('incoming', **kwargs)
549 549 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
550 550 bundlename)
551 551 else:
552 552 if other.capable('getbundle'):
553 553 cg = other.getbundle('incoming', common=common, heads=rheads)
554 554 elif onlyheads is None and not other.capable('changegroupsubset'):
555 555 # compat with older servers when pulling all remote heads
556 556 cg = other.changegroup(incoming, "incoming")
557 557 rheads = None
558 558 else:
559 559 cg = other.changegroupsubset(incoming, rheads, 'incoming')
560 560 if localrepo:
561 561 bundletype = "HG10BZ"
562 562 else:
563 563 bundletype = "HG10UN"
564 564 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
565 565 bundletype)
566 566 # keep written bundle?
567 567 if bundlename:
568 568 bundle = None
569 569 if not localrepo:
570 570 # use the created uncompressed bundlerepo
571 571 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
572 572 fname)
573 573 # this repo contains local and other now, so filter out local again
574 574 common = repo.heads()
575 575 if localrepo:
576 576 # Part of common may be remotely filtered,
577 577 # so use an unfiltered version.
578 578 # The discovery process probably needs cleanup to avoid that.
579 579 localrepo = localrepo.unfiltered()
580 580
581 581 csets = localrepo.changelog.findmissing(common, rheads)
582 582
583 583 if bundlerepo:
584 584 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
585 585 remotephases = other.listkeys('phases')
586 586
587 587 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
588 588 pullop.trmanager = bundletransactionmanager()
589 589 exchange._pullapplyphases(pullop, remotephases)
590 590
591 591 def cleanup():
592 592 if bundlerepo:
593 593 bundlerepo.close()
594 594 if bundle:
595 595 os.unlink(bundle)
596 596 other.close()
597 597
598 598 return (localrepo, csets, cleanup)