##// END OF EJS Templates
infinitepush: opt out of changegroup3 unless explicitly configured...
Matt Harbison -
r51175:806ca6bd default
parent child Browse files
Show More
@@ -1,1383 +1,1388 b''
1 1 # Infinite push
2 2 #
3 3 # Copyright 2016 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """ store some pushes in a remote blob store on the server (EXPERIMENTAL)
8 8
9 9 IMPORTANT: if you use this extension, please contact
10 10 mercurial-devel@mercurial-scm.org ASAP. This extension is believed to
11 11 be unused and barring learning of users of this functionality, we will
12 12 delete this code at the end of 2020.
13 13
14 14 [infinitepush]
15 15 # Server-side and client-side option. Pattern of the infinitepush bookmark
16 16 branchpattern = PATTERN
17 17
18 18 # Server or client
19 19 server = False
20 20
21 21 # Server-side option. Possible values: 'disk' or 'sql'. Fails if not set
22 22 indextype = disk
23 23
24 24 # Server-side option. Used only if indextype=sql.
25 25 # Format: 'IP:PORT:DB_NAME:USER:PASSWORD'
26 26 sqlhost = IP:PORT:DB_NAME:USER:PASSWORD
27 27
28 28 # Server-side option. Used only if indextype=disk.
29 29 # Filesystem path to the index store
30 30 indexpath = PATH
31 31
32 32 # Server-side option. Possible values: 'disk' or 'external'
33 33 # Fails if not set
34 34 storetype = disk
35 35
36 36 # Server-side option.
37 37 # Path to the binary that will save bundle to the bundlestore
38 38 # Formatted cmd line will be passed to it (see `put_args`)
39 39 put_binary = put
40 40
41 41 # Server-side option. Used only if storetype=external.
42 42 # Format cmd-line string for put binary. Placeholder: {filename}
43 43 put_args = {filename}
44 44
45 45 # Server-side option.
46 46 # Path to the binary that get bundle from the bundlestore.
47 47 # Formatted cmd line will be passed to it (see `get_args`)
48 48 get_binary = get
49 49
50 50 # Server-side option. Used only if storetype=external.
51 51 # Format cmd-line string for get binary. Placeholders: {filename} {handle}
52 52 get_args = {filename} {handle}
53 53
54 54 # Server-side option
55 55 logfile = FILE
56 56
57 57 # Server-side option
58 58 loglevel = DEBUG
59 59
60 60 # Server-side option. Used only if indextype=sql.
61 61 # Sets mysql wait_timeout option.
62 62 waittimeout = 300
63 63
64 64 # Server-side option. Used only if indextype=sql.
65 65 # Sets mysql innodb_lock_wait_timeout option.
66 66 locktimeout = 120
67 67
68 68 # Server-side option. Used only if indextype=sql.
69 69 # Name of the repository
70 70 reponame = ''
71 71
72 72 # Client-side option. Used by --list-remote option. List of remote scratch
73 73 # patterns to list if no patterns are specified.
74 74 defaultremotepatterns = ['*']
75 75
76 76 # Instructs infinitepush to forward all received bundle2 parts to the
77 77 # bundle for storage. Defaults to False.
78 78 storeallparts = True
79 79
80 80 # routes each incoming push to the bundlestore. defaults to False
81 81 pushtobundlestore = True
82 82
83 83 [remotenames]
84 84 # Client-side option
85 85 # This option should be set only if remotenames extension is enabled.
86 86 # Whether remote bookmarks are tracked by remotenames extension.
87 87 bookmarks = True
88 88 """
89 89
90 90
91 91 import collections
92 92 import contextlib
93 93 import functools
94 94 import logging
95 95 import os
96 96 import random
97 97 import re
98 98 import socket
99 99 import subprocess
100 100 import time
101 101
102 102 from mercurial.node import (
103 103 bin,
104 104 hex,
105 105 )
106 106
107 107 from mercurial.i18n import _
108 108
109 109 from mercurial.pycompat import (
110 110 getattr,
111 111 open,
112 112 )
113 113
114 114 from mercurial.utils import (
115 115 procutil,
116 116 stringutil,
117 117 urlutil,
118 118 )
119 119
120 120 from mercurial import (
121 121 bundle2,
122 122 changegroup,
123 123 commands,
124 124 discovery,
125 125 encoding,
126 126 error,
127 127 exchange,
128 128 extensions,
129 129 hg,
130 130 localrepo,
131 131 phases,
132 132 pushkey,
133 133 pycompat,
134 134 registrar,
135 135 util,
136 136 wireprototypes,
137 137 wireprotov1peer,
138 138 wireprotov1server,
139 139 )
140 140
141 141 from . import (
142 142 bundleparts,
143 143 common,
144 144 )
145 145
146 146 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
147 147 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
148 148 # be specifying the version(s) of Mercurial they are tested with, or
149 149 # leave the attribute unspecified.
testedwith = b'ships-with-hg-core'

# Registration of the config options this extension understands; see the
# module docstring above for what each option means.
configtable = {}
configitem = registrar.configitem(configtable)

configitem(
    b'infinitepush',
    b'server',
    default=False,
)
configitem(
    b'infinitepush',
    b'storetype',
    default=b'',
)
configitem(
    b'infinitepush',
    b'indextype',
    default=b'',
)
configitem(
    b'infinitepush',
    b'indexpath',
    default=b'',
)
configitem(
    b'infinitepush',
    b'storeallparts',
    default=False,
)
configitem(
    b'infinitepush',
    b'reponame',
    default=b'',
)
configitem(
    b'scratchbranch',
    b'storepath',
    default=b'',
)
configitem(
    b'infinitepush',
    b'branchpattern',
    default=b'',
)
configitem(
    b'infinitepush',
    b'pushtobundlestore',
    default=False,
)
configitem(
    b'experimental',
    b'server-bundlestore-bookmark',
    default=b'',
)
configitem(
    b'experimental',
    b'infinitepush-scratchpush',
    default=False,
)

# Shorthands for the experimental config section/keys used on the client.
experimental = b'experimental'
configbookmark = b'server-bundlestore-bookmark'
configscratchpush = b'infinitepush-scratchpush'

scratchbranchparttype = bundleparts.scratchbranchparttype
revsetpredicate = registrar.revsetpredicate()
templatekeyword = registrar.templatekeyword()
# Matches nothing until commonsetup() installs the configured pattern.
_scratchbranchmatcher = lambda x: False
# Predicate: could this key be a (prefix of a) hex node id?
_maybehash = re.compile('^[a-f0-9]+$').search
220 220
221 221
def _buildexternalbundlestore(ui):
    """Construct an externalbundlestore from the [infinitepush] config.

    Aborts when either the put or the get binary is not configured.
    """
    put_binary = ui.config(b'infinitepush', b'put_binary')
    if not put_binary:
        raise error.Abort(b'put binary is not specified')
    put_args = ui.configlist(b'infinitepush', b'put_args', [])

    get_binary = ui.config(b'infinitepush', b'get_binary')
    if not get_binary:
        raise error.Abort(b'get binary is not specified')
    get_args = ui.configlist(b'infinitepush', b'get_args', [])

    from . import store

    return store.externalbundlestore(put_binary, put_args, get_binary, get_args)
234 234
235 235
def _buildsqlindex(ui):
    """Build a SQL-backed bundle index from the [infinitepush] config.

    Requires ``sqlhost`` (formatted as IP:PORT:DB_NAME:USER:PASSWORD) and
    ``reponame``; aborts when either is missing.
    """
    sqlhost = ui.config(b'infinitepush', b'sqlhost')
    if not sqlhost:
        raise error.Abort(_(b'please set infinitepush.sqlhost'))
    host, port, db, user, password = sqlhost.split(b':')
    reponame = ui.config(b'infinitepush', b'reponame')
    if not reponame:
        raise error.Abort(_(b'please set infinitepush.reponame'))

    from . import sqlindexapi

    return sqlindexapi.sqlindexapi(
        reponame,
        host,
        port,
        db,
        user,
        password,
        ui.config(b'infinitepush', b'logfile', b''),
        _getloglevel(ui),
        waittimeout=ui.configint(b'infinitepush', b'waittimeout', 300),
        locktimeout=ui.configint(b'infinitepush', b'locktimeout', 120),
    )
262 262
263 263
264 264 def _getloglevel(ui):
265 265 loglevel = ui.config(b'infinitepush', b'loglevel', b'DEBUG')
266 266 numeric_loglevel = getattr(logging, loglevel.upper(), None)
267 267 if not isinstance(numeric_loglevel, int):
268 268 raise error.Abort(_(b'invalid log level %s') % loglevel)
269 269 return numeric_loglevel
270 270
271 271
def _tryhoist(ui, remotebookmark):
    """Strip the remotenames 'hoist' prefix from a remote bookmark name.

    With the remotenames extension and its 'hoist' config, 'remote/master'
    is usable as plain 'master'; mirror that convention here.  The name is
    returned unchanged when remotenames is disabled or the prefix does not
    match.
    """
    if not common.isremotebooksenabled(ui):
        return remotebookmark
    prefix = ui.config(b'remotenames', b'hoistedpeer') + b'/'
    if remotebookmark.startswith(prefix):
        remotebookmark = remotebookmark[len(prefix):]
    return remotebookmark
286 286
287 287
class bundlestore:
    """Facade pairing a bundle store with its lookup index.

    The store holds raw bundle contents; the index maps scratch bookmarks
    and nodes to stored bundles.  Both backends are selected through the
    [infinitepush] ``storetype`` and ``indextype`` options; unknown values
    abort.
    """

    def __init__(self, repo):
        self._repo = repo
        ui = self._repo.ui

        storetype = ui.config(b'infinitepush', b'storetype')
        if storetype == b'disk':
            from . import store

            self.store = store.filebundlestore(ui, self._repo)
        elif storetype == b'external':
            self.store = _buildexternalbundlestore(ui)
        else:
            raise error.Abort(
                _(b'unknown infinitepush store type specified %s') % storetype
            )

        indextype = ui.config(b'infinitepush', b'indextype')
        if indextype == b'disk':
            from . import fileindexapi

            self.index = fileindexapi.fileindexapi(self._repo)
        elif indextype == b'sql':
            self.index = _buildsqlindex(ui)
        else:
            raise error.Abort(
                _(b'unknown infinitepush index type specified %s') % indextype
            )
314 314
315 315
316 316 def _isserver(ui):
317 317 return ui.configbool(b'infinitepush', b'server')
318 318
319 319
def reposetup(ui, repo):
    """Attach a bundlestore to local repositories when acting as server."""
    if not _isserver(ui):
        return
    if repo.local():
        repo.bundlestore = bundlestore(repo)
323 323
324 324
def extsetup(ui):
    """Extension setup: common wiring, then server- or client-side setup."""
    commonsetup(ui)
    if not _isserver(ui):
        clientextsetup(ui)
    else:
        serverextsetup(ui)
331 331
332 332
def uipopulate(ui):
    """Per-ui setup: opt out of changegroup3 unless explicitly configured.

    An explicit experimental.changegroup3 setting is respected; otherwise
    it is forced off on behalf of this extension.
    """
    if ui.hasconfig(b"experimental", b"changegroup3"):
        return
    ui.setconfig(b"experimental", b"changegroup3", False, b"infinitepush")
336
337
def commonsetup(ui):
    """Setup shared by client and server.

    Registers the ``listkeyspatterns`` wire command and compiles the
    configured branch pattern into the global scratch-branch matcher.
    """
    wireprotov1server.commands[b'listkeyspatterns'] = (
        wireprotolistkeyspatterns,
        b'namespace patterns',
    )
    pattern = ui.config(b'infinitepush', b'branchpattern')
    if not pattern:
        return
    global _scratchbranchmatcher
    kind, pat, _scratchbranchmatcher = stringutil.stringmatcher(pattern)
344 349
345 350
def serverextsetup(ui):
    """Server-side setup: intercept bundle2 parts and wire commands.

    Replaces the ``pushkey`` and ``phase-heads`` bundle2 part handlers with
    wrappers, and wraps listkeys / lookup / getbundlechunks / processparts
    so scratch revisions can be served from (and routed into) the
    bundlestore.
    """
    origpushkeyhandler = bundle2.parthandlermapping[b'pushkey']

    def newpushkeyhandler(*args, **kwargs):
        bundle2pushkey(origpushkeyhandler, *args, **kwargs)

    # bundle2 dispatch requires the replacement to advertise the same params
    newpushkeyhandler.params = origpushkeyhandler.params
    bundle2.parthandlermapping[b'pushkey'] = newpushkeyhandler

    orighandlephasehandler = bundle2.parthandlermapping[b'phase-heads']
    newphaseheadshandler = lambda *args, **kwargs: bundle2handlephases(
        orighandlephasehandler, *args, **kwargs
    )
    newphaseheadshandler.params = orighandlephasehandler.params
    bundle2.parthandlermapping[b'phase-heads'] = newphaseheadshandler

    extensions.wrapfunction(
        localrepo.localrepository, b'listkeys', localrepolistkeys
    )
    wireprotov1server.commands[b'lookup'] = (
        _lookupwrap(wireprotov1server.commands[b'lookup'][0]),
        b'key',
    )
    extensions.wrapfunction(exchange, b'getbundlechunks', getbundlechunks)

    extensions.wrapfunction(bundle2, b'processparts', processparts)
372 377
373 378
def clientextsetup(ui):
    """Client-side setup: extend push/pull with scratch-bookmark support."""
    entry = extensions.wrapcommand(commands.table, b'push', _push)

    # add --bundle-store so a push can be forced into the bundlestore
    entry[1].append(
        (
            b'',
            b'bundle-store',
            None,
            _(b'force push to go to bundle store (EXPERIMENTAL)'),
        )
    )

    extensions.wrapcommand(commands.table, b'pull', _pull)

    extensions.wrapfunction(discovery, b'checkheads', _checkheads)

    wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns

    # Move the scratch part in front of the changeset part so the server
    # processes the infinitepush payload first.
    partorder = exchange.b2partsgenorder
    index = partorder.index(b'changeset')
    partorder.insert(
        index, partorder.pop(partorder.index(scratchbranchparttype))
    )
397 402
398 403
def _checkheads(orig, pushop):
    """checkheads wrapper: skip the new-heads check for scratch pushes.

    Scratch pushes create remote heads on purpose, so the usual warning
    would only get in the way.
    """
    if not pushop.ui.configbool(experimental, configscratchpush, False):
        return orig(pushop)
    return None
403 408
404 409
def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
    """Wire command handler: listkeys restricted to the given patterns."""
    decoded = wireprototypes.decodelist(patterns)
    keys = repo.listkeys(encoding.tolocal(namespace), decoded)
    return pushkey.encodekeys(keys.items())
409 414
410 415
def localrepolistkeys(orig, self, namespace, patterns=None):
    """listkeys wrapper adding pattern support for scratch bookmarks.

    For the bookmarks namespace with patterns, merge matches from the
    bundlestore index with matching regular bookmarks; every other request
    is handed to the original implementation unchanged.
    """
    if namespace != b'bookmarks' or not patterns:
        return orig(self, namespace)

    index = self.bundlestore.index
    bookmarks = orig(self, namespace)
    results = {}
    for pattern in patterns:
        results.update(index.getbookmarks(pattern))
        if pattern.endswith(b'*'):
            # translate the glob into the equivalent regex pattern
            pattern = b're:^' + pattern[:-1] + b'.*'
        kind, pat, matcher = stringutil.stringmatcher(pattern)
        results.update(
            {bm: node for bm, node in bookmarks.items() if matcher(bm)}
        )
    return results
427 432
428 433
@wireprotov1peer.batchable
def listkeyspatterns(self, namespace, patterns):
    """Peer command: listkeys limited to the given patterns (batchable).

    Per the batchable protocol, returns the wire arguments plus a decoder
    callable; short-circuits to an empty result when the server has no
    pushkey capability.
    """
    if not self.capable(b'pushkey'):
        return {}, None
    self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)

    def decode(d):
        self.ui.debug(
            b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
        )
        return pushkey.decodekeys(d)

    return {
        b'namespace': encoding.fromlocal(namespace),
        b'patterns': wireprototypes.encodelist(patterns),
    }, decode
445 450
446 451
447 452 def _readbundlerevs(bundlerepo):
448 453 return list(bundlerepo.revs(b'bundle()'))
449 454
450 455
451 456 def _includefilelogstobundle(bundlecaps, bundlerepo, bundlerevs, ui):
452 457 """Tells remotefilelog to include all changed files to the changegroup
453 458
454 459 By default remotefilelog doesn't include file content to the changegroup.
455 460 But we need to include it if we are fetching from bundlestore.
456 461 """
457 462 changedfiles = set()
458 463 cl = bundlerepo.changelog
459 464 for r in bundlerevs:
460 465 # [3] means changed files
461 466 changedfiles.update(cl.read(r)[3])
462 467 if not changedfiles:
463 468 return bundlecaps
464 469
465 470 changedfiles = b'\0'.join(changedfiles)
466 471 newcaps = []
467 472 appended = False
468 473 for cap in bundlecaps or []:
469 474 if cap.startswith(b'excludepattern='):
470 475 newcaps.append(b'\0'.join((cap, changedfiles)))
471 476 appended = True
472 477 else:
473 478 newcaps.append(cap)
474 479 if not appended:
475 480 # Not found excludepattern cap. Just append it
476 481 newcaps.append(b'excludepattern=' + changedfiles)
477 482
478 483 return newcaps
479 484
480 485
def _rebundle(bundlerepo, bundleroots, unknownhead):
    """Regenerate a changegroup part restricted to the requested head.

    A stored bundle may contain more revisions than the client asked for
    (for example descendants of the requested head).  Build a fresh
    changegroup covering only ``unknownhead`` and its ancestors that are
    missing from ``bundleroots``.
    """
    cgversion = b'02'
    outgoing = discovery.outgoing(
        bundlerepo, commonheads=bundleroots, ancestorsof=[unknownhead]
    )
    stream = changegroup.makestream(bundlerepo, outgoing, cgversion, b'pull')
    data = util.chunkbuffer(stream).read()
    part = bundle2.bundlepart(b'changegroup', data=data)
    part.addparam(b'version', cgversion)
    return [part]
500 505
501 506
502 507 def _getbundleroots(oldrepo, bundlerepo, bundlerevs):
503 508 cl = bundlerepo.changelog
504 509 bundleroots = []
505 510 for rev in bundlerevs:
506 511 node = cl.node(rev)
507 512 parents = cl.parents(node)
508 513 for parent in parents:
509 514 # include all revs that exist in the main repo
510 515 # to make sure that bundle may apply client-side
511 516 if parent in oldrepo:
512 517 bundleroots.append(parent)
513 518 return bundleroots
514 519
515 520
516 521 def _needsrebundling(head, bundlerepo):
517 522 bundleheads = list(bundlerepo.revs(b'heads(bundle())'))
518 523 return not (
519 524 len(bundleheads) == 1 and bundlerepo[bundleheads[0]].node() == head
520 525 )
521 526
522 527
def _generateoutputparts(head, bundlerepo, bundleroots, bundlefile):
    """generates bundle that will be send to the user

    returns tuple with raw bundle string and bundle type
    """
    parts = []
    if not _needsrebundling(head, bundlerepo):
        # the stored bundle already matches the request exactly; forward it
        with util.posixfile(bundlefile, b"rb") as f:
            unbundler = exchange.readbundle(bundlerepo.ui, f, bundlefile)
            if isinstance(unbundler, changegroup.cg1unpacker):
                # v1 bundle: wrap the raw stream in a single changegroup part
                part = bundle2.bundlepart(
                    b'changegroup', data=unbundler._stream.read()
                )
                part.addparam(b'version', b'01')
                parts.append(part)
            elif isinstance(unbundler, bundle2.unbundle20):
                # bundle2: copy every part through (params included), but
                # insist on at least one changegroup part being present
                haschangegroup = False
                for part in unbundler.iterparts():
                    if part.type == b'changegroup':
                        haschangegroup = True
                    newpart = bundle2.bundlepart(part.type, data=part.read())
                    for key, value in part.params.items():
                        newpart.addparam(key, value)
                    parts.append(newpart)

                if not haschangegroup:
                    raise error.Abort(
                        b'unexpected bundle without changegroup part, '
                        + b'head: %s' % hex(head),
                        hint=b'report to administrator',
                    )
            else:
                raise error.Abort(b'unknown bundle type')
    else:
        # the bundle holds more than the requested head: regenerate
        parts = _rebundle(bundlerepo, bundleroots, head)

    return parts
560 565
561 566
def getbundlechunks(orig, repo, source, heads=None, bundlecaps=None, **kwargs):
    """getbundle wrapper serving scratch heads out of the bundlestore.

    Requested heads unknown to the repo are looked up in the bundlestore:
    their bundles are downloaded, opened as overlay repos, and their parts
    appended after the regular changegroup part.  listkeys output for
    phases is patched so scratch revisions stay draft on the client.
    """
    heads = heads or []
    # newheads are parents of roots of scratch bundles that were requested
    newphases = {}
    scratchbundles = []
    newheads = []
    scratchheads = []
    nodestobundle = {}
    allbundlestocleanup = []
    try:
        for head in heads:
            if not repo.changelog.index.has_node(head):
                if head not in nodestobundle:
                    # unknown head: fetch its bundle and open it as a repo
                    newbundlefile = common.downloadbundle(repo, head)
                    bundlepath = b"bundle:%s+%s" % (repo.root, newbundlefile)
                    bundlerepo = hg.repository(repo.ui, bundlepath)

                    allbundlestocleanup.append((bundlerepo, newbundlefile))
                    bundlerevs = set(_readbundlerevs(bundlerepo))
                    bundlecaps = _includefilelogstobundle(
                        bundlecaps, bundlerepo, bundlerevs, repo.ui
                    )
                    cl = bundlerepo.changelog
                    bundleroots = _getbundleroots(repo, bundlerepo, bundlerevs)
                    for rev in bundlerevs:
                        node = cl.node(rev)
                        newphases[hex(node)] = str(phases.draft)
                        nodestobundle[node] = (
                            bundlerepo,
                            bundleroots,
                            newbundlefile,
                        )

                scratchbundles.append(
                    _generateoutputparts(head, *nodestobundle[head])
                )
                newheads.extend(bundleroots)
                scratchheads.append(head)
    finally:
        for bundlerepo, bundlefile in allbundlestocleanup:
            bundlerepo.close()
            try:
                os.unlink(bundlefile)
            except (IOError, OSError):
                # if we can't cleanup the file then just ignore the error,
                # no need to fail
                pass

    pullfrombundlestore = bool(scratchbundles)
    wrappedchangegrouppart = False
    wrappedlistkeys = False
    oldchangegrouppart = exchange.getbundle2partsmapping[b'changegroup']
    try:

        def _changegrouppart(bundler, *args, **kwargs):
            # Order is important here. First add non-scratch part
            # and only then add parts with scratch bundles because
            # non-scratch part contains parents of roots of scratch bundles.
            result = oldchangegrouppart(bundler, *args, **kwargs)
            for bundle in scratchbundles:
                for part in bundle:
                    bundler.addpart(part)
            return result

        exchange.getbundle2partsmapping[b'changegroup'] = _changegrouppart
        wrappedchangegrouppart = True

        def _listkeys(orig, self, namespace):
            origvalues = orig(self, namespace)
            if namespace == b'phases' and pullfrombundlestore:
                if origvalues.get(b'publishing') == b'True':
                    # Make repo non-publishing to preserve draft phase
                    del origvalues[b'publishing']
                origvalues.update(newphases)
            return origvalues

        extensions.wrapfunction(
            localrepo.localrepository, b'listkeys', _listkeys
        )
        wrappedlistkeys = True
        # replace scratch heads by their known roots for the real getbundle
        heads = list((set(newheads) | set(heads)) - set(scratchheads))
        result = orig(
            repo, source, heads=heads, bundlecaps=bundlecaps, **kwargs
        )
    finally:
        # always restore the wrapped entry points, even on error
        if wrappedchangegrouppart:
            exchange.getbundle2partsmapping[b'changegroup'] = oldchangegrouppart
        if wrappedlistkeys:
            extensions.unwrapfunction(
                localrepo.localrepository, b'listkeys', _listkeys
            )
    return result
654 659
655 660
def _lookupwrap(orig):
    """Wrap the 'lookup' wire command to resolve scratch bookmarks.

    Scratch bookmarks resolve through the bundlestore index; everything
    else goes to the original lookup, with a final fallback to the index
    for nodes that only exist in stored bundles.
    """

    def _lookup(repo, proto, key):
        localkey = encoding.tolocal(key)

        # NOTE(review): localkey comes from encoding.tolocal, which normally
        # yields bytes; this isinstance(..., str) guard looks suspicious on
        # Python 3 — confirm whether the scratch path can ever trigger.
        if isinstance(localkey, str) and _scratchbranchmatcher(localkey):
            scratchnode = repo.bundlestore.index.getnode(localkey)
            if scratchnode:
                return b"%d %s\n" % (1, scratchnode)
            else:
                return b"%d %s\n" % (
                    0,
                    b'scratch branch %s not found' % localkey,
                )
        else:
            try:
                r = hex(repo.lookup(localkey))
                return b"%d %s\n" % (1, r)
            except Exception as inst:
                if repo.bundlestore.index.getbundle(localkey):
                    # not in the repo proper, but present in the bundlestore
                    return b"%d %s\n" % (1, localkey)
                else:
                    r = stringutil.forcebytestr(inst)
                    return b"%d %s\n" % (0, r)

    return _lookup
681 686
682 687
def _pull(orig, ui, repo, source=b"default", **opts):
    """pull wrapper understanding scratch bookmarks and unknown revs.

    Scratch bookmarks are resolved to nodes via the server's
    listkeyspatterns command before delegating to the original pull, and
    remote scratch bookmarks (which remotenames would otherwise drop) are
    saved before and restored after the call.
    """
    opts = pycompat.byteskwargs(opts)
    # Copy paste from `pull` command
    path = urlutil.get_unique_pull_path_obj(
        b"infinite-push's pull",
        ui,
        source,
    )

    scratchbookmarks = {}
    unfi = repo.unfiltered()
    unknownnodes = []
    for rev in opts.get(b'rev', []):
        if rev not in unfi:
            unknownnodes.append(rev)
    if opts.get(b'bookmark'):
        bookmarks = []
        revs = opts.get(b'rev') or []
        for bookmark in opts.get(b'bookmark'):
            if _scratchbranchmatcher(bookmark):
                # rev is not known yet
                # it will be fetched with listkeyspatterns next
                scratchbookmarks[bookmark] = b'REVTOFETCH'
            else:
                bookmarks.append(bookmark)

        if scratchbookmarks:
            other = hg.peer(repo, opts, path)
            try:
                fetchedbookmarks = other.listkeyspatterns(
                    b'bookmarks', patterns=scratchbookmarks
                )
                for bookmark in scratchbookmarks:
                    if bookmark not in fetchedbookmarks:
                        raise error.Abort(
                            b'remote bookmark %s not found!' % bookmark
                        )
                    scratchbookmarks[bookmark] = fetchedbookmarks[bookmark]
                    revs.append(fetchedbookmarks[bookmark])
            finally:
                other.close()
        # pull the resolved nodes instead of the scratch bookmark names
        opts[b'bookmark'] = bookmarks
        opts[b'rev'] = revs

    if scratchbookmarks or unknownnodes:
        # Set anyincoming to True
        extensions.wrapfunction(
            discovery, b'findcommonincoming', _findcommonincoming
        )
    try:
        # Remote scratch bookmarks will be deleted because remotenames doesn't
        # know about them. Let's save it before pull and restore after
        remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, path.loc)
        result = orig(ui, repo, path.loc, **pycompat.strkwargs(opts))
        # TODO(stash): race condition is possible
        # if scratch bookmarks was updated right after orig.
        # But that's unlikely and shouldn't be harmful.
        if common.isremotebooksenabled(ui):
            remotescratchbookmarks.update(scratchbookmarks)
            _saveremotebookmarks(repo, remotescratchbookmarks, path.loc)
        else:
            _savelocalbookmarks(repo, scratchbookmarks)
        return result
    finally:
        if scratchbookmarks:
            extensions.unwrapfunction(discovery, b'findcommonincoming')
749 754
750 755
def _readscratchremotebookmarks(ui, repo, other):
    """Return {name: hexnode} of scratch bookmarks tracked for ``other``.

    Returns an empty dict when the remotenames extension is not enabled.
    """
    if common.isremotebooksenabled(ui):
        remotenamesext = extensions.find(b'remotenames')
        remotepath = remotenamesext.activepath(repo.ui, other)
        result = {}
        # Let's refresh remotenames to make sure we have it up to date
        # Seems that `repo.names['remotebookmarks']` may return stale bookmarks
        # and it results in deleting scratch bookmarks. Our best guess how to
        # fix it is to use `clearnames()`
        repo._remotenames.clearnames()
        for remotebookmark in repo.names[b'remotebookmarks'].listnames(repo):
            path, bookname = remotenamesext.splitremotename(remotebookmark)
            if path == remotepath and _scratchbranchmatcher(bookname):
                nodes = repo.names[b'remotebookmarks'].nodes(
                    repo, remotebookmark
                )
                if nodes:
                    result[bookname] = hex(nodes[0])
        return result
    else:
        return {}
772 777
773 778
def _saveremotebookmarks(repo, newbookmarks, remote):
    """Merge ``newbookmarks`` into the remotenames state for ``remote``.

    Existing remote bookmarks win over same-named entries in
    ``newbookmarks`` (which is mutated: matched names are deleted);
    branches are passed through unchanged.
    """
    remotenamesext = extensions.find(b'remotenames')
    remotepath = remotenamesext.activepath(repo.ui, remote)
    branches = collections.defaultdict(list)
    bookmarks = {}
    remotenames = remotenamesext.readremotenames(repo)
    for hexnode, nametype, remote, rname in remotenames:
        if remote != remotepath:
            continue
        if nametype == b'bookmarks':
            if rname in newbookmarks:
                # It's possible if we have a normal bookmark that matches
                # scratch branch pattern. In this case just use the current
                # bookmark node
                del newbookmarks[rname]
            bookmarks[rname] = hexnode
        elif nametype == b'branches':
            # saveremotenames expects 20 byte binary nodes for branches
            branches[rname].append(bin(hexnode))

    for bookmark, hexnode in newbookmarks.items():
        bookmarks[bookmark] = hexnode
    remotenamesext.saveremotenames(repo, remotepath, branches, bookmarks)
797 802
798 803
def _savelocalbookmarks(repo, bookmarks):
    """Record the given name -> node pairs as local bookmarks."""
    if not bookmarks:
        return
    with repo.wlock(), repo.lock(), repo.transaction(b'bookmark') as tr:
        changes = [
            (name, repo[node].node()) for name, node in bookmarks.items()
        ]
        repo._bookmarks.applychanges(repo, tr, changes)
808 813
809 814
810 815 def _findcommonincoming(orig, *args, **kwargs):
811 816 common, inc, remoteheads = orig(*args, **kwargs)
812 817 return common, True, remoteheads
813 818
814 819
def _push(orig, ui, repo, *dests, **opts):
    """push wrapper routing scratch-bookmark pushes to the bundlestore."""
    opts = pycompat.byteskwargs(opts)
    bookmark = opts.get(b'bookmark')
    # we only support pushing one infinitepush bookmark at once
    if len(bookmark) == 1:
        bookmark = bookmark[0]
    else:
        bookmark = b''

    oldphasemove = None
    overrides = {(experimental, configbookmark): bookmark}

    with ui.configoverride(overrides, b'infinitepush'):
        scratchpush = opts.get(b'bundle_store')
        if _scratchbranchmatcher(bookmark):
            scratchpush = True
            # bundle2 can be sent back after push (for example, bundle2
            # containing `pushkey` part to update bookmarks)
            ui.setconfig(experimental, b'bundle2.pushback', True)

        if scratchpush:
            # this is an infinitepush, we don't want the bookmark to be applied
            # rather that should be stored in the bundlestore
            opts[b'bookmark'] = []
            ui.setconfig(experimental, configscratchpush, True)
            oldphasemove = extensions.wrapfunction(
                exchange, b'_localphasemove', _phasemove
            )

        paths = list(urlutil.get_push_paths(repo, ui, dests))
        if len(paths) > 1:
            msg = _(b'cannot push to multiple path with infinitepush')
            raise error.Abort(msg)

        path = paths[0]
        destpath = path.loc
        # Remote scratch bookmarks will be deleted because remotenames doesn't
        # know about them. Let's save it before push and restore after
        remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, destpath)
        result = orig(ui, repo, *dests, **pycompat.strkwargs(opts))
        if common.isremotebooksenabled(ui):
            if bookmark and scratchpush:
                other = hg.peer(repo, opts, path)
                try:
                    fetchedbookmarks = other.listkeyspatterns(
                        b'bookmarks', patterns=[bookmark]
                    )
                    remotescratchbookmarks.update(fetchedbookmarks)
                finally:
                    other.close()
            _saveremotebookmarks(repo, remotescratchbookmarks, destpath)
        # restore the original phase-move hook once the push is done
        if oldphasemove:
            exchange._localphasemove = oldphasemove
    return result
869 874
870 875
def _deleteinfinitepushbookmarks(ui, repo, path, names):
    """Prune remote names by removing the bookmarks we don't want anymore,
    then writing the result back to disk

    Aborts if any name in ``names`` is not currently known as a remote
    bookmark of ``path``.
    """
    remotenamesext = extensions.find(b'remotenames')

    # remotename format is:
    # (node, nametype ("branches" or "bookmarks"), remote, name)
    nametype_idx = 1
    remote_idx = 2
    name_idx = 3
    remotenames = [
        remotename
        for remotename in remotenamesext.readremotenames(repo)
        if remotename[remote_idx] == path
    ]
    remote_bm_names = [
        remotename[name_idx]
        for remotename in remotenames
        if remotename[nametype_idx] == b"bookmarks"
    ]

    for name in names:
        if name not in remote_bm_names:
            # bytes have no .format() on Python 3; the previous
            # b"...{}".format(...) call raised AttributeError instead of
            # the intended message, so use %-interpolation instead.
            raise error.Abort(
                _(
                    b"infinitepush bookmark '%s' does not exist "
                    b"in path '%s'"
                )
                % (name, path)
            )

    bookmarks = {}
    branches = collections.defaultdict(list)
    for node, nametype, remote, name in remotenames:
        if nametype == b"bookmarks" and name not in names:
            bookmarks[name] = node
        elif nametype == b"branches":
            # saveremotenames wants binary nodes for branches
            branches[name].append(bin(node))

    remotenamesext.saveremotenames(repo, path, branches, bookmarks)
912 917
913 918
def _phasemove(orig, pushop, nodes, phase=phases.public):
    """prevent commits from being marked public

    Since these are going to a scratch branch, they aren't really being
    published."""
    if phase == phases.public:
        return
    orig(pushop, nodes, phase)
922 927
923 928
@exchange.b2partsgenerator(scratchbranchparttype)
def partgen(pushop, bundler):
    """bundle2 part generator emitting scratch-branch parts for a push.

    Only active for scratch pushes against servers advertising the
    scratchbranch capability; marks the 'changesets' step done so the
    regular changegroup part is skipped.
    """
    bookmark = pushop.ui.config(experimental, configbookmark)
    scratchpush = pushop.ui.configbool(experimental, configscratchpush)
    if b'changesets' in pushop.stepsdone or not scratchpush:
        return

    if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
        return

    pushop.stepsdone.add(b'changesets')
    if not pushop.outgoing.missing:
        pushop.ui.status(_(b'no changes found\n'))
        pushop.cgresult = 0
        return

    # This parameter tells the server that the following bundle is an
    # infinitepush. This let's it switch the part processing to our infinitepush
    # code path.
    bundler.addparam(b"infinitepush", b"True")

    scratchparts = bundleparts.getscratchbranchparts(
        pushop.repo, pushop.remote, pushop.outgoing, pushop.ui, bookmark
    )

    for scratchpart in scratchparts:
        bundler.addpart(scratchpart)

    def handlereply(op):
        # server either succeeds or aborts; no code to read
        pushop.cgresult = 1

    return handlereply
957 962
958 963
959 964 bundle2.capabilities[bundleparts.scratchbranchparttype] = ()
960 965
961 966
962 967 def _getrevs(bundle, oldnode, force, bookmark):
963 968 b'extracts and validates the revs to be imported'
964 969 revs = [bundle[r] for r in bundle.revs(b'sort(bundle())')]
965 970
966 971 # new bookmark
967 972 if oldnode is None:
968 973 return revs
969 974
970 975 # Fast forward update
971 976 if oldnode in bundle and list(bundle.set(b'bundle() & %s::', oldnode)):
972 977 return revs
973 978
974 979 return revs
975 980
976 981
@contextlib.contextmanager
def logservicecall(logger, service, **kwargs):
    """Context manager that logs the lifecycle of a service call.

    Emits a 'start' event on entry and, on exit, either a 'success' or a
    'failure' event annotated with the wall-clock duration in milliseconds
    (failures also carry the stringified exception). Extra keyword
    arguments are forwarded to every event. The exception is re-raised.
    """
    began = time.time()
    logger(service, eventtype=b'start', **kwargs)
    try:
        yield
        elapsed = (time.time() - began) * 1000
        logger(service, eventtype=b'success', elapsedms=elapsed, **kwargs)
    except Exception as e:
        elapsed = (time.time() - began) * 1000
        logger(
            service,
            eventtype=b'failure',
            elapsedms=elapsed,
            errormsg=stringutil.forcebytestr(e),
            **kwargs
        )
        raise
998 1003
999 1004
1000 1005 def _getorcreateinfinitepushlogger(op):
1001 1006 logger = op.records[b'infinitepushlogger']
1002 1007 if not logger:
1003 1008 ui = op.repo.ui
1004 1009 try:
1005 1010 username = procutil.getuser()
1006 1011 except Exception:
1007 1012 username = b'unknown'
1008 1013 # Generate random request id to be able to find all logged entries
1009 1014 # for the same request. Since requestid is pseudo-generated it may
1010 1015 # not be unique, but we assume that (hostname, username, requestid)
1011 1016 # is unique.
1012 1017 random.seed()
1013 1018 requestid = random.randint(0, 2000000000)
1014 1019 hostname = socket.gethostname()
1015 1020 logger = functools.partial(
1016 1021 ui.log,
1017 1022 b'infinitepush',
1018 1023 user=username,
1019 1024 requestid=requestid,
1020 1025 hostname=hostname,
1021 1026 reponame=ui.config(b'infinitepush', b'reponame'),
1022 1027 )
1023 1028 op.records.add(b'infinitepushlogger', logger)
1024 1029 else:
1025 1030 logger = logger[0]
1026 1031 return logger
1027 1032
1028 1033
def storetobundlestore(orig, repo, op, unbundler):
    """stores the incoming bundle coming from push command to the bundlestore
    instead of applying on the revlogs

    Re-assembles every incoming part into a new bundle2 bundle (processing
    only 'replycaps' in place), emits the replies that 'pushkey' and
    'changegroup' parts require, and hands the serialized bundle to
    storebundle() via a temporary file.
    """

    repo.ui.status(_(b"storing changesets on the bundlestore\n"))
    bundler = bundle2.bundle20(repo.ui)

    # processing each part and storing it in bundler
    with bundle2.partiterator(repo, op, unbundler) as parts:
        for part in parts:
            bundlepart = None
            if part.type == b'replycaps':
                # This configures the current operation to allow reply parts.
                bundle2._processpart(op, part)
            else:
                # Copy the part (type, payload, params) into the new bundle.
                bundlepart = bundle2.bundlepart(part.type, data=part.read())
                for key, value in part.params.items():
                    bundlepart.addparam(key, value)

                # Certain parts require a response
                if part.type in (b'pushkey', b'changegroup'):
                    if op.reply is not None:
                        rpart = op.reply.newpart(b'reply:%s' % part.type)
                        rpart.addparam(
                            b'in-reply-to', b'%d' % part.id, mandatory=False
                        )
                        rpart.addparam(b'return', b'1', mandatory=False)

                op.records.add(
                    part.type,
                    {
                        b'return': 1,
                    },
                )
            if bundlepart:
                bundler.addpart(bundlepart)

    # storing the bundle in the bundlestore
    buf = util.chunkbuffer(bundler.getchunks())
    fd, bundlefile = pycompat.mkstemp()
    try:
        # Use a context manager: the previous try/finally called fp.close()
        # even when os.fdopen() itself failed, raising a NameError on the
        # unbound `fp` that masked the original exception.
        with os.fdopen(fd, 'wb') as fp:
            fp.write(buf.read())
        storebundle(op, {}, bundlefile)
    finally:
        try:
            os.unlink(bundlefile)
        except Exception:
            # we would rather see the original exception
            pass
1082 1087
1083 1088
def processparts(orig, repo, op, unbundler):
    """bundle2 part-processing override that diverts infinitepush pushes.

    `hg unbundle` and ordinary pushes go straight to the wrapped
    implementation. Infinitepush bundles (marked with the 'infinitepush'
    bundle parameter, or unconditionally when 'pushtobundlestore' is set)
    are re-assembled and written to the bundlestore instead of being
    applied to the revlogs.
    """

    # make sure we don't wrap processparts in case of `hg unbundle`
    if op.source == b'unbundle':
        return orig(repo, op, unbundler)

    # this server routes each push to bundle store
    if repo.ui.configbool(b'infinitepush', b'pushtobundlestore'):
        return storetobundlestore(orig, repo, op, unbundler)

    if unbundler.params.get(b'infinitepush') != b'True':
        return orig(repo, op, unbundler)

    handleallparts = repo.ui.configbool(b'infinitepush', b'storeallparts')

    bundler = bundle2.bundle20(repo.ui)
    cgparams = None
    with bundle2.partiterator(repo, op, unbundler) as parts:
        for part in parts:
            bundlepart = None
            if part.type == b'replycaps':
                # This configures the current operation to allow reply parts.
                bundle2._processpart(op, part)
            elif part.type == bundleparts.scratchbranchparttype:
                # Scratch branch parts need to be converted to normal
                # changegroup parts, and the extra parameters stored for later
                # when we upload to the store. Eventually those parameters will
                # be put on the actual bundle instead of this part, then we can
                # send a vanilla changegroup instead of the scratchbranch part.
                cgversion = part.params.get(b'cgversion', b'01')
                bundlepart = bundle2.bundlepart(
                    b'changegroup', data=part.read()
                )
                bundlepart.addparam(b'version', cgversion)
                cgparams = part.params

                # If we're not dumping all parts into the new bundle, we need to
                # alert the future pushkey and phase-heads handler to skip
                # the part.
                if not handleallparts:
                    op.records.add(
                        scratchbranchparttype + b'_skippushkey', True
                    )
                    op.records.add(
                        scratchbranchparttype + b'_skipphaseheads', True
                    )
            else:
                if handleallparts:
                    # Ideally we would not process any parts, and instead just
                    # forward them to the bundle for storage, but since this
                    # differs from previous behavior, we need to put it behind a
                    # config flag for incremental rollout.
                    bundlepart = bundle2.bundlepart(part.type, data=part.read())
                    for key, value in part.params.items():
                        bundlepart.addparam(key, value)

                    # Certain parts require a response
                    if part.type == b'pushkey':
                        if op.reply is not None:
                            rpart = op.reply.newpart(b'reply:pushkey')
                            # bundle2 part parameters must be bytes; bytes
                            # formatting matches storetobundlestore() (the old
                            # str(part.id) produced a unicode str on Python 3).
                            rpart.addparam(
                                b'in-reply-to', b'%d' % part.id, mandatory=False
                            )
                            rpart.addparam(b'return', b'1', mandatory=False)
                else:
                    bundle2._processpart(op, part)

                if handleallparts:
                    op.records.add(
                        part.type,
                        {
                            b'return': 1,
                        },
                    )
            if bundlepart:
                bundler.addpart(bundlepart)

    # If commits were sent, store them
    if cgparams:
        buf = util.chunkbuffer(bundler.getchunks())
        fd, bundlefile = pycompat.mkstemp()
        try:
            # Use a context manager: the previous try/finally called
            # fp.close() even when os.fdopen() itself failed, raising a
            # NameError on the unbound `fp` that masked the real exception.
            with os.fdopen(fd, 'wb') as fp:
                fp.write(buf.read())
            storebundle(op, cgparams, bundlefile)
        finally:
            try:
                os.unlink(bundlefile)
            except Exception:
                # we would rather see the original exception
                pass
1178 1183
1179 1184
def storebundle(op, params, bundlefile):
    """Record the bundle at *bundlefile* in the server's bundlestore.

    *params* are the scratch-part parameters (bookmark, bookprevnode,
    force, ...). The bundle's nodes are registered in the index, the raw
    bundle bytes are written to the store (only when new heads arrive),
    and an optional bookmark is (re)pointed at the pushed head. Raises
    error.Abort on validation failure (multiple heads with a bookmark,
    bundle over the size limit). Start/success/failure are logged.
    """
    log = _getorcreateinfinitepushlogger(op)
    parthandlerstart = time.time()
    log(scratchbranchparttype, eventtype=b'start')
    index = op.repo.bundlestore.index
    store = op.repo.bundlestore.store
    # Tell the later pushkey handler not to run for this push.
    op.records.add(scratchbranchparttype + b'_skippushkey', True)

    bundle = None
    try:  # guards bundle
        # Open the on-disk bundle as an overlay repository so revsets and
        # changectx lookups work against its contents.
        bundlepath = b"bundle:%s+%s" % (op.repo.root, bundlefile)
        bundle = hg.repository(op.repo.ui, bundlepath)

        bookmark = params.get(b'bookmark')
        bookprevnode = params.get(b'bookprevnode', b'')
        force = params.get(b'force')

        if bookmark:
            oldnode = index.getnode(bookmark)
        else:
            oldnode = None
        bundleheads = bundle.revs(b'heads(bundle())')
        if bookmark and len(bundleheads) > 1:
            raise error.Abort(
                _(b'cannot push more than one head to a scratch branch')
            )

        revs = _getrevs(bundle, oldnode, force, bookmark)

        # Notify the user of what is being pushed
        plural = b's' if len(revs) > 1 else b''
        op.repo.ui.warn(_(b"pushing %d commit%s:\n") % (len(revs), plural))
        maxoutput = 10
        for i in range(0, min(len(revs), maxoutput)):
            firstline = bundle[revs[i]].description().split(b'\n')[0][:50]
            op.repo.ui.warn(b" %s %s\n" % (revs[i], firstline))

        if len(revs) > maxoutput + 1:
            op.repo.ui.warn(b" ...\n")
            firstline = bundle[revs[-1]].description().split(b'\n')[0][:50]
            op.repo.ui.warn(b" %s %s\n" % (revs[-1], firstline))

        nodesctx = [bundle[rev] for rev in revs]
        # A head already present in the index does not need re-uploading.
        inindex = lambda rev: bool(index.getbundle(bundle[rev].hex()))
        if bundleheads:
            newheadscount = sum(not inindex(rev) for rev in bundleheads)
        else:
            newheadscount = 0
        # If there's a bookmark specified, there should be only one head,
        # so we choose the last node, which will be that head.
        # If a bug or malicious client allows there to be a bookmark
        # with multiple heads, we will place the bookmark on the last head.
        bookmarknode = nodesctx[-1].hex() if nodesctx else None
        key = None
        if newheadscount:
            # Only upload the raw bundle bytes when it brings new heads.
            with open(bundlefile, b'rb') as f:
                bundledata = f.read()
                with logservicecall(
                    log, b'bundlestore', bundlesize=len(bundledata)
                ):
                    bundlesizelimit = 100 * 1024 * 1024  # 100 MB
                    if len(bundledata) > bundlesizelimit:
                        error_msg = (
                            b'bundle is too big: %d bytes. '
                            + b'max allowed size is 100 MB'
                        )
                        raise error.Abort(error_msg % (len(bundledata),))
                    key = store.write(bundledata)

        # Index updates happen under the index's own transaction/lock.
        with logservicecall(log, b'index', newheadscount=newheadscount), index:
            if key:
                index.addbundle(key, nodesctx)
            if bookmark:
                index.addbookmark(bookmark, bookmarknode)
                _maybeaddpushbackpart(
                    op, bookmark, bookmarknode, bookprevnode, params
                )
        log(
            scratchbranchparttype,
            eventtype=b'success',
            elapsedms=(time.time() - parthandlerstart) * 1000,
        )

    except Exception as e:
        log(
            scratchbranchparttype,
            eventtype=b'failure',
            elapsedms=(time.time() - parthandlerstart) * 1000,
            errormsg=stringutil.forcebytestr(e),
        )
        raise
    finally:
        if bundle:
            bundle.close()
1274 1279
1275 1280
@bundle2.parthandler(
    scratchbranchparttype,
    (
        b'bookmark',
        b'bookprevnode',
        b'force',
        b'pushbackbookmarks',
        b'cgversion',
    ),
)
def bundle2scratchbranch(op, part):
    """unbundle a bundle2 part containing a changegroup to store

    Wraps the part's payload in a fresh changegroup bundle, writes it to a
    temporary file and hands it to storebundle(). Always returns 1.
    """

    bundler = bundle2.bundle20(op.repo.ui)
    cgversion = part.params.get(b'cgversion', b'01')
    cgpart = bundle2.bundlepart(b'changegroup', data=part.read())
    cgpart.addparam(b'version', cgversion)
    bundler.addpart(cgpart)
    buf = util.chunkbuffer(bundler.getchunks())

    fd, bundlefile = pycompat.mkstemp()
    try:
        # Use a context manager: the previous try/finally called fp.close()
        # even when os.fdopen() itself failed, raising a NameError on the
        # unbound `fp` that masked the original exception.
        with os.fdopen(fd, 'wb') as fp:
            fp.write(buf.read())
        storebundle(op, part.params, bundlefile)
    finally:
        try:
            os.unlink(bundlefile)
        except FileNotFoundError:
            pass

    return 1
1311 1316
1312 1317
1313 1318 def _maybeaddpushbackpart(op, bookmark, newnode, oldnode, params):
1314 1319 if params.get(b'pushbackbookmarks'):
1315 1320 if op.reply and b'pushback' in op.reply.capabilities:
1316 1321 params = {
1317 1322 b'namespace': b'bookmarks',
1318 1323 b'key': bookmark,
1319 1324 b'new': newnode,
1320 1325 b'old': oldnode,
1321 1326 }
1322 1327 op.reply.newpart(b'pushkey', mandatoryparams=params.items())
1323 1328
1324 1329
def bundle2pushkey(orig, op, part):
    """Wrapper of bundle2.handlepushkey()

    The only goal is to skip calling the original function if flag is set.
    It's set if infinitepush push is happening. In that case a synthetic
    successful 'reply:pushkey' part is emitted instead.
    """
    if op.records[scratchbranchparttype + b'_skippushkey']:
        if op.reply is not None:
            rpart = op.reply.newpart(b'reply:pushkey')
            # bundle2 part parameters must be bytes; the old str(part.id)
            # produced a unicode str on Python 3 and broke serialization.
            # This also matches the b'%d' formatting used elsewhere in
            # this module.
            rpart.addparam(b'in-reply-to', b'%d' % part.id, mandatory=False)
            rpart.addparam(b'return', b'1', mandatory=False)
        return 1

    return orig(op, part)
1339 1344
1340 1345
def bundle2handlephases(orig, op, part):
    """Wrapper of bundle2.handlephases()

    Becomes a no-op when an infinitepush push flagged (via op.records)
    that phase-heads parts must be skipped; otherwise defers to the
    wrapped implementation.
    """
    skipping = op.records[scratchbranchparttype + b'_skipphaseheads']
    if skipping:
        return None
    return orig(op, part)
1352 1357
1353 1358
1354 1359 def _asyncsavemetadata(root, nodes):
1355 1360 """starts a separate process that fills metadata for the nodes
1356 1361
1357 1362 This function creates a separate process and doesn't wait for it's
1358 1363 completion. This was done to avoid slowing down pushes
1359 1364 """
1360 1365
1361 1366 maxnodes = 50
1362 1367 if len(nodes) > maxnodes:
1363 1368 return
1364 1369 nodesargs = []
1365 1370 for node in nodes:
1366 1371 nodesargs.append(b'--node')
1367 1372 nodesargs.append(node)
1368 1373 with open(os.devnull, b'w+b') as devnull:
1369 1374 cmdline = [
1370 1375 util.hgexecutable(),
1371 1376 b'debugfillinfinitepushmetadata',
1372 1377 b'-R',
1373 1378 root,
1374 1379 ] + nodesargs
1375 1380 # Process will run in background. We don't care about the return code
1376 1381 subprocess.Popen(
1377 1382 pycompat.rapply(procutil.tonativestr, cmdline),
1378 1383 close_fds=True,
1379 1384 shell=False,
1380 1385 stdin=devnull,
1381 1386 stdout=devnull,
1382 1387 stderr=devnull,
1383 1388 )
General Comments 0
You need to be logged in to leave comments. Login now