##// END OF EJS Templates
path: pass `path` to `peer` in infinitepush...
marmoute -
r50629:4dfcdb20 default
parent child Browse files
Show More
@@ -1,1385 +1,1383 b''
1 1 # Infinite push
2 2 #
3 3 # Copyright 2016 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """ store some pushes in a remote blob store on the server (EXPERIMENTAL)
8 8
9 9 IMPORTANT: if you use this extension, please contact
10 10 mercurial-devel@mercurial-scm.org ASAP. This extension is believed to
11 11 be unused and barring learning of users of this functionality, we will
12 12 delete this code at the end of 2020.
13 13
14 14 [infinitepush]
15 15 # Server-side and client-side option. Pattern of the infinitepush bookmark
16 16 branchpattern = PATTERN
17 17
18 18 # Server or client
19 19 server = False
20 20
21 21 # Server-side option. Possible values: 'disk' or 'sql'. Fails if not set
22 22 indextype = disk
23 23
24 24 # Server-side option. Used only if indextype=sql.
25 25 # Format: 'IP:PORT:DB_NAME:USER:PASSWORD'
26 26 sqlhost = IP:PORT:DB_NAME:USER:PASSWORD
27 27
28 28 # Server-side option. Used only if indextype=disk.
29 29 # Filesystem path to the index store
30 30 indexpath = PATH
31 31
32 32 # Server-side option. Possible values: 'disk' or 'external'
33 33 # Fails if not set
34 34 storetype = disk
35 35
36 36 # Server-side option.
37 37 # Path to the binary that will save bundle to the bundlestore
38 38 # Formatted cmd line will be passed to it (see `put_args`)
39 39 put_binary = put
40 40
41 41 # Serser-side option. Used only if storetype=external.
42 42 # Format cmd-line string for put binary. Placeholder: {filename}
43 43 put_args = {filename}
44 44
45 45 # Server-side option.
46 46 # Path to the binary that get bundle from the bundlestore.
47 47 # Formatted cmd line will be passed to it (see `get_args`)
48 48 get_binary = get
49 49
50 50 # Serser-side option. Used only if storetype=external.
51 51 # Format cmd-line string for get binary. Placeholders: {filename} {handle}
52 52 get_args = {filename} {handle}
53 53
54 54 # Server-side option
55 55 logfile = FIlE
56 56
57 57 # Server-side option
58 58 loglevel = DEBUG
59 59
60 60 # Server-side option. Used only if indextype=sql.
61 61 # Sets mysql wait_timeout option.
62 62 waittimeout = 300
63 63
64 64 # Server-side option. Used only if indextype=sql.
65 65 # Sets mysql innodb_lock_wait_timeout option.
66 66 locktimeout = 120
67 67
68 68 # Server-side option. Used only if indextype=sql.
69 69 # Name of the repository
70 70 reponame = ''
71 71
72 72 # Client-side option. Used by --list-remote option. List of remote scratch
73 73 # patterns to list if no patterns are specified.
74 74 defaultremotepatterns = ['*']
75 75
76 76 # Instructs infinitepush to forward all received bundle2 parts to the
77 77 # bundle for storage. Defaults to False.
78 78 storeallparts = True
79 79
80 80 # routes each incoming push to the bundlestore. defaults to False
81 81 pushtobundlestore = True
82 82
83 83 [remotenames]
84 84 # Client-side option
85 85 # This option should be set only if remotenames extension is enabled.
86 86 # Whether remote bookmarks are tracked by remotenames extension.
87 87 bookmarks = True
88 88 """
89 89
90 90
91 91 import collections
92 92 import contextlib
93 93 import functools
94 94 import logging
95 95 import os
96 96 import random
97 97 import re
98 98 import socket
99 99 import subprocess
100 100 import time
101 101
102 102 from mercurial.node import (
103 103 bin,
104 104 hex,
105 105 )
106 106
107 107 from mercurial.i18n import _
108 108
109 109 from mercurial.pycompat import (
110 110 getattr,
111 111 open,
112 112 )
113 113
114 114 from mercurial.utils import (
115 115 procutil,
116 116 stringutil,
117 117 urlutil,
118 118 )
119 119
120 120 from mercurial import (
121 121 bundle2,
122 122 changegroup,
123 123 commands,
124 124 discovery,
125 125 encoding,
126 126 error,
127 127 exchange,
128 128 extensions,
129 129 hg,
130 130 localrepo,
131 131 phases,
132 132 pushkey,
133 133 pycompat,
134 134 registrar,
135 135 util,
136 136 wireprototypes,
137 137 wireprotov1peer,
138 138 wireprotov1server,
139 139 )
140 140
141 141 from . import (
142 142 bundleparts,
143 143 common,
144 144 )
145 145
146 146 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
147 147 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
148 148 # be specifying the version(s) of Mercurial they are tested with, or
149 149 # leave the attribute unspecified.
150 150 testedwith = b'ships-with-hg-core'
151 151
152 152 configtable = {}
153 153 configitem = registrar.configitem(configtable)
154 154
155 155 configitem(
156 156 b'infinitepush',
157 157 b'server',
158 158 default=False,
159 159 )
160 160 configitem(
161 161 b'infinitepush',
162 162 b'storetype',
163 163 default=b'',
164 164 )
165 165 configitem(
166 166 b'infinitepush',
167 167 b'indextype',
168 168 default=b'',
169 169 )
170 170 configitem(
171 171 b'infinitepush',
172 172 b'indexpath',
173 173 default=b'',
174 174 )
175 175 configitem(
176 176 b'infinitepush',
177 177 b'storeallparts',
178 178 default=False,
179 179 )
180 180 configitem(
181 181 b'infinitepush',
182 182 b'reponame',
183 183 default=b'',
184 184 )
185 185 configitem(
186 186 b'scratchbranch',
187 187 b'storepath',
188 188 default=b'',
189 189 )
190 190 configitem(
191 191 b'infinitepush',
192 192 b'branchpattern',
193 193 default=b'',
194 194 )
195 195 configitem(
196 196 b'infinitepush',
197 197 b'pushtobundlestore',
198 198 default=False,
199 199 )
200 200 configitem(
201 201 b'experimental',
202 202 b'server-bundlestore-bookmark',
203 203 default=b'',
204 204 )
205 205 configitem(
206 206 b'experimental',
207 207 b'infinitepush-scratchpush',
208 208 default=False,
209 209 )
210 210
211 211 experimental = b'experimental'
212 212 configbookmark = b'server-bundlestore-bookmark'
213 213 configscratchpush = b'infinitepush-scratchpush'
214 214
215 215 scratchbranchparttype = bundleparts.scratchbranchparttype
216 216 revsetpredicate = registrar.revsetpredicate()
217 217 templatekeyword = registrar.templatekeyword()
218 218 _scratchbranchmatcher = lambda x: False
219 219 _maybehash = re.compile('^[a-f0-9]+$').search
220 220
221 221
222 222 def _buildexternalbundlestore(ui):
223 223 put_args = ui.configlist(b'infinitepush', b'put_args', [])
224 224 put_binary = ui.config(b'infinitepush', b'put_binary')
225 225 if not put_binary:
226 226 raise error.Abort(b'put binary is not specified')
227 227 get_args = ui.configlist(b'infinitepush', b'get_args', [])
228 228 get_binary = ui.config(b'infinitepush', b'get_binary')
229 229 if not get_binary:
230 230 raise error.Abort(b'get binary is not specified')
231 231 from . import store
232 232
233 233 return store.externalbundlestore(put_binary, put_args, get_binary, get_args)
234 234
235 235
236 236 def _buildsqlindex(ui):
237 237 sqlhost = ui.config(b'infinitepush', b'sqlhost')
238 238 if not sqlhost:
239 239 raise error.Abort(_(b'please set infinitepush.sqlhost'))
240 240 host, port, db, user, password = sqlhost.split(b':')
241 241 reponame = ui.config(b'infinitepush', b'reponame')
242 242 if not reponame:
243 243 raise error.Abort(_(b'please set infinitepush.reponame'))
244 244
245 245 logfile = ui.config(b'infinitepush', b'logfile', b'')
246 246 waittimeout = ui.configint(b'infinitepush', b'waittimeout', 300)
247 247 locktimeout = ui.configint(b'infinitepush', b'locktimeout', 120)
248 248 from . import sqlindexapi
249 249
250 250 return sqlindexapi.sqlindexapi(
251 251 reponame,
252 252 host,
253 253 port,
254 254 db,
255 255 user,
256 256 password,
257 257 logfile,
258 258 _getloglevel(ui),
259 259 waittimeout=waittimeout,
260 260 locktimeout=locktimeout,
261 261 )
262 262
263 263
264 264 def _getloglevel(ui):
265 265 loglevel = ui.config(b'infinitepush', b'loglevel', b'DEBUG')
266 266 numeric_loglevel = getattr(logging, loglevel.upper(), None)
267 267 if not isinstance(numeric_loglevel, int):
268 268 raise error.Abort(_(b'invalid log level %s') % loglevel)
269 269 return numeric_loglevel
270 270
271 271
272 272 def _tryhoist(ui, remotebookmark):
273 273 """returns a bookmarks with hoisted part removed
274 274
275 275 Remotenames extension has a 'hoist' config that allows to use remote
276 276 bookmarks without specifying remote path. For example, 'hg update master'
277 277 works as well as 'hg update remote/master'. We want to allow the same in
278 278 infinitepush.
279 279 """
280 280
281 281 if common.isremotebooksenabled(ui):
282 282 hoist = ui.config(b'remotenames', b'hoistedpeer') + b'/'
283 283 if remotebookmark.startswith(hoist):
284 284 return remotebookmark[len(hoist) :]
285 285 return remotebookmark
286 286
287 287
288 288 class bundlestore:
289 289 def __init__(self, repo):
290 290 self._repo = repo
291 291 storetype = self._repo.ui.config(b'infinitepush', b'storetype')
292 292 if storetype == b'disk':
293 293 from . import store
294 294
295 295 self.store = store.filebundlestore(self._repo.ui, self._repo)
296 296 elif storetype == b'external':
297 297 self.store = _buildexternalbundlestore(self._repo.ui)
298 298 else:
299 299 raise error.Abort(
300 300 _(b'unknown infinitepush store type specified %s') % storetype
301 301 )
302 302
303 303 indextype = self._repo.ui.config(b'infinitepush', b'indextype')
304 304 if indextype == b'disk':
305 305 from . import fileindexapi
306 306
307 307 self.index = fileindexapi.fileindexapi(self._repo)
308 308 elif indextype == b'sql':
309 309 self.index = _buildsqlindex(self._repo.ui)
310 310 else:
311 311 raise error.Abort(
312 312 _(b'unknown infinitepush index type specified %s') % indextype
313 313 )
314 314
315 315
316 316 def _isserver(ui):
317 317 return ui.configbool(b'infinitepush', b'server')
318 318
319 319
320 320 def reposetup(ui, repo):
321 321 if _isserver(ui) and repo.local():
322 322 repo.bundlestore = bundlestore(repo)
323 323
324 324
325 325 def extsetup(ui):
326 326 commonsetup(ui)
327 327 if _isserver(ui):
328 328 serverextsetup(ui)
329 329 else:
330 330 clientextsetup(ui)
331 331
332 332
333 333 def commonsetup(ui):
334 334 wireprotov1server.commands[b'listkeyspatterns'] = (
335 335 wireprotolistkeyspatterns,
336 336 b'namespace patterns',
337 337 )
338 338 scratchbranchpat = ui.config(b'infinitepush', b'branchpattern')
339 339 if scratchbranchpat:
340 340 global _scratchbranchmatcher
341 341 kind, pat, _scratchbranchmatcher = stringutil.stringmatcher(
342 342 scratchbranchpat
343 343 )
344 344
345 345
346 346 def serverextsetup(ui):
347 347 origpushkeyhandler = bundle2.parthandlermapping[b'pushkey']
348 348
349 349 def newpushkeyhandler(*args, **kwargs):
350 350 bundle2pushkey(origpushkeyhandler, *args, **kwargs)
351 351
352 352 newpushkeyhandler.params = origpushkeyhandler.params
353 353 bundle2.parthandlermapping[b'pushkey'] = newpushkeyhandler
354 354
355 355 orighandlephasehandler = bundle2.parthandlermapping[b'phase-heads']
356 356 newphaseheadshandler = lambda *args, **kwargs: bundle2handlephases(
357 357 orighandlephasehandler, *args, **kwargs
358 358 )
359 359 newphaseheadshandler.params = orighandlephasehandler.params
360 360 bundle2.parthandlermapping[b'phase-heads'] = newphaseheadshandler
361 361
362 362 extensions.wrapfunction(
363 363 localrepo.localrepository, b'listkeys', localrepolistkeys
364 364 )
365 365 wireprotov1server.commands[b'lookup'] = (
366 366 _lookupwrap(wireprotov1server.commands[b'lookup'][0]),
367 367 b'key',
368 368 )
369 369 extensions.wrapfunction(exchange, b'getbundlechunks', getbundlechunks)
370 370
371 371 extensions.wrapfunction(bundle2, b'processparts', processparts)
372 372
373 373
374 374 def clientextsetup(ui):
375 375 entry = extensions.wrapcommand(commands.table, b'push', _push)
376 376
377 377 entry[1].append(
378 378 (
379 379 b'',
380 380 b'bundle-store',
381 381 None,
382 382 _(b'force push to go to bundle store (EXPERIMENTAL)'),
383 383 )
384 384 )
385 385
386 386 extensions.wrapcommand(commands.table, b'pull', _pull)
387 387
388 388 extensions.wrapfunction(discovery, b'checkheads', _checkheads)
389 389
390 390 wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
391 391
392 392 partorder = exchange.b2partsgenorder
393 393 index = partorder.index(b'changeset')
394 394 partorder.insert(
395 395 index, partorder.pop(partorder.index(scratchbranchparttype))
396 396 )
397 397
398 398
399 399 def _checkheads(orig, pushop):
400 400 if pushop.ui.configbool(experimental, configscratchpush, False):
401 401 return
402 402 return orig(pushop)
403 403
404 404
405 405 def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
406 406 patterns = wireprototypes.decodelist(patterns)
407 407 d = repo.listkeys(encoding.tolocal(namespace), patterns).items()
408 408 return pushkey.encodekeys(d)
409 409
410 410
411 411 def localrepolistkeys(orig, self, namespace, patterns=None):
412 412 if namespace == b'bookmarks' and patterns:
413 413 index = self.bundlestore.index
414 414 results = {}
415 415 bookmarks = orig(self, namespace)
416 416 for pattern in patterns:
417 417 results.update(index.getbookmarks(pattern))
418 418 if pattern.endswith(b'*'):
419 419 pattern = b're:^' + pattern[:-1] + b'.*'
420 420 kind, pat, matcher = stringutil.stringmatcher(pattern)
421 421 for bookmark, node in bookmarks.items():
422 422 if matcher(bookmark):
423 423 results[bookmark] = node
424 424 return results
425 425 else:
426 426 return orig(self, namespace)
427 427
428 428
429 429 @wireprotov1peer.batchable
430 430 def listkeyspatterns(self, namespace, patterns):
431 431 if not self.capable(b'pushkey'):
432 432 return {}, None
433 433 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
434 434
435 435 def decode(d):
436 436 self.ui.debug(
437 437 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
438 438 )
439 439 return pushkey.decodekeys(d)
440 440
441 441 return {
442 442 b'namespace': encoding.fromlocal(namespace),
443 443 b'patterns': wireprototypes.encodelist(patterns),
444 444 }, decode
445 445
446 446
447 447 def _readbundlerevs(bundlerepo):
448 448 return list(bundlerepo.revs(b'bundle()'))
449 449
450 450
451 451 def _includefilelogstobundle(bundlecaps, bundlerepo, bundlerevs, ui):
452 452 """Tells remotefilelog to include all changed files to the changegroup
453 453
454 454 By default remotefilelog doesn't include file content to the changegroup.
455 455 But we need to include it if we are fetching from bundlestore.
456 456 """
457 457 changedfiles = set()
458 458 cl = bundlerepo.changelog
459 459 for r in bundlerevs:
460 460 # [3] means changed files
461 461 changedfiles.update(cl.read(r)[3])
462 462 if not changedfiles:
463 463 return bundlecaps
464 464
465 465 changedfiles = b'\0'.join(changedfiles)
466 466 newcaps = []
467 467 appended = False
468 468 for cap in bundlecaps or []:
469 469 if cap.startswith(b'excludepattern='):
470 470 newcaps.append(b'\0'.join((cap, changedfiles)))
471 471 appended = True
472 472 else:
473 473 newcaps.append(cap)
474 474 if not appended:
475 475 # Not found excludepattern cap. Just append it
476 476 newcaps.append(b'excludepattern=' + changedfiles)
477 477
478 478 return newcaps
479 479
480 480
481 481 def _rebundle(bundlerepo, bundleroots, unknownhead):
482 482 """
483 483 Bundle may include more revision then user requested. For example,
484 484 if user asks for revision but bundle also consists its descendants.
485 485 This function will filter out all revision that user is not requested.
486 486 """
487 487 parts = []
488 488
489 489 version = b'02'
490 490 outgoing = discovery.outgoing(
491 491 bundlerepo, commonheads=bundleroots, ancestorsof=[unknownhead]
492 492 )
493 493 cgstream = changegroup.makestream(bundlerepo, outgoing, version, b'pull')
494 494 cgstream = util.chunkbuffer(cgstream).read()
495 495 cgpart = bundle2.bundlepart(b'changegroup', data=cgstream)
496 496 cgpart.addparam(b'version', version)
497 497 parts.append(cgpart)
498 498
499 499 return parts
500 500
501 501
502 502 def _getbundleroots(oldrepo, bundlerepo, bundlerevs):
503 503 cl = bundlerepo.changelog
504 504 bundleroots = []
505 505 for rev in bundlerevs:
506 506 node = cl.node(rev)
507 507 parents = cl.parents(node)
508 508 for parent in parents:
509 509 # include all revs that exist in the main repo
510 510 # to make sure that bundle may apply client-side
511 511 if parent in oldrepo:
512 512 bundleroots.append(parent)
513 513 return bundleroots
514 514
515 515
516 516 def _needsrebundling(head, bundlerepo):
517 517 bundleheads = list(bundlerepo.revs(b'heads(bundle())'))
518 518 return not (
519 519 len(bundleheads) == 1 and bundlerepo[bundleheads[0]].node() == head
520 520 )
521 521
522 522
523 523 def _generateoutputparts(head, bundlerepo, bundleroots, bundlefile):
524 524 """generates bundle that will be send to the user
525 525
526 526 returns tuple with raw bundle string and bundle type
527 527 """
528 528 parts = []
529 529 if not _needsrebundling(head, bundlerepo):
530 530 with util.posixfile(bundlefile, b"rb") as f:
531 531 unbundler = exchange.readbundle(bundlerepo.ui, f, bundlefile)
532 532 if isinstance(unbundler, changegroup.cg1unpacker):
533 533 part = bundle2.bundlepart(
534 534 b'changegroup', data=unbundler._stream.read()
535 535 )
536 536 part.addparam(b'version', b'01')
537 537 parts.append(part)
538 538 elif isinstance(unbundler, bundle2.unbundle20):
539 539 haschangegroup = False
540 540 for part in unbundler.iterparts():
541 541 if part.type == b'changegroup':
542 542 haschangegroup = True
543 543 newpart = bundle2.bundlepart(part.type, data=part.read())
544 544 for key, value in part.params.items():
545 545 newpart.addparam(key, value)
546 546 parts.append(newpart)
547 547
548 548 if not haschangegroup:
549 549 raise error.Abort(
550 550 b'unexpected bundle without changegroup part, '
551 551 + b'head: %s' % hex(head),
552 552 hint=b'report to administrator',
553 553 )
554 554 else:
555 555 raise error.Abort(b'unknown bundle type')
556 556 else:
557 557 parts = _rebundle(bundlerepo, bundleroots, head)
558 558
559 559 return parts
560 560
561 561
562 562 def getbundlechunks(orig, repo, source, heads=None, bundlecaps=None, **kwargs):
563 563 heads = heads or []
564 564 # newheads are parents of roots of scratch bundles that were requested
565 565 newphases = {}
566 566 scratchbundles = []
567 567 newheads = []
568 568 scratchheads = []
569 569 nodestobundle = {}
570 570 allbundlestocleanup = []
571 571 try:
572 572 for head in heads:
573 573 if not repo.changelog.index.has_node(head):
574 574 if head not in nodestobundle:
575 575 newbundlefile = common.downloadbundle(repo, head)
576 576 bundlepath = b"bundle:%s+%s" % (repo.root, newbundlefile)
577 577 bundlerepo = hg.repository(repo.ui, bundlepath)
578 578
579 579 allbundlestocleanup.append((bundlerepo, newbundlefile))
580 580 bundlerevs = set(_readbundlerevs(bundlerepo))
581 581 bundlecaps = _includefilelogstobundle(
582 582 bundlecaps, bundlerepo, bundlerevs, repo.ui
583 583 )
584 584 cl = bundlerepo.changelog
585 585 bundleroots = _getbundleroots(repo, bundlerepo, bundlerevs)
586 586 for rev in bundlerevs:
587 587 node = cl.node(rev)
588 588 newphases[hex(node)] = str(phases.draft)
589 589 nodestobundle[node] = (
590 590 bundlerepo,
591 591 bundleroots,
592 592 newbundlefile,
593 593 )
594 594
595 595 scratchbundles.append(
596 596 _generateoutputparts(head, *nodestobundle[head])
597 597 )
598 598 newheads.extend(bundleroots)
599 599 scratchheads.append(head)
600 600 finally:
601 601 for bundlerepo, bundlefile in allbundlestocleanup:
602 602 bundlerepo.close()
603 603 try:
604 604 os.unlink(bundlefile)
605 605 except (IOError, OSError):
606 606 # if we can't cleanup the file then just ignore the error,
607 607 # no need to fail
608 608 pass
609 609
610 610 pullfrombundlestore = bool(scratchbundles)
611 611 wrappedchangegrouppart = False
612 612 wrappedlistkeys = False
613 613 oldchangegrouppart = exchange.getbundle2partsmapping[b'changegroup']
614 614 try:
615 615
616 616 def _changegrouppart(bundler, *args, **kwargs):
617 617 # Order is important here. First add non-scratch part
618 618 # and only then add parts with scratch bundles because
619 619 # non-scratch part contains parents of roots of scratch bundles.
620 620 result = oldchangegrouppart(bundler, *args, **kwargs)
621 621 for bundle in scratchbundles:
622 622 for part in bundle:
623 623 bundler.addpart(part)
624 624 return result
625 625
626 626 exchange.getbundle2partsmapping[b'changegroup'] = _changegrouppart
627 627 wrappedchangegrouppart = True
628 628
629 629 def _listkeys(orig, self, namespace):
630 630 origvalues = orig(self, namespace)
631 631 if namespace == b'phases' and pullfrombundlestore:
632 632 if origvalues.get(b'publishing') == b'True':
633 633 # Make repo non-publishing to preserve draft phase
634 634 del origvalues[b'publishing']
635 635 origvalues.update(newphases)
636 636 return origvalues
637 637
638 638 extensions.wrapfunction(
639 639 localrepo.localrepository, b'listkeys', _listkeys
640 640 )
641 641 wrappedlistkeys = True
642 642 heads = list((set(newheads) | set(heads)) - set(scratchheads))
643 643 result = orig(
644 644 repo, source, heads=heads, bundlecaps=bundlecaps, **kwargs
645 645 )
646 646 finally:
647 647 if wrappedchangegrouppart:
648 648 exchange.getbundle2partsmapping[b'changegroup'] = oldchangegrouppart
649 649 if wrappedlistkeys:
650 650 extensions.unwrapfunction(
651 651 localrepo.localrepository, b'listkeys', _listkeys
652 652 )
653 653 return result
654 654
655 655
656 656 def _lookupwrap(orig):
657 657 def _lookup(repo, proto, key):
658 658 localkey = encoding.tolocal(key)
659 659
660 660 if isinstance(localkey, str) and _scratchbranchmatcher(localkey):
661 661 scratchnode = repo.bundlestore.index.getnode(localkey)
662 662 if scratchnode:
663 663 return b"%d %s\n" % (1, scratchnode)
664 664 else:
665 665 return b"%d %s\n" % (
666 666 0,
667 667 b'scratch branch %s not found' % localkey,
668 668 )
669 669 else:
670 670 try:
671 671 r = hex(repo.lookup(localkey))
672 672 return b"%d %s\n" % (1, r)
673 673 except Exception as inst:
674 674 if repo.bundlestore.index.getbundle(localkey):
675 675 return b"%d %s\n" % (1, localkey)
676 676 else:
677 677 r = stringutil.forcebytestr(inst)
678 678 return b"%d %s\n" % (0, r)
679 679
680 680 return _lookup
681 681
682 682
683 683 def _pull(orig, ui, repo, source=b"default", **opts):
684 684 opts = pycompat.byteskwargs(opts)
685 685 # Copy paste from `pull` command
686 source, branches = urlutil.get_unique_pull_path(
686 path = urlutil.get_unique_pull_path_obj(
687 687 b"infinite-push's pull",
688 repo,
689 688 ui,
690 689 source,
691 default_branches=opts.get(b'branch'),
692 690 )
693 691
694 692 scratchbookmarks = {}
695 693 unfi = repo.unfiltered()
696 694 unknownnodes = []
697 695 for rev in opts.get(b'rev', []):
698 696 if rev not in unfi:
699 697 unknownnodes.append(rev)
700 698 if opts.get(b'bookmark'):
701 699 bookmarks = []
702 700 revs = opts.get(b'rev') or []
703 701 for bookmark in opts.get(b'bookmark'):
704 702 if _scratchbranchmatcher(bookmark):
705 703 # rev is not known yet
706 704 # it will be fetched with listkeyspatterns next
707 705 scratchbookmarks[bookmark] = b'REVTOFETCH'
708 706 else:
709 707 bookmarks.append(bookmark)
710 708
711 709 if scratchbookmarks:
712 other = hg.peer(repo, opts, source)
710 other = hg.peer(repo, opts, path)
713 711 try:
714 712 fetchedbookmarks = other.listkeyspatterns(
715 713 b'bookmarks', patterns=scratchbookmarks
716 714 )
717 715 for bookmark in scratchbookmarks:
718 716 if bookmark not in fetchedbookmarks:
719 717 raise error.Abort(
720 718 b'remote bookmark %s not found!' % bookmark
721 719 )
722 720 scratchbookmarks[bookmark] = fetchedbookmarks[bookmark]
723 721 revs.append(fetchedbookmarks[bookmark])
724 722 finally:
725 723 other.close()
726 724 opts[b'bookmark'] = bookmarks
727 725 opts[b'rev'] = revs
728 726
729 727 if scratchbookmarks or unknownnodes:
730 728 # Set anyincoming to True
731 729 extensions.wrapfunction(
732 730 discovery, b'findcommonincoming', _findcommonincoming
733 731 )
734 732 try:
735 733 # Remote scratch bookmarks will be deleted because remotenames doesn't
736 734 # know about them. Let's save it before pull and restore after
737 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, source)
738 result = orig(ui, repo, source, **pycompat.strkwargs(opts))
735 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, path.loc)
736 result = orig(ui, repo, path.loc, **pycompat.strkwargs(opts))
739 737 # TODO(stash): race condition is possible
740 738 # if scratch bookmarks was updated right after orig.
741 739 # But that's unlikely and shouldn't be harmful.
742 740 if common.isremotebooksenabled(ui):
743 741 remotescratchbookmarks.update(scratchbookmarks)
744 _saveremotebookmarks(repo, remotescratchbookmarks, source)
742 _saveremotebookmarks(repo, remotescratchbookmarks, path.loc)
745 743 else:
746 744 _savelocalbookmarks(repo, scratchbookmarks)
747 745 return result
748 746 finally:
749 747 if scratchbookmarks:
750 748 extensions.unwrapfunction(discovery, b'findcommonincoming')
751 749
752 750
753 751 def _readscratchremotebookmarks(ui, repo, other):
754 752 if common.isremotebooksenabled(ui):
755 753 remotenamesext = extensions.find(b'remotenames')
756 754 remotepath = remotenamesext.activepath(repo.ui, other)
757 755 result = {}
758 756 # Let's refresh remotenames to make sure we have it up to date
759 757 # Seems that `repo.names['remotebookmarks']` may return stale bookmarks
760 758 # and it results in deleting scratch bookmarks. Our best guess how to
761 759 # fix it is to use `clearnames()`
762 760 repo._remotenames.clearnames()
763 761 for remotebookmark in repo.names[b'remotebookmarks'].listnames(repo):
764 762 path, bookname = remotenamesext.splitremotename(remotebookmark)
765 763 if path == remotepath and _scratchbranchmatcher(bookname):
766 764 nodes = repo.names[b'remotebookmarks'].nodes(
767 765 repo, remotebookmark
768 766 )
769 767 if nodes:
770 768 result[bookname] = hex(nodes[0])
771 769 return result
772 770 else:
773 771 return {}
774 772
775 773
776 774 def _saveremotebookmarks(repo, newbookmarks, remote):
777 775 remotenamesext = extensions.find(b'remotenames')
778 776 remotepath = remotenamesext.activepath(repo.ui, remote)
779 777 branches = collections.defaultdict(list)
780 778 bookmarks = {}
781 779 remotenames = remotenamesext.readremotenames(repo)
782 780 for hexnode, nametype, remote, rname in remotenames:
783 781 if remote != remotepath:
784 782 continue
785 783 if nametype == b'bookmarks':
786 784 if rname in newbookmarks:
787 785 # It's possible if we have a normal bookmark that matches
788 786 # scratch branch pattern. In this case just use the current
789 787 # bookmark node
790 788 del newbookmarks[rname]
791 789 bookmarks[rname] = hexnode
792 790 elif nametype == b'branches':
793 791 # saveremotenames expects 20 byte binary nodes for branches
794 792 branches[rname].append(bin(hexnode))
795 793
796 794 for bookmark, hexnode in newbookmarks.items():
797 795 bookmarks[bookmark] = hexnode
798 796 remotenamesext.saveremotenames(repo, remotepath, branches, bookmarks)
799 797
800 798
801 799 def _savelocalbookmarks(repo, bookmarks):
802 800 if not bookmarks:
803 801 return
804 802 with repo.wlock(), repo.lock(), repo.transaction(b'bookmark') as tr:
805 803 changes = []
806 804 for scratchbook, node in bookmarks.items():
807 805 changectx = repo[node]
808 806 changes.append((scratchbook, changectx.node()))
809 807 repo._bookmarks.applychanges(repo, tr, changes)
810 808
811 809
812 810 def _findcommonincoming(orig, *args, **kwargs):
813 811 common, inc, remoteheads = orig(*args, **kwargs)
814 812 return common, True, remoteheads
815 813
816 814
817 815 def _push(orig, ui, repo, *dests, **opts):
818 816 opts = pycompat.byteskwargs(opts)
819 817 bookmark = opts.get(b'bookmark')
820 818 # we only support pushing one infinitepush bookmark at once
821 819 if len(bookmark) == 1:
822 820 bookmark = bookmark[0]
823 821 else:
824 822 bookmark = b''
825 823
826 824 oldphasemove = None
827 825 overrides = {(experimental, configbookmark): bookmark}
828 826
829 827 with ui.configoverride(overrides, b'infinitepush'):
830 828 scratchpush = opts.get(b'bundle_store')
831 829 if _scratchbranchmatcher(bookmark):
832 830 scratchpush = True
833 831 # bundle2 can be sent back after push (for example, bundle2
834 832 # containing `pushkey` part to update bookmarks)
835 833 ui.setconfig(experimental, b'bundle2.pushback', True)
836 834
837 835 if scratchpush:
838 836 # this is an infinitepush, we don't want the bookmark to be applied
839 837 # rather that should be stored in the bundlestore
840 838 opts[b'bookmark'] = []
841 839 ui.setconfig(experimental, configscratchpush, True)
842 840 oldphasemove = extensions.wrapfunction(
843 841 exchange, b'_localphasemove', _phasemove
844 842 )
845 843
846 844 paths = list(urlutil.get_push_paths(repo, ui, dests))
847 845 if len(paths) > 1:
848 846 msg = _(b'cannot push to multiple path with infinitepush')
849 847 raise error.Abort(msg)
850 848
851 849 path = paths[0]
852 850 destpath = path.loc
853 851 # Remote scratch bookmarks will be deleted because remotenames doesn't
854 852 # know about them. Let's save it before push and restore after
855 853 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, destpath)
856 854 result = orig(ui, repo, *dests, **pycompat.strkwargs(opts))
857 855 if common.isremotebooksenabled(ui):
858 856 if bookmark and scratchpush:
859 857 other = hg.peer(repo, opts, path)
860 858 try:
861 859 fetchedbookmarks = other.listkeyspatterns(
862 860 b'bookmarks', patterns=[bookmark]
863 861 )
864 862 remotescratchbookmarks.update(fetchedbookmarks)
865 863 finally:
866 864 other.close()
867 865 _saveremotebookmarks(repo, remotescratchbookmarks, destpath)
868 866 if oldphasemove:
869 867 exchange._localphasemove = oldphasemove
870 868 return result
871 869
872 870
873 871 def _deleteinfinitepushbookmarks(ui, repo, path, names):
874 872 """Prune remote names by removing the bookmarks we don't want anymore,
875 873 then writing the result back to disk
876 874 """
877 875 remotenamesext = extensions.find(b'remotenames')
878 876
879 877 # remotename format is:
880 878 # (node, nametype ("branches" or "bookmarks"), remote, name)
881 879 nametype_idx = 1
882 880 remote_idx = 2
883 881 name_idx = 3
884 882 remotenames = [
885 883 remotename
886 884 for remotename in remotenamesext.readremotenames(repo)
887 885 if remotename[remote_idx] == path
888 886 ]
889 887 remote_bm_names = [
890 888 remotename[name_idx]
891 889 for remotename in remotenames
892 890 if remotename[nametype_idx] == b"bookmarks"
893 891 ]
894 892
895 893 for name in names:
896 894 if name not in remote_bm_names:
897 895 raise error.Abort(
898 896 _(
899 897 b"infinitepush bookmark '{}' does not exist "
900 898 b"in path '{}'"
901 899 ).format(name, path)
902 900 )
903 901
904 902 bookmarks = {}
905 903 branches = collections.defaultdict(list)
906 904 for node, nametype, remote, name in remotenames:
907 905 if nametype == b"bookmarks" and name not in names:
908 906 bookmarks[name] = node
909 907 elif nametype == b"branches":
910 908 # saveremotenames wants binary nodes for branches
911 909 branches[name].append(bin(node))
912 910
913 911 remotenamesext.saveremotenames(repo, path, branches, bookmarks)
914 912
915 913
916 914 def _phasemove(orig, pushop, nodes, phase=phases.public):
917 915 """prevent commits from being marked public
918 916
919 917 Since these are going to a scratch branch, they aren't really being
920 918 published."""
921 919
922 920 if phase != phases.public:
923 921 orig(pushop, nodes, phase)
924 922
925 923
926 924 @exchange.b2partsgenerator(scratchbranchparttype)
927 925 def partgen(pushop, bundler):
928 926 bookmark = pushop.ui.config(experimental, configbookmark)
929 927 scratchpush = pushop.ui.configbool(experimental, configscratchpush)
930 928 if b'changesets' in pushop.stepsdone or not scratchpush:
931 929 return
932 930
933 931 if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
934 932 return
935 933
936 934 pushop.stepsdone.add(b'changesets')
937 935 if not pushop.outgoing.missing:
938 936 pushop.ui.status(_(b'no changes found\n'))
939 937 pushop.cgresult = 0
940 938 return
941 939
942 940 # This parameter tells the server that the following bundle is an
943 941 # infinitepush. This let's it switch the part processing to our infinitepush
944 942 # code path.
945 943 bundler.addparam(b"infinitepush", b"True")
946 944
947 945 scratchparts = bundleparts.getscratchbranchparts(
948 946 pushop.repo, pushop.remote, pushop.outgoing, pushop.ui, bookmark
949 947 )
950 948
951 949 for scratchpart in scratchparts:
952 950 bundler.addpart(scratchpart)
953 951
954 952 def handlereply(op):
955 953 # server either succeeds or aborts; no code to read
956 954 pushop.cgresult = 1
957 955
958 956 return handlereply
959 957
960 958
961 959 bundle2.capabilities[bundleparts.scratchbranchparttype] = ()
962 960
963 961
964 962 def _getrevs(bundle, oldnode, force, bookmark):
965 963 b'extracts and validates the revs to be imported'
966 964 revs = [bundle[r] for r in bundle.revs(b'sort(bundle())')]
967 965
968 966 # new bookmark
969 967 if oldnode is None:
970 968 return revs
971 969
972 970 # Fast forward update
973 971 if oldnode in bundle and list(bundle.set(b'bundle() & %s::', oldnode)):
974 972 return revs
975 973
976 974 return revs
977 975
978 976
979 977 @contextlib.contextmanager
980 978 def logservicecall(logger, service, **kwargs):
981 979 start = time.time()
982 980 logger(service, eventtype=b'start', **kwargs)
983 981 try:
984 982 yield
985 983 logger(
986 984 service,
987 985 eventtype=b'success',
988 986 elapsedms=(time.time() - start) * 1000,
989 987 **kwargs
990 988 )
991 989 except Exception as e:
992 990 logger(
993 991 service,
994 992 eventtype=b'failure',
995 993 elapsedms=(time.time() - start) * 1000,
996 994 errormsg=stringutil.forcebytestr(e),
997 995 **kwargs
998 996 )
999 997 raise
1000 998
1001 999
1002 1000 def _getorcreateinfinitepushlogger(op):
1003 1001 logger = op.records[b'infinitepushlogger']
1004 1002 if not logger:
1005 1003 ui = op.repo.ui
1006 1004 try:
1007 1005 username = procutil.getuser()
1008 1006 except Exception:
1009 1007 username = b'unknown'
1010 1008 # Generate random request id to be able to find all logged entries
1011 1009 # for the same request. Since requestid is pseudo-generated it may
1012 1010 # not be unique, but we assume that (hostname, username, requestid)
1013 1011 # is unique.
1014 1012 random.seed()
1015 1013 requestid = random.randint(0, 2000000000)
1016 1014 hostname = socket.gethostname()
1017 1015 logger = functools.partial(
1018 1016 ui.log,
1019 1017 b'infinitepush',
1020 1018 user=username,
1021 1019 requestid=requestid,
1022 1020 hostname=hostname,
1023 1021 reponame=ui.config(b'infinitepush', b'reponame'),
1024 1022 )
1025 1023 op.records.add(b'infinitepushlogger', logger)
1026 1024 else:
1027 1025 logger = logger[0]
1028 1026 return logger
1029 1027
1030 1028
1031 1029 def storetobundlestore(orig, repo, op, unbundler):
1032 1030 """stores the incoming bundle coming from push command to the bundlestore
1033 1031 instead of applying on the revlogs"""
1034 1032
1035 1033 repo.ui.status(_(b"storing changesets on the bundlestore\n"))
1036 1034 bundler = bundle2.bundle20(repo.ui)
1037 1035
1038 1036 # processing each part and storing it in bundler
1039 1037 with bundle2.partiterator(repo, op, unbundler) as parts:
1040 1038 for part in parts:
1041 1039 bundlepart = None
1042 1040 if part.type == b'replycaps':
1043 1041 # This configures the current operation to allow reply parts.
1044 1042 bundle2._processpart(op, part)
1045 1043 else:
1046 1044 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1047 1045 for key, value in part.params.items():
1048 1046 bundlepart.addparam(key, value)
1049 1047
1050 1048 # Certain parts require a response
1051 1049 if part.type in (b'pushkey', b'changegroup'):
1052 1050 if op.reply is not None:
1053 1051 rpart = op.reply.newpart(b'reply:%s' % part.type)
1054 1052 rpart.addparam(
1055 1053 b'in-reply-to', b'%d' % part.id, mandatory=False
1056 1054 )
1057 1055 rpart.addparam(b'return', b'1', mandatory=False)
1058 1056
1059 1057 op.records.add(
1060 1058 part.type,
1061 1059 {
1062 1060 b'return': 1,
1063 1061 },
1064 1062 )
1065 1063 if bundlepart:
1066 1064 bundler.addpart(bundlepart)
1067 1065
1068 1066 # storing the bundle in the bundlestore
1069 1067 buf = util.chunkbuffer(bundler.getchunks())
1070 1068 fd, bundlefile = pycompat.mkstemp()
1071 1069 try:
1072 1070 try:
1073 1071 fp = os.fdopen(fd, 'wb')
1074 1072 fp.write(buf.read())
1075 1073 finally:
1076 1074 fp.close()
1077 1075 storebundle(op, {}, bundlefile)
1078 1076 finally:
1079 1077 try:
1080 1078 os.unlink(bundlefile)
1081 1079 except Exception:
1082 1080 # we would rather see the original exception
1083 1081 pass
1084 1082
1085 1083
1086 1084 def processparts(orig, repo, op, unbundler):
1087 1085
1088 1086 # make sure we don't wrap processparts in case of `hg unbundle`
1089 1087 if op.source == b'unbundle':
1090 1088 return orig(repo, op, unbundler)
1091 1089
1092 1090 # this server routes each push to bundle store
1093 1091 if repo.ui.configbool(b'infinitepush', b'pushtobundlestore'):
1094 1092 return storetobundlestore(orig, repo, op, unbundler)
1095 1093
1096 1094 if unbundler.params.get(b'infinitepush') != b'True':
1097 1095 return orig(repo, op, unbundler)
1098 1096
1099 1097 handleallparts = repo.ui.configbool(b'infinitepush', b'storeallparts')
1100 1098
1101 1099 bundler = bundle2.bundle20(repo.ui)
1102 1100 cgparams = None
1103 1101 with bundle2.partiterator(repo, op, unbundler) as parts:
1104 1102 for part in parts:
1105 1103 bundlepart = None
1106 1104 if part.type == b'replycaps':
1107 1105 # This configures the current operation to allow reply parts.
1108 1106 bundle2._processpart(op, part)
1109 1107 elif part.type == bundleparts.scratchbranchparttype:
1110 1108 # Scratch branch parts need to be converted to normal
1111 1109 # changegroup parts, and the extra parameters stored for later
1112 1110 # when we upload to the store. Eventually those parameters will
1113 1111 # be put on the actual bundle instead of this part, then we can
1114 1112 # send a vanilla changegroup instead of the scratchbranch part.
1115 1113 cgversion = part.params.get(b'cgversion', b'01')
1116 1114 bundlepart = bundle2.bundlepart(
1117 1115 b'changegroup', data=part.read()
1118 1116 )
1119 1117 bundlepart.addparam(b'version', cgversion)
1120 1118 cgparams = part.params
1121 1119
1122 1120 # If we're not dumping all parts into the new bundle, we need to
1123 1121 # alert the future pushkey and phase-heads handler to skip
1124 1122 # the part.
1125 1123 if not handleallparts:
1126 1124 op.records.add(
1127 1125 scratchbranchparttype + b'_skippushkey', True
1128 1126 )
1129 1127 op.records.add(
1130 1128 scratchbranchparttype + b'_skipphaseheads', True
1131 1129 )
1132 1130 else:
1133 1131 if handleallparts:
1134 1132 # Ideally we would not process any parts, and instead just
1135 1133 # forward them to the bundle for storage, but since this
1136 1134 # differs from previous behavior, we need to put it behind a
1137 1135 # config flag for incremental rollout.
1138 1136 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1139 1137 for key, value in part.params.items():
1140 1138 bundlepart.addparam(key, value)
1141 1139
1142 1140 # Certain parts require a response
1143 1141 if part.type == b'pushkey':
1144 1142 if op.reply is not None:
1145 1143 rpart = op.reply.newpart(b'reply:pushkey')
1146 1144 rpart.addparam(
1147 1145 b'in-reply-to', str(part.id), mandatory=False
1148 1146 )
1149 1147 rpart.addparam(b'return', b'1', mandatory=False)
1150 1148 else:
1151 1149 bundle2._processpart(op, part)
1152 1150
1153 1151 if handleallparts:
1154 1152 op.records.add(
1155 1153 part.type,
1156 1154 {
1157 1155 b'return': 1,
1158 1156 },
1159 1157 )
1160 1158 if bundlepart:
1161 1159 bundler.addpart(bundlepart)
1162 1160
1163 1161 # If commits were sent, store them
1164 1162 if cgparams:
1165 1163 buf = util.chunkbuffer(bundler.getchunks())
1166 1164 fd, bundlefile = pycompat.mkstemp()
1167 1165 try:
1168 1166 try:
1169 1167 fp = os.fdopen(fd, 'wb')
1170 1168 fp.write(buf.read())
1171 1169 finally:
1172 1170 fp.close()
1173 1171 storebundle(op, cgparams, bundlefile)
1174 1172 finally:
1175 1173 try:
1176 1174 os.unlink(bundlefile)
1177 1175 except Exception:
1178 1176 # we would rather see the original exception
1179 1177 pass
1180 1178
1181 1179
1182 1180 def storebundle(op, params, bundlefile):
1183 1181 log = _getorcreateinfinitepushlogger(op)
1184 1182 parthandlerstart = time.time()
1185 1183 log(scratchbranchparttype, eventtype=b'start')
1186 1184 index = op.repo.bundlestore.index
1187 1185 store = op.repo.bundlestore.store
1188 1186 op.records.add(scratchbranchparttype + b'_skippushkey', True)
1189 1187
1190 1188 bundle = None
1191 1189 try: # guards bundle
1192 1190 bundlepath = b"bundle:%s+%s" % (op.repo.root, bundlefile)
1193 1191 bundle = hg.repository(op.repo.ui, bundlepath)
1194 1192
1195 1193 bookmark = params.get(b'bookmark')
1196 1194 bookprevnode = params.get(b'bookprevnode', b'')
1197 1195 force = params.get(b'force')
1198 1196
1199 1197 if bookmark:
1200 1198 oldnode = index.getnode(bookmark)
1201 1199 else:
1202 1200 oldnode = None
1203 1201 bundleheads = bundle.revs(b'heads(bundle())')
1204 1202 if bookmark and len(bundleheads) > 1:
1205 1203 raise error.Abort(
1206 1204 _(b'cannot push more than one head to a scratch branch')
1207 1205 )
1208 1206
1209 1207 revs = _getrevs(bundle, oldnode, force, bookmark)
1210 1208
1211 1209 # Notify the user of what is being pushed
1212 1210 plural = b's' if len(revs) > 1 else b''
1213 1211 op.repo.ui.warn(_(b"pushing %d commit%s:\n") % (len(revs), plural))
1214 1212 maxoutput = 10
1215 1213 for i in range(0, min(len(revs), maxoutput)):
1216 1214 firstline = bundle[revs[i]].description().split(b'\n')[0][:50]
1217 1215 op.repo.ui.warn(b" %s %s\n" % (revs[i], firstline))
1218 1216
1219 1217 if len(revs) > maxoutput + 1:
1220 1218 op.repo.ui.warn(b" ...\n")
1221 1219 firstline = bundle[revs[-1]].description().split(b'\n')[0][:50]
1222 1220 op.repo.ui.warn(b" %s %s\n" % (revs[-1], firstline))
1223 1221
1224 1222 nodesctx = [bundle[rev] for rev in revs]
1225 1223 inindex = lambda rev: bool(index.getbundle(bundle[rev].hex()))
1226 1224 if bundleheads:
1227 1225 newheadscount = sum(not inindex(rev) for rev in bundleheads)
1228 1226 else:
1229 1227 newheadscount = 0
1230 1228 # If there's a bookmark specified, there should be only one head,
1231 1229 # so we choose the last node, which will be that head.
1232 1230 # If a bug or malicious client allows there to be a bookmark
1233 1231 # with multiple heads, we will place the bookmark on the last head.
1234 1232 bookmarknode = nodesctx[-1].hex() if nodesctx else None
1235 1233 key = None
1236 1234 if newheadscount:
1237 1235 with open(bundlefile, b'rb') as f:
1238 1236 bundledata = f.read()
1239 1237 with logservicecall(
1240 1238 log, b'bundlestore', bundlesize=len(bundledata)
1241 1239 ):
1242 1240 bundlesizelimit = 100 * 1024 * 1024 # 100 MB
1243 1241 if len(bundledata) > bundlesizelimit:
1244 1242 error_msg = (
1245 1243 b'bundle is too big: %d bytes. '
1246 1244 + b'max allowed size is 100 MB'
1247 1245 )
1248 1246 raise error.Abort(error_msg % (len(bundledata),))
1249 1247 key = store.write(bundledata)
1250 1248
1251 1249 with logservicecall(log, b'index', newheadscount=newheadscount), index:
1252 1250 if key:
1253 1251 index.addbundle(key, nodesctx)
1254 1252 if bookmark:
1255 1253 index.addbookmark(bookmark, bookmarknode)
1256 1254 _maybeaddpushbackpart(
1257 1255 op, bookmark, bookmarknode, bookprevnode, params
1258 1256 )
1259 1257 log(
1260 1258 scratchbranchparttype,
1261 1259 eventtype=b'success',
1262 1260 elapsedms=(time.time() - parthandlerstart) * 1000,
1263 1261 )
1264 1262
1265 1263 except Exception as e:
1266 1264 log(
1267 1265 scratchbranchparttype,
1268 1266 eventtype=b'failure',
1269 1267 elapsedms=(time.time() - parthandlerstart) * 1000,
1270 1268 errormsg=stringutil.forcebytestr(e),
1271 1269 )
1272 1270 raise
1273 1271 finally:
1274 1272 if bundle:
1275 1273 bundle.close()
1276 1274
1277 1275
1278 1276 @bundle2.parthandler(
1279 1277 scratchbranchparttype,
1280 1278 (
1281 1279 b'bookmark',
1282 1280 b'bookprevnode',
1283 1281 b'force',
1284 1282 b'pushbackbookmarks',
1285 1283 b'cgversion',
1286 1284 ),
1287 1285 )
1288 1286 def bundle2scratchbranch(op, part):
1289 1287 '''unbundle a bundle2 part containing a changegroup to store'''
1290 1288
1291 1289 bundler = bundle2.bundle20(op.repo.ui)
1292 1290 cgversion = part.params.get(b'cgversion', b'01')
1293 1291 cgpart = bundle2.bundlepart(b'changegroup', data=part.read())
1294 1292 cgpart.addparam(b'version', cgversion)
1295 1293 bundler.addpart(cgpart)
1296 1294 buf = util.chunkbuffer(bundler.getchunks())
1297 1295
1298 1296 fd, bundlefile = pycompat.mkstemp()
1299 1297 try:
1300 1298 try:
1301 1299 fp = os.fdopen(fd, 'wb')
1302 1300 fp.write(buf.read())
1303 1301 finally:
1304 1302 fp.close()
1305 1303 storebundle(op, part.params, bundlefile)
1306 1304 finally:
1307 1305 try:
1308 1306 os.unlink(bundlefile)
1309 1307 except FileNotFoundError:
1310 1308 pass
1311 1309
1312 1310 return 1
1313 1311
1314 1312
1315 1313 def _maybeaddpushbackpart(op, bookmark, newnode, oldnode, params):
1316 1314 if params.get(b'pushbackbookmarks'):
1317 1315 if op.reply and b'pushback' in op.reply.capabilities:
1318 1316 params = {
1319 1317 b'namespace': b'bookmarks',
1320 1318 b'key': bookmark,
1321 1319 b'new': newnode,
1322 1320 b'old': oldnode,
1323 1321 }
1324 1322 op.reply.newpart(b'pushkey', mandatoryparams=params.items())
1325 1323
1326 1324
1327 1325 def bundle2pushkey(orig, op, part):
1328 1326 """Wrapper of bundle2.handlepushkey()
1329 1327
1330 1328 The only goal is to skip calling the original function if flag is set.
1331 1329 It's set if infinitepush push is happening.
1332 1330 """
1333 1331 if op.records[scratchbranchparttype + b'_skippushkey']:
1334 1332 if op.reply is not None:
1335 1333 rpart = op.reply.newpart(b'reply:pushkey')
1336 1334 rpart.addparam(b'in-reply-to', str(part.id), mandatory=False)
1337 1335 rpart.addparam(b'return', b'1', mandatory=False)
1338 1336 return 1
1339 1337
1340 1338 return orig(op, part)
1341 1339
1342 1340
1343 1341 def bundle2handlephases(orig, op, part):
1344 1342 """Wrapper of bundle2.handlephases()
1345 1343
1346 1344 The only goal is to skip calling the original function if flag is set.
1347 1345 It's set if infinitepush push is happening.
1348 1346 """
1349 1347
1350 1348 if op.records[scratchbranchparttype + b'_skipphaseheads']:
1351 1349 return
1352 1350
1353 1351 return orig(op, part)
1354 1352
1355 1353
1356 1354 def _asyncsavemetadata(root, nodes):
1357 1355 """starts a separate process that fills metadata for the nodes
1358 1356
1359 1357 This function creates a separate process and doesn't wait for it's
1360 1358 completion. This was done to avoid slowing down pushes
1361 1359 """
1362 1360
1363 1361 maxnodes = 50
1364 1362 if len(nodes) > maxnodes:
1365 1363 return
1366 1364 nodesargs = []
1367 1365 for node in nodes:
1368 1366 nodesargs.append(b'--node')
1369 1367 nodesargs.append(node)
1370 1368 with open(os.devnull, b'w+b') as devnull:
1371 1369 cmdline = [
1372 1370 util.hgexecutable(),
1373 1371 b'debugfillinfinitepushmetadata',
1374 1372 b'-R',
1375 1373 root,
1376 1374 ] + nodesargs
1377 1375 # Process will run in background. We don't care about the return code
1378 1376 subprocess.Popen(
1379 1377 pycompat.rapply(procutil.tonativestr, cmdline),
1380 1378 close_fds=True,
1381 1379 shell=False,
1382 1380 stdin=devnull,
1383 1381 stdout=devnull,
1384 1382 stderr=devnull,
1385 1383 )
General Comments 0
You need to be logged in to leave comments. Login now