infinitepush: use the new function to determine push destination...
marmoute
r47673:6071bfab default
@@ -1,1385 +1,1385 @@
1 1 # Infinite push
2 2 #
3 3 # Copyright 2016 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """ store some pushes in a remote blob store on the server (EXPERIMENTAL)
8 8
9 9 IMPORTANT: if you use this extension, please contact
10 10 mercurial-devel@mercurial-scm.org ASAP. This extension is believed to
11 11 be unused and, unless we learn of users of this functionality, we will
12 12 delete this code at the end of 2020.
13 13
14 14 [infinitepush]
15 15 # Server-side and client-side option. Pattern of the infinitepush bookmark
16 16 branchpattern = PATTERN
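# e.g., treat every bookmark under a 'scratch/' prefix as a scratch
# bookmark (hypothetical value):
# branchpattern = re:scratch/.+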
17 17
18 18 # Server or client
19 19 server = False
20 20
21 21 # Server-side option. Possible values: 'disk' or 'sql'. Fails if not set
22 22 indextype = disk
23 23
24 24 # Server-side option. Used only if indextype=sql.
25 25 # Format: 'IP:PORT:DB_NAME:USER:PASSWORD'
26 26 sqlhost = IP:PORT:DB_NAME:USER:PASSWORD
27 27
28 28 # Server-side option. Used only if indextype=disk.
29 29 # Filesystem path to the index store
30 30 indexpath = PATH
31 31
32 32 # Server-side option. Possible values: 'disk' or 'external'
33 33 # Fails if not set
34 34 storetype = disk
35 35
36 36 # Server-side option.
37 37 # Path to the binary that will save the bundle to the bundlestore
38 38 # Formatted cmd line will be passed to it (see `put_args`)
39 39 put_binary = put
40 40
41 41 # Server-side option. Used only if storetype=external.
42 42 # Format cmd-line string for put binary. Placeholder: {filename}
43 43 put_args = {filename}
44 44
45 45 # Server-side option.
46 46 # Path to the binary that gets the bundle from the bundlestore.
47 47 # Formatted cmd line will be passed to it (see `get_args`)
48 48 get_binary = get
49 49
50 50 # Server-side option. Used only if storetype=external.
51 51 # Format cmd-line string for get binary. Placeholders: {filename} {handle}
52 52 get_args = {filename} {handle}
53 53
54 54 # Server-side option
55 55 logfile = FILE
56 56
57 57 # Server-side option
58 58 loglevel = DEBUG
59 59
60 60 # Server-side option. Used only if indextype=sql.
61 61 # Sets mysql wait_timeout option.
62 62 waittimeout = 300
63 63
64 64 # Server-side option. Used only if indextype=sql.
65 65 # Sets mysql innodb_lock_wait_timeout option.
66 66 locktimeout = 120
67 67
68 68 # Server-side option. Used only if indextype=sql.
69 69 # Name of the repository
70 70 reponame = ''
71 71
72 72 # Client-side option. Used by --list-remote option. List of remote scratch
73 73 # patterns to list if no patterns are specified.
74 74 defaultremotepatterns = ['*']
75 75
76 76 # Instructs infinitepush to forward all received bundle2 parts to the
77 77 # bundle for storage. Defaults to False.
78 78 storeallparts = True
79 79
80 80 # Routes each incoming push to the bundlestore. Defaults to False.
81 81 pushtobundlestore = True
82 82
83 83 [remotenames]
84 84 # Client-side option
85 85 # This option should be set only if remotenames extension is enabled.
86 86 # Whether remote bookmarks are tracked by remotenames extension.
87 87 bookmarks = True
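
Example client usage, assuming branchpattern matches 'scratch/'
bookmarks (hypothetical names):

hg push -B scratch/mybranch    # stored in the bundlestore, not the repo
hg pull -B scratch/mybranch    # fetched back from the bundlestore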
88 88 """
89 89
90 90 from __future__ import absolute_import
91 91
92 92 import collections
93 93 import contextlib
94 94 import errno
95 95 import functools
96 96 import logging
97 97 import os
98 98 import random
99 99 import re
100 100 import socket
101 101 import subprocess
102 102 import time
103 103
104 104 from mercurial.node import (
105 105 bin,
106 106 hex,
107 107 )
108 108
109 109 from mercurial.i18n import _
110 110
111 111 from mercurial.pycompat import (
112 112 getattr,
113 113 open,
114 114 )
115 115
116 116 from mercurial.utils import (
117 117 procutil,
118 118 stringutil,
119 119 urlutil,
120 120 )
121 121
122 122 from mercurial import (
123 123 bundle2,
124 124 changegroup,
125 125 commands,
126 126 discovery,
127 127 encoding,
128 128 error,
129 129 exchange,
130 130 extensions,
131 131 hg,
132 132 localrepo,
133 133 phases,
134 134 pushkey,
135 135 pycompat,
136 136 registrar,
137 137 util,
138 138 wireprototypes,
139 139 wireprotov1peer,
140 140 wireprotov1server,
141 141 )
142 142
143 143 from . import (
144 144 bundleparts,
145 145 common,
146 146 )
147 147
148 148 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
149 149 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
150 150 # be specifying the version(s) of Mercurial they are tested with, or
151 151 # leave the attribute unspecified.
152 152 testedwith = b'ships-with-hg-core'
153 153
154 154 configtable = {}
155 155 configitem = registrar.configitem(configtable)
156 156
157 157 configitem(
158 158 b'infinitepush',
159 159 b'server',
160 160 default=False,
161 161 )
162 162 configitem(
163 163 b'infinitepush',
164 164 b'storetype',
165 165 default=b'',
166 166 )
167 167 configitem(
168 168 b'infinitepush',
169 169 b'indextype',
170 170 default=b'',
171 171 )
172 172 configitem(
173 173 b'infinitepush',
174 174 b'indexpath',
175 175 default=b'',
176 176 )
177 177 configitem(
178 178 b'infinitepush',
179 179 b'storeallparts',
180 180 default=False,
181 181 )
182 182 configitem(
183 183 b'infinitepush',
184 184 b'reponame',
185 185 default=b'',
186 186 )
187 187 configitem(
188 188 b'scratchbranch',
189 189 b'storepath',
190 190 default=b'',
191 191 )
192 192 configitem(
193 193 b'infinitepush',
194 194 b'branchpattern',
195 195 default=b'',
196 196 )
197 197 configitem(
198 198 b'infinitepush',
199 199 b'pushtobundlestore',
200 200 default=False,
201 201 )
202 202 configitem(
203 203 b'experimental',
204 204 b'server-bundlestore-bookmark',
205 205 default=b'',
206 206 )
207 207 configitem(
208 208 b'experimental',
209 209 b'infinitepush-scratchpush',
210 210 default=False,
211 211 )
212 212
213 213 experimental = b'experimental'
214 214 configbookmark = b'server-bundlestore-bookmark'
215 215 configscratchpush = b'infinitepush-scratchpush'
216 216
217 217 scratchbranchparttype = bundleparts.scratchbranchparttype
218 218 revsetpredicate = registrar.revsetpredicate()
219 219 templatekeyword = registrar.templatekeyword()
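# matches nothing by default; commonsetup() rebuilds this matcher from the
# infinitepush.branchpattern config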
220 220 _scratchbranchmatcher = lambda x: False
221 221 _maybehash = re.compile('^[a-f0-9]+$').search
222 222
223 223
224 224 def _buildexternalbundlestore(ui):
225 225 put_args = ui.configlist(b'infinitepush', b'put_args', [])
226 226 put_binary = ui.config(b'infinitepush', b'put_binary')
227 227 if not put_binary:
228 228 raise error.Abort(b'put binary is not specified')
229 229 get_args = ui.configlist(b'infinitepush', b'get_args', [])
230 230 get_binary = ui.config(b'infinitepush', b'get_binary')
231 231 if not get_binary:
232 232 raise error.Abort(b'get binary is not specified')
233 233 from . import store
234 234
235 235 return store.externalbundlestore(put_binary, put_args, get_binary, get_args)
236 236
237 237
238 238 def _buildsqlindex(ui):
239 239 sqlhost = ui.config(b'infinitepush', b'sqlhost')
240 240 if not sqlhost:
241 241 raise error.Abort(_(b'please set infinitepush.sqlhost'))
242 242 host, port, db, user, password = sqlhost.split(b':')
243 243 reponame = ui.config(b'infinitepush', b'reponame')
244 244 if not reponame:
245 245 raise error.Abort(_(b'please set infinitepush.reponame'))
246 246
247 247 logfile = ui.config(b'infinitepush', b'logfile', b'')
248 248 waittimeout = ui.configint(b'infinitepush', b'waittimeout', 300)
249 249 locktimeout = ui.configint(b'infinitepush', b'locktimeout', 120)
250 250 from . import sqlindexapi
251 251
252 252 return sqlindexapi.sqlindexapi(
253 253 reponame,
254 254 host,
255 255 port,
256 256 db,
257 257 user,
258 258 password,
259 259 logfile,
260 260 _getloglevel(ui),
261 261 waittimeout=waittimeout,
262 262 locktimeout=locktimeout,
263 263 )
264 264
265 265
266 266 def _getloglevel(ui):
267 267 loglevel = ui.config(b'infinitepush', b'loglevel', b'DEBUG')
268 268 numeric_loglevel = getattr(logging, loglevel.upper(), None)
269 269 if not isinstance(numeric_loglevel, int):
270 270 raise error.Abort(_(b'invalid log level %s') % loglevel)
271 271 return numeric_loglevel
272 272
273 273
274 274 def _tryhoist(ui, remotebookmark):
275 275 """returns a bookmarks with hoisted part removed
276 276
277 277 The remotenames extension has a 'hoist' config that allows using remote
278 278 bookmarks without specifying the remote path. For example, 'hg update master'
279 279 works as well as 'hg update remote/master'. We want to allow the same in
280 280 infinitepush.
281 281 """
282 282
283 283 if common.isremotebooksenabled(ui):
284 284 hoist = ui.config(b'remotenames', b'hoistedpeer') + b'/'
285 285 if remotebookmark.startswith(hoist):
286 286 return remotebookmark[len(hoist) :]
287 287 return remotebookmark
288 288
289 289
290 290 class bundlestore(object):
291 291 def __init__(self, repo):
292 292 self._repo = repo
293 293 storetype = self._repo.ui.config(b'infinitepush', b'storetype')
294 294 if storetype == b'disk':
295 295 from . import store
296 296
297 297 self.store = store.filebundlestore(self._repo.ui, self._repo)
298 298 elif storetype == b'external':
299 299 self.store = _buildexternalbundlestore(self._repo.ui)
300 300 else:
301 301 raise error.Abort(
302 302 _(b'unknown infinitepush store type specified %s') % storetype
303 303 )
304 304
305 305 indextype = self._repo.ui.config(b'infinitepush', b'indextype')
306 306 if indextype == b'disk':
307 307 from . import fileindexapi
308 308
309 309 self.index = fileindexapi.fileindexapi(self._repo)
310 310 elif indextype == b'sql':
311 311 self.index = _buildsqlindex(self._repo.ui)
312 312 else:
313 313 raise error.Abort(
314 314 _(b'unknown infinitepush index type specified %s') % indextype
315 315 )
316 316
317 317
318 318 def _isserver(ui):
319 319 return ui.configbool(b'infinitepush', b'server')
320 320
321 321
322 322 def reposetup(ui, repo):
323 323 if _isserver(ui) and repo.local():
324 324 repo.bundlestore = bundlestore(repo)
325 325
326 326
327 327 def extsetup(ui):
328 328 commonsetup(ui)
329 329 if _isserver(ui):
330 330 serverextsetup(ui)
331 331 else:
332 332 clientextsetup(ui)
333 333
334 334
335 335 def commonsetup(ui):
336 336 wireprotov1server.commands[b'listkeyspatterns'] = (
337 337 wireprotolistkeyspatterns,
338 338 b'namespace patterns',
339 339 )
340 340 scratchbranchpat = ui.config(b'infinitepush', b'branchpattern')
341 341 if scratchbranchpat:
342 342 global _scratchbranchmatcher
343 343 kind, pat, _scratchbranchmatcher = stringutil.stringmatcher(
344 344 scratchbranchpat
345 345 )
346 346
347 347
348 348 def serverextsetup(ui):
349 349 origpushkeyhandler = bundle2.parthandlermapping[b'pushkey']
350 350
351 351 def newpushkeyhandler(*args, **kwargs):
352 352 bundle2pushkey(origpushkeyhandler, *args, **kwargs)
353 353
354 354 newpushkeyhandler.params = origpushkeyhandler.params
355 355 bundle2.parthandlermapping[b'pushkey'] = newpushkeyhandler
356 356
357 357 orighandlephasehandler = bundle2.parthandlermapping[b'phase-heads']
358 358 newphaseheadshandler = lambda *args, **kwargs: bundle2handlephases(
359 359 orighandlephasehandler, *args, **kwargs
360 360 )
361 361 newphaseheadshandler.params = orighandlephasehandler.params
362 362 bundle2.parthandlermapping[b'phase-heads'] = newphaseheadshandler
363 363
364 364 extensions.wrapfunction(
365 365 localrepo.localrepository, b'listkeys', localrepolistkeys
366 366 )
367 367 wireprotov1server.commands[b'lookup'] = (
368 368 _lookupwrap(wireprotov1server.commands[b'lookup'][0]),
369 369 b'key',
370 370 )
371 371 extensions.wrapfunction(exchange, b'getbundlechunks', getbundlechunks)
372 372
373 373 extensions.wrapfunction(bundle2, b'processparts', processparts)
374 374
375 375
376 376 def clientextsetup(ui):
377 377 entry = extensions.wrapcommand(commands.table, b'push', _push)
378 378
379 379 entry[1].append(
380 380 (
381 381 b'',
382 382 b'bundle-store',
383 383 None,
384 384 _(b'force push to go to bundle store (EXPERIMENTAL)'),
385 385 )
386 386 )
387 387
388 388 extensions.wrapcommand(commands.table, b'pull', _pull)
389 389
390 390 extensions.wrapfunction(discovery, b'checkheads', _checkheads)
391 391
392 392 wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
393 393
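# run the scratch part generator before the changeset generator so it
# can claim the 'changesets' step for scratch pushes (see partgen)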
394 394 partorder = exchange.b2partsgenorder
395 395 index = partorder.index(b'changeset')
396 396 partorder.insert(
397 397 index, partorder.pop(partorder.index(scratchbranchparttype))
398 398 )
399 399
400 400
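# scratch pushes may create new heads on the server on purpose, so the
# usual new-head check is skipped for them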
401 401 def _checkheads(orig, pushop):
402 402 if pushop.ui.configbool(experimental, configscratchpush, False):
403 403 return
404 404 return orig(pushop)
405 405
406 406
407 407 def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
408 408 patterns = wireprototypes.decodelist(patterns)
409 409 d = pycompat.iteritems(repo.listkeys(encoding.tolocal(namespace), patterns))
410 410 return pushkey.encodekeys(d)
411 411
412 412
413 413 def localrepolistkeys(orig, self, namespace, patterns=None):
414 414 if namespace == b'bookmarks' and patterns:
415 415 index = self.bundlestore.index
416 416 results = {}
417 417 bookmarks = orig(self, namespace)
418 418 for pattern in patterns:
419 419 results.update(index.getbookmarks(pattern))
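# rewrite a trailing glob into an anchored regex so stringmatcher()
# below can also match the bookmarks returned by orig()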
420 420 if pattern.endswith(b'*'):
421 421 pattern = b're:^' + pattern[:-1] + b'.*'
422 422 kind, pat, matcher = stringutil.stringmatcher(pattern)
423 423 for bookmark, node in pycompat.iteritems(bookmarks):
424 424 if matcher(bookmark):
425 425 results[bookmark] = node
426 426 return results
427 427 else:
428 428 return orig(self, namespace)
429 429
430 430
431 431 @wireprotov1peer.batchable
432 432 def listkeyspatterns(self, namespace, patterns):
433 433 if not self.capable(b'pushkey'):
434 434 yield {}, None
435 435 f = wireprotov1peer.future()
436 436 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
437 437 yield {
438 438 b'namespace': encoding.fromlocal(namespace),
439 439 b'patterns': wireprototypes.encodelist(patterns),
440 440 }, f
441 441 d = f.value
442 442 self.ui.debug(
443 443 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
444 444 )
445 445 yield pushkey.decodekeys(d)
446 446
447 447
448 448 def _readbundlerevs(bundlerepo):
449 449 return list(bundlerepo.revs(b'bundle()'))
450 450
451 451
452 452 def _includefilelogstobundle(bundlecaps, bundlerepo, bundlerevs, ui):
453 453 """Tells remotefilelog to include all changed files to the changegroup
454 454
455 455 By default remotefilelog doesn't include file content to the changegroup.
456 456 But we need to include it if we are fetching from bundlestore.
457 457 """
458 458 changedfiles = set()
459 459 cl = bundlerepo.changelog
460 460 for r in bundlerevs:
461 461 # [3] means changed files
462 462 changedfiles.update(cl.read(r)[3])
463 463 if not changedfiles:
464 464 return bundlecaps
465 465
466 466 changedfiles = b'\0'.join(changedfiles)
467 467 newcaps = []
468 468 appended = False
469 469 for cap in bundlecaps or []:
470 470 if cap.startswith(b'excludepattern='):
471 471 newcaps.append(b'\0'.join((cap, changedfiles)))
472 472 appended = True
473 473 else:
474 474 newcaps.append(cap)
475 475 if not appended:
476 476 # No excludepattern cap was found. Just append it
477 477 newcaps.append(b'excludepattern=' + changedfiles)
478 478
479 479 return newcaps
480 480
481 481
482 482 def _rebundle(bundlerepo, bundleroots, unknownhead):
483 483 """
484 484 A bundle may include more revisions than the user requested. For example,
485 485 if the user asks for one revision, the bundle may also contain its
486 486 descendants. This function filters out all revisions the user did not request.
487 487 """
488 488 parts = []
489 489
490 490 version = b'02'
491 491 outgoing = discovery.outgoing(
492 492 bundlerepo, commonheads=bundleroots, ancestorsof=[unknownhead]
493 493 )
494 494 cgstream = changegroup.makestream(bundlerepo, outgoing, version, b'pull')
495 495 cgstream = util.chunkbuffer(cgstream).read()
496 496 cgpart = bundle2.bundlepart(b'changegroup', data=cgstream)
497 497 cgpart.addparam(b'version', version)
498 498 parts.append(cgpart)
499 499
500 500 return parts
501 501
502 502
503 503 def _getbundleroots(oldrepo, bundlerepo, bundlerevs):
504 504 cl = bundlerepo.changelog
505 505 bundleroots = []
506 506 for rev in bundlerevs:
507 507 node = cl.node(rev)
508 508 parents = cl.parents(node)
509 509 for parent in parents:
510 510 # include all revs that exist in the main repo
511 511 # to make sure that the bundle can be applied client-side
512 512 if parent in oldrepo:
513 513 bundleroots.append(parent)
514 514 return bundleroots
515 515
516 516
517 517 def _needsrebundling(head, bundlerepo):
518 518 bundleheads = list(bundlerepo.revs(b'heads(bundle())'))
519 519 return not (
520 520 len(bundleheads) == 1 and bundlerepo[bundleheads[0]].node() == head
521 521 )
522 522
523 523
524 524 def _generateoutputparts(head, bundlerepo, bundleroots, bundlefile):
525 525 """generates bundle that will be send to the user
526 526
527 527 returns tuple with raw bundle string and bundle type
528 528 """
529 529 parts = []
530 530 if not _needsrebundling(head, bundlerepo):
531 531 with util.posixfile(bundlefile, b"rb") as f:
532 532 unbundler = exchange.readbundle(bundlerepo.ui, f, bundlefile)
533 533 if isinstance(unbundler, changegroup.cg1unpacker):
534 534 part = bundle2.bundlepart(
535 535 b'changegroup', data=unbundler._stream.read()
536 536 )
537 537 part.addparam(b'version', b'01')
538 538 parts.append(part)
539 539 elif isinstance(unbundler, bundle2.unbundle20):
540 540 haschangegroup = False
541 541 for part in unbundler.iterparts():
542 542 if part.type == b'changegroup':
543 543 haschangegroup = True
544 544 newpart = bundle2.bundlepart(part.type, data=part.read())
545 545 for key, value in pycompat.iteritems(part.params):
546 546 newpart.addparam(key, value)
547 547 parts.append(newpart)
548 548
549 549 if not haschangegroup:
550 550 raise error.Abort(
551 551 b'unexpected bundle without changegroup part, '
552 552 + b'head: %s' % hex(head),
553 553 hint=b'report to administrator',
554 554 )
555 555 else:
556 556 raise error.Abort(b'unknown bundle type')
557 557 else:
558 558 parts = _rebundle(bundlerepo, bundleroots, head)
559 559
560 560 return parts
561 561
562 562
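# Wraps exchange.getbundlechunks: any requested head missing from the local
# changelog is assumed to live in the bundlestore; its bundle is downloaded,
# re-bundled if necessary, and spliced into the outgoing changegroup reply.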
563 563 def getbundlechunks(orig, repo, source, heads=None, bundlecaps=None, **kwargs):
564 564 heads = heads or []
565 565 # newheads are parents of roots of scratch bundles that were requested
566 566 newphases = {}
567 567 scratchbundles = []
568 568 newheads = []
569 569 scratchheads = []
570 570 nodestobundle = {}
571 571 allbundlestocleanup = []
572 572 try:
573 573 for head in heads:
574 574 if not repo.changelog.index.has_node(head):
575 575 if head not in nodestobundle:
576 576 newbundlefile = common.downloadbundle(repo, head)
577 577 bundlepath = b"bundle:%s+%s" % (repo.root, newbundlefile)
578 578 bundlerepo = hg.repository(repo.ui, bundlepath)
579 579
580 580 allbundlestocleanup.append((bundlerepo, newbundlefile))
581 581 bundlerevs = set(_readbundlerevs(bundlerepo))
582 582 bundlecaps = _includefilelogstobundle(
583 583 bundlecaps, bundlerepo, bundlerevs, repo.ui
584 584 )
585 585 cl = bundlerepo.changelog
586 586 bundleroots = _getbundleroots(repo, bundlerepo, bundlerevs)
587 587 for rev in bundlerevs:
588 588 node = cl.node(rev)
589 589 newphases[hex(node)] = str(phases.draft)
590 590 nodestobundle[node] = (
591 591 bundlerepo,
592 592 bundleroots,
593 593 newbundlefile,
594 594 )
595 595
596 596 scratchbundles.append(
597 597 _generateoutputparts(head, *nodestobundle[head])
598 598 )
599 599 newheads.extend(bundleroots)
600 600 scratchheads.append(head)
601 601 finally:
602 602 for bundlerepo, bundlefile in allbundlestocleanup:
603 603 bundlerepo.close()
604 604 try:
605 605 os.unlink(bundlefile)
606 606 except (IOError, OSError):
607 607 # if we can't clean up the file then just ignore the error,
608 608 # no need to fail
609 609 pass
610 610
611 611 pullfrombundlestore = bool(scratchbundles)
612 612 wrappedchangegrouppart = False
613 613 wrappedlistkeys = False
614 614 oldchangegrouppart = exchange.getbundle2partsmapping[b'changegroup']
615 615 try:
616 616
617 617 def _changegrouppart(bundler, *args, **kwargs):
618 618 # Order is important here. First add non-scratch part
619 619 # and only then add parts with scratch bundles because
620 620 # non-scratch part contains parents of roots of scratch bundles.
621 621 result = oldchangegrouppart(bundler, *args, **kwargs)
622 622 for bundle in scratchbundles:
623 623 for part in bundle:
624 624 bundler.addpart(part)
625 625 return result
626 626
627 627 exchange.getbundle2partsmapping[b'changegroup'] = _changegrouppart
628 628 wrappedchangegrouppart = True
629 629
630 630 def _listkeys(orig, self, namespace):
631 631 origvalues = orig(self, namespace)
632 632 if namespace == b'phases' and pullfrombundlestore:
633 633 if origvalues.get(b'publishing') == b'True':
634 634 # Make repo non-publishing to preserve draft phase
635 635 del origvalues[b'publishing']
636 636 origvalues.update(newphases)
637 637 return origvalues
638 638
639 639 extensions.wrapfunction(
640 640 localrepo.localrepository, b'listkeys', _listkeys
641 641 )
642 642 wrappedlistkeys = True
643 643 heads = list((set(newheads) | set(heads)) - set(scratchheads))
644 644 result = orig(
645 645 repo, source, heads=heads, bundlecaps=bundlecaps, **kwargs
646 646 )
647 647 finally:
648 648 if wrappedchangegrouppart:
649 649 exchange.getbundle2partsmapping[b'changegroup'] = oldchangegrouppart
650 650 if wrappedlistkeys:
651 651 extensions.unwrapfunction(
652 652 localrepo.localrepository, b'listkeys', _listkeys
653 653 )
654 654 return result
655 655
656 656
657 657 def _lookupwrap(orig):
658 658 def _lookup(repo, proto, key):
659 659 localkey = encoding.tolocal(key)
660 660
661 661 if isinstance(localkey, str) and _scratchbranchmatcher(localkey):
662 662 scratchnode = repo.bundlestore.index.getnode(localkey)
663 663 if scratchnode:
664 664 return b"%d %s\n" % (1, scratchnode)
665 665 else:
666 666 return b"%d %s\n" % (
667 667 0,
668 668 b'scratch branch %s not found' % localkey,
669 669 )
670 670 else:
671 671 try:
672 672 r = hex(repo.lookup(localkey))
673 673 return b"%d %s\n" % (1, r)
674 674 except Exception as inst:
675 675 if repo.bundlestore.index.getbundle(localkey):
676 676 return b"%d %s\n" % (1, localkey)
677 677 else:
678 678 r = stringutil.forcebytestr(inst)
679 679 return b"%d %s\n" % (0, r)
680 680
681 681 return _lookup
682 682
683 683
684 684 def _pull(orig, ui, repo, source=b"default", **opts):
685 685 opts = pycompat.byteskwargs(opts)
686 686 # Copy-paste from the `pull` command
687 687 source, branches = urlutil.parseurl(
688 688 ui.expandpath(source), opts.get(b'branch')
689 689 )
690 690
691 691 scratchbookmarks = {}
692 692 unfi = repo.unfiltered()
693 693 unknownnodes = []
694 694 for rev in opts.get(b'rev', []):
695 695 if rev not in unfi:
696 696 unknownnodes.append(rev)
697 697 if opts.get(b'bookmark'):
698 698 bookmarks = []
699 699 revs = opts.get(b'rev') or []
700 700 for bookmark in opts.get(b'bookmark'):
701 701 if _scratchbranchmatcher(bookmark):
702 702 # rev is not known yet
703 703 # it will be fetched with listkeyspatterns next
704 704 scratchbookmarks[bookmark] = b'REVTOFETCH'
705 705 else:
706 706 bookmarks.append(bookmark)
707 707
708 708 if scratchbookmarks:
709 709 other = hg.peer(repo, opts, source)
710 710 try:
711 711 fetchedbookmarks = other.listkeyspatterns(
712 712 b'bookmarks', patterns=scratchbookmarks
713 713 )
714 714 for bookmark in scratchbookmarks:
715 715 if bookmark not in fetchedbookmarks:
716 716 raise error.Abort(
717 717 b'remote bookmark %s not found!' % bookmark
718 718 )
719 719 scratchbookmarks[bookmark] = fetchedbookmarks[bookmark]
720 720 revs.append(fetchedbookmarks[bookmark])
721 721 finally:
722 722 other.close()
723 723 opts[b'bookmark'] = bookmarks
724 724 opts[b'rev'] = revs
725 725
726 726 if scratchbookmarks or unknownnodes:
727 727 # Set anyincoming to True
728 728 extensions.wrapfunction(
729 729 discovery, b'findcommonincoming', _findcommonincoming
730 730 )
731 731 try:
732 732 # Remote scratch bookmarks will be deleted because remotenames doesn't
733 733 # know about them. Let's save them before the pull and restore them after
734 734 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, source)
735 735 result = orig(ui, repo, source, **pycompat.strkwargs(opts))
736 736 # TODO(stash): a race condition is possible
737 737 # if scratch bookmarks were updated right after orig.
738 738 # But that's unlikely and shouldn't be harmful.
739 739 if common.isremotebooksenabled(ui):
740 740 remotescratchbookmarks.update(scratchbookmarks)
741 741 _saveremotebookmarks(repo, remotescratchbookmarks, source)
742 742 else:
743 743 _savelocalbookmarks(repo, scratchbookmarks)
744 744 return result
745 745 finally:
746 746 if scratchbookmarks:
747 747 extensions.unwrapfunction(discovery, b'findcommonincoming')
748 748
749 749
750 750 def _readscratchremotebookmarks(ui, repo, other):
751 751 if common.isremotebooksenabled(ui):
752 752 remotenamesext = extensions.find(b'remotenames')
753 753 remotepath = remotenamesext.activepath(repo.ui, other)
754 754 result = {}
755 755 # Let's refresh remotenames to make sure we have it up to date
756 756 # Seems that `repo.names['remotebookmarks']` may return stale bookmarks
757 757 # and it results in deleting scratch bookmarks. Our best guess at how to
758 758 # fix it is to use `clearnames()`
759 759 repo._remotenames.clearnames()
760 760 for remotebookmark in repo.names[b'remotebookmarks'].listnames(repo):
761 761 path, bookname = remotenamesext.splitremotename(remotebookmark)
762 762 if path == remotepath and _scratchbranchmatcher(bookname):
763 763 nodes = repo.names[b'remotebookmarks'].nodes(
764 764 repo, remotebookmark
765 765 )
766 766 if nodes:
767 767 result[bookname] = hex(nodes[0])
768 768 return result
769 769 else:
770 770 return {}
771 771
772 772
773 773 def _saveremotebookmarks(repo, newbookmarks, remote):
774 774 remotenamesext = extensions.find(b'remotenames')
775 775 remotepath = remotenamesext.activepath(repo.ui, remote)
776 776 branches = collections.defaultdict(list)
777 777 bookmarks = {}
778 778 remotenames = remotenamesext.readremotenames(repo)
779 779 for hexnode, nametype, remote, rname in remotenames:
780 780 if remote != remotepath:
781 781 continue
782 782 if nametype == b'bookmarks':
783 783 if rname in newbookmarks:
784 784 # It's possible that we have a normal bookmark that matches the
785 785 # scratch branch pattern. In this case just use the current
786 786 # bookmark node
787 787 del newbookmarks[rname]
788 788 bookmarks[rname] = hexnode
789 789 elif nametype == b'branches':
790 790 # saveremotenames expects 20 byte binary nodes for branches
791 791 branches[rname].append(bin(hexnode))
792 792
793 793 for bookmark, hexnode in pycompat.iteritems(newbookmarks):
794 794 bookmarks[bookmark] = hexnode
795 795 remotenamesext.saveremotenames(repo, remotepath, branches, bookmarks)
796 796
797 797
798 798 def _savelocalbookmarks(repo, bookmarks):
799 799 if not bookmarks:
800 800 return
801 801 with repo.wlock(), repo.lock(), repo.transaction(b'bookmark') as tr:
802 802 changes = []
803 803 for scratchbook, node in pycompat.iteritems(bookmarks):
804 804 changectx = repo[node]
805 805 changes.append((scratchbook, changectx.node()))
806 806 repo._bookmarks.applychanges(repo, tr, changes)
807 807
808 808
809 809 def _findcommonincoming(orig, *args, **kwargs):
810 810 common, inc, remoteheads = orig(*args, **kwargs)
811 811 return common, True, remoteheads
812 812
813 813
814 def _push(orig, ui, repo, dest=None, *args, **opts):
814 def _push(orig, ui, repo, *dests, **opts):
815 815 opts = pycompat.byteskwargs(opts)
816 816 bookmark = opts.get(b'bookmark')
817 817 # we only support pushing one infinitepush bookmark at a time
818 818 if len(bookmark) == 1:
819 819 bookmark = bookmark[0]
820 820 else:
821 821 bookmark = b''
822 822
823 823 oldphasemove = None
824 824 overrides = {(experimental, configbookmark): bookmark}
825 825
826 826 with ui.configoverride(overrides, b'infinitepush'):
827 827 scratchpush = opts.get(b'bundle_store')
828 828 if _scratchbranchmatcher(bookmark):
829 829 scratchpush = True
830 830 # bundle2 can be sent back after push (for example, bundle2
831 831 # containing `pushkey` part to update bookmarks)
832 832 ui.setconfig(experimental, b'bundle2.pushback', True)
833 833
834 834 if scratchpush:
835 835 # this is an infinitepush push; we don't want the bookmark to be applied,
836 836 # rather it should be stored in the bundlestore
837 837 opts[b'bookmark'] = []
838 838 ui.setconfig(experimental, configscratchpush, True)
839 839 oldphasemove = extensions.wrapfunction(
840 840 exchange, b'_localphasemove', _phasemove
841 841 )
842 # Copy-paste from `push` command
843 path = ui.getpath(dest, default=(b'default-push', b'default'))
844 if not path:
845 raise error.Abort(
846 _(b'default repository not configured!'),
847 hint=_(b"see 'hg help config.paths'"),
848 )
842
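# resolve the destination(s) the same way `hg push` does;
# infinitepush can only handle a single destination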
843 paths = list(urlutil.get_push_paths(repo, ui, dests))
844 if len(paths) > 1:
845 msg = _(b'cannot push to multiple paths with infinitepush')
846 raise error.Abort(msg)
847
848 path = paths[0]
849 849 destpath = path.pushloc or path.loc
850 850 # Remote scratch bookmarks will be deleted because remotenames doesn't
851 851 # know about them. Let's save it before push and restore after
852 852 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, destpath)
853 result = orig(ui, repo, dest, *args, **pycompat.strkwargs(opts))
853 result = orig(ui, repo, *dests, **pycompat.strkwargs(opts))
854 854 if common.isremotebooksenabled(ui):
855 855 if bookmark and scratchpush:
856 856 other = hg.peer(repo, opts, destpath)
857 857 try:
858 858 fetchedbookmarks = other.listkeyspatterns(
859 859 b'bookmarks', patterns=[bookmark]
860 860 )
861 861 remotescratchbookmarks.update(fetchedbookmarks)
862 862 finally:
863 863 other.close()
864 864 _saveremotebookmarks(repo, remotescratchbookmarks, destpath)
865 865 if oldphasemove:
866 866 exchange._localphasemove = oldphasemove
867 867 return result
868 868
869 869
870 870 def _deleteinfinitepushbookmarks(ui, repo, path, names):
871 871 """Prune remote names by removing the bookmarks we don't want anymore,
872 872 then writing the result back to disk
873 873 """
874 874 remotenamesext = extensions.find(b'remotenames')
875 875
876 876 # remotename format is:
877 877 # (node, nametype ("branches" or "bookmarks"), remote, name)
878 878 nametype_idx = 1
879 879 remote_idx = 2
880 880 name_idx = 3
881 881 remotenames = [
882 882 remotename
883 883 for remotename in remotenamesext.readremotenames(repo)
884 884 if remotename[remote_idx] == path
885 885 ]
886 886 remote_bm_names = [
887 887 remotename[name_idx]
888 888 for remotename in remotenames
889 889 if remotename[nametype_idx] == b"bookmarks"
890 890 ]
891 891
892 892 for name in names:
893 893 if name not in remote_bm_names:
894 894 raise error.Abort(
895 895 _(
896 896 b"infinitepush bookmark '{}' does not exist "
897 897 b"in path '{}'"
898 898 ).format(name, path)
899 899 )
900 900
901 901 bookmarks = {}
902 902 branches = collections.defaultdict(list)
903 903 for node, nametype, remote, name in remotenames:
904 904 if nametype == b"bookmarks" and name not in names:
905 905 bookmarks[name] = node
906 906 elif nametype == b"branches":
907 907 # saveremotenames wants binary nodes for branches
908 908 branches[name].append(bin(node))
909 909
910 910 remotenamesext.saveremotenames(repo, path, branches, bookmarks)
911 911
912 912
913 913 def _phasemove(orig, pushop, nodes, phase=phases.public):
914 914 """prevent commits from being marked public
915 915
916 916 Since these are going to a scratch branch, they aren't really being
917 917 published."""
918 918
919 919 if phase != phases.public:
920 920 orig(pushop, nodes, phase)
921 921
922 922
923 923 @exchange.b2partsgenerator(scratchbranchparttype)
924 924 def partgen(pushop, bundler):
925 925 bookmark = pushop.ui.config(experimental, configbookmark)
926 926 scratchpush = pushop.ui.configbool(experimental, configscratchpush)
927 927 if b'changesets' in pushop.stepsdone or not scratchpush:
928 928 return
929 929
930 930 if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
931 931 return
932 932
933 933 pushop.stepsdone.add(b'changesets')
934 934 if not pushop.outgoing.missing:
935 935 pushop.ui.status(_(b'no changes found\n'))
936 936 pushop.cgresult = 0
937 937 return
938 938
939 939 # This parameter tells the server that the following bundle is an
940 940 # infinitepush. This lets it switch the part processing to our infinitepush
941 941 # code path.
942 942 bundler.addparam(b"infinitepush", b"True")
943 943
944 944 scratchparts = bundleparts.getscratchbranchparts(
945 945 pushop.repo, pushop.remote, pushop.outgoing, pushop.ui, bookmark
946 946 )
947 947
948 948 for scratchpart in scratchparts:
949 949 bundler.addpart(scratchpart)
950 950
951 951 def handlereply(op):
952 952 # server either succeeds or aborts; no code to read
953 953 pushop.cgresult = 1
954 954
955 955 return handlereply
956 956
957 957
958 958 bundle2.capabilities[bundleparts.scratchbranchparttype] = ()
959 959
960 960
961 961 def _getrevs(bundle, oldnode, force, bookmark):
962 962 b'extracts and validates the revs to be imported'
963 963 revs = [bundle[r] for r in bundle.revs(b'sort(bundle())')]
964 964
965 965 # new bookmark
966 966 if oldnode is None:
967 967 return revs
968 968
969 969 # Fast forward update
970 970 if oldnode in bundle and list(bundle.set(b'bundle() & %s::', oldnode)):
971 971 return revs
972 972
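# anything else (e.g. a non-fast-forward move) is accepted as well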
973 973 return revs
974 974
975 975
976 976 @contextlib.contextmanager
977 977 def logservicecall(logger, service, **kwargs):
978 978 start = time.time()
979 979 logger(service, eventtype=b'start', **kwargs)
980 980 try:
981 981 yield
982 982 logger(
983 983 service,
984 984 eventtype=b'success',
985 985 elapsedms=(time.time() - start) * 1000,
986 986 **kwargs
987 987 )
988 988 except Exception as e:
989 989 logger(
990 990 service,
991 991 eventtype=b'failure',
992 992 elapsedms=(time.time() - start) * 1000,
993 993 errormsg=stringutil.forcebytestr(e),
994 994 **kwargs
995 995 )
996 996 raise
997 997
998 998
999 999 def _getorcreateinfinitepushlogger(op):
1000 1000 logger = op.records[b'infinitepushlogger']
1001 1001 if not logger:
1002 1002 ui = op.repo.ui
1003 1003 try:
1004 1004 username = procutil.getuser()
1005 1005 except Exception:
1006 1006 username = b'unknown'
1007 1007 # Generate random request id to be able to find all logged entries
1008 1008 # for the same request. Since requestid is pseudo-generated it may
1009 1009 # not be unique, but we assume that (hostname, username, requestid)
1010 1010 # is unique.
1011 1011 random.seed()
1012 1012 requestid = random.randint(0, 2000000000)
1013 1013 hostname = socket.gethostname()
1014 1014 logger = functools.partial(
1015 1015 ui.log,
1016 1016 b'infinitepush',
1017 1017 user=username,
1018 1018 requestid=requestid,
1019 1019 hostname=hostname,
1020 1020 reponame=ui.config(b'infinitepush', b'reponame'),
1021 1021 )
1022 1022 op.records.add(b'infinitepushlogger', logger)
1023 1023 else:
1024 1024 logger = logger[0]
1025 1025 return logger
1026 1026
1027 1027
1028 1028 def storetobundlestore(orig, repo, op, unbundler):
1029 1029 """stores the incoming bundle coming from push command to the bundlestore
1030 1030 instead of applying on the revlogs"""
1031 1031
1032 1032 repo.ui.status(_(b"storing changesets on the bundlestore\n"))
1033 1033 bundler = bundle2.bundle20(repo.ui)
1034 1034
1035 1035 # processing each part and storing it in bundler
1036 1036 with bundle2.partiterator(repo, op, unbundler) as parts:
1037 1037 for part in parts:
1038 1038 bundlepart = None
1039 1039 if part.type == b'replycaps':
1040 1040 # This configures the current operation to allow reply parts.
1041 1041 bundle2._processpart(op, part)
1042 1042 else:
1043 1043 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1044 1044 for key, value in pycompat.iteritems(part.params):
1045 1045 bundlepart.addparam(key, value)
1046 1046
1047 1047 # Certain parts require a response
1048 1048 if part.type in (b'pushkey', b'changegroup'):
1049 1049 if op.reply is not None:
1050 1050 rpart = op.reply.newpart(b'reply:%s' % part.type)
1051 1051 rpart.addparam(
1052 1052 b'in-reply-to', b'%d' % part.id, mandatory=False
1053 1053 )
1054 1054 rpart.addparam(b'return', b'1', mandatory=False)
1055 1055
1056 1056 op.records.add(
1057 1057 part.type,
1058 1058 {
1059 1059 b'return': 1,
1060 1060 },
1061 1061 )
1062 1062 if bundlepart:
1063 1063 bundler.addpart(bundlepart)
1064 1064
1065 1065 # storing the bundle in the bundlestore
1066 1066 buf = util.chunkbuffer(bundler.getchunks())
1067 1067 fd, bundlefile = pycompat.mkstemp()
1068 1068 try:
1069 1069 try:
1070 1070 fp = os.fdopen(fd, 'wb')
1071 1071 fp.write(buf.read())
1072 1072 finally:
1073 1073 fp.close()
1074 1074 storebundle(op, {}, bundlefile)
1075 1075 finally:
1076 1076 try:
1077 1077 os.unlink(bundlefile)
1078 1078 except Exception:
1079 1079 # we would rather see the original exception
1080 1080 pass
1081 1081
1082 1082
1083 1083 def processparts(orig, repo, op, unbundler):
1084 1084
1085 1085 # make sure we don't wrap processparts in case of `hg unbundle`
1086 1086 if op.source == b'unbundle':
1087 1087 return orig(repo, op, unbundler)
1088 1088
1089 1089 # this server routes each push to the bundle store
1090 1090 if repo.ui.configbool(b'infinitepush', b'pushtobundlestore'):
1091 1091 return storetobundlestore(orig, repo, op, unbundler)
1092 1092
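# the client marks scratch pushes with a top-level 'infinitepush' bundle
# parameter (see partgen); anything else takes the normal path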
1093 1093 if unbundler.params.get(b'infinitepush') != b'True':
1094 1094 return orig(repo, op, unbundler)
1095 1095
1096 1096 handleallparts = repo.ui.configbool(b'infinitepush', b'storeallparts')
1097 1097
1098 1098 bundler = bundle2.bundle20(repo.ui)
1099 1099 cgparams = None
1100 1100 with bundle2.partiterator(repo, op, unbundler) as parts:
1101 1101 for part in parts:
1102 1102 bundlepart = None
1103 1103 if part.type == b'replycaps':
1104 1104 # This configures the current operation to allow reply parts.
1105 1105 bundle2._processpart(op, part)
1106 1106 elif part.type == bundleparts.scratchbranchparttype:
1107 1107 # Scratch branch parts need to be converted to normal
1108 1108 # changegroup parts, and the extra parameters stored for later
1109 1109 # when we upload to the store. Eventually those parameters will
1110 1110 # be put on the actual bundle instead of this part, then we can
1111 1111 # send a vanilla changegroup instead of the scratchbranch part.
1112 1112 cgversion = part.params.get(b'cgversion', b'01')
1113 1113 bundlepart = bundle2.bundlepart(
1114 1114 b'changegroup', data=part.read()
1115 1115 )
1116 1116 bundlepart.addparam(b'version', cgversion)
1117 1117 cgparams = part.params
1118 1118
1119 1119 # If we're not dumping all parts into the new bundle, we need to
1120 1120 # alert the future pushkey and phase-heads handler to skip
1121 1121 # the part.
1122 1122 if not handleallparts:
1123 1123 op.records.add(
1124 1124 scratchbranchparttype + b'_skippushkey', True
1125 1125 )
1126 1126 op.records.add(
1127 1127 scratchbranchparttype + b'_skipphaseheads', True
1128 1128 )
1129 1129 else:
1130 1130 if handleallparts:
1131 1131 # Ideally we would not process any parts, and instead just
1132 1132 # forward them to the bundle for storage, but since this
1133 1133 # differs from previous behavior, we need to put it behind a
1134 1134 # config flag for incremental rollout.
1135 1135 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1136 1136 for key, value in pycompat.iteritems(part.params):
1137 1137 bundlepart.addparam(key, value)
1138 1138
1139 1139 # Certain parts require a response
1140 1140 if part.type == b'pushkey':
1141 1141 if op.reply is not None:
1142 1142 rpart = op.reply.newpart(b'reply:pushkey')
1143 1143 rpart.addparam(
1144 1144 b'in-reply-to', str(part.id), mandatory=False
1145 1145 )
1146 1146 rpart.addparam(b'return', b'1', mandatory=False)
1147 1147 else:
1148 1148 bundle2._processpart(op, part)
1149 1149
1150 1150 if handleallparts:
1151 1151 op.records.add(
1152 1152 part.type,
1153 1153 {
1154 1154 b'return': 1,
1155 1155 },
1156 1156 )
1157 1157 if bundlepart:
1158 1158 bundler.addpart(bundlepart)
1159 1159
1160 1160 # If commits were sent, store them
1161 1161 if cgparams:
1162 1162 buf = util.chunkbuffer(bundler.getchunks())
1163 1163 fd, bundlefile = pycompat.mkstemp()
1164 1164 try:
1165 1165 try:
1166 1166 fp = os.fdopen(fd, 'wb')
1167 1167 fp.write(buf.read())
1168 1168 finally:
1169 1169 fp.close()
1170 1170 storebundle(op, cgparams, bundlefile)
1171 1171 finally:
1172 1172 try:
1173 1173 os.unlink(bundlefile)
1174 1174 except Exception:
1175 1175 # we would rather see the original exception
1176 1176 pass
1177 1177
1178 1178
1179 1179 def storebundle(op, params, bundlefile):
1180 1180 log = _getorcreateinfinitepushlogger(op)
1181 1181 parthandlerstart = time.time()
1182 1182 log(scratchbranchparttype, eventtype=b'start')
1183 1183 index = op.repo.bundlestore.index
1184 1184 store = op.repo.bundlestore.store
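# tell the wrapped pushkey handler (bundle2pushkey) not to update real
# bookmarks for this operation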
1185 1185 op.records.add(scratchbranchparttype + b'_skippushkey', True)
1186 1186
1187 1187 bundle = None
1188 1188 try: # guards bundle
1189 1189 bundlepath = b"bundle:%s+%s" % (op.repo.root, bundlefile)
1190 1190 bundle = hg.repository(op.repo.ui, bundlepath)
1191 1191
1192 1192 bookmark = params.get(b'bookmark')
1193 1193 bookprevnode = params.get(b'bookprevnode', b'')
1194 1194 force = params.get(b'force')
1195 1195
1196 1196 if bookmark:
1197 1197 oldnode = index.getnode(bookmark)
1198 1198 else:
1199 1199 oldnode = None
1200 1200 bundleheads = bundle.revs(b'heads(bundle())')
1201 1201 if bookmark and len(bundleheads) > 1:
1202 1202 raise error.Abort(
1203 1203 _(b'cannot push more than one head to a scratch branch')
1204 1204 )
1205 1205
1206 1206 revs = _getrevs(bundle, oldnode, force, bookmark)
1207 1207
1208 1208 # Notify the user of what is being pushed
1209 1209 plural = b's' if len(revs) > 1 else b''
1210 1210 op.repo.ui.warn(_(b"pushing %d commit%s:\n") % (len(revs), plural))
1211 1211 maxoutput = 10
1212 1212 for i in range(0, min(len(revs), maxoutput)):
1213 1213 firstline = bundle[revs[i]].description().split(b'\n')[0][:50]
1214 1214 op.repo.ui.warn(b" %s %s\n" % (revs[i], firstline))
1215 1215
1216 1216 if len(revs) > maxoutput + 1:
1217 1217 op.repo.ui.warn(b" ...\n")
1218 1218 firstline = bundle[revs[-1]].description().split(b'\n')[0][:50]
1219 1219 op.repo.ui.warn(b" %s %s\n" % (revs[-1], firstline))
1220 1220
1221 1221 nodesctx = [bundle[rev] for rev in revs]
1222 1222 inindex = lambda rev: bool(index.getbundle(bundle[rev].hex()))
1223 1223 if bundleheads:
1224 1224 newheadscount = sum(not inindex(rev) for rev in bundleheads)
1225 1225 else:
1226 1226 newheadscount = 0
1227 1227 # If there's a bookmark specified, there should be only one head,
1228 1228 # so we choose the last node, which will be that head.
1229 1229 # If a bug or malicious client allows there to be a bookmark
1230 1230 # with multiple heads, we will place the bookmark on the last head.
1231 1231 bookmarknode = nodesctx[-1].hex() if nodesctx else None
1232 1232 key = None
1233 1233 if newheadscount:
1234 1234 with open(bundlefile, b'rb') as f:
1235 1235 bundledata = f.read()
1236 1236 with logservicecall(
1237 1237 log, b'bundlestore', bundlesize=len(bundledata)
1238 1238 ):
1239 1239 bundlesizelimit = 100 * 1024 * 1024 # 100 MB
1240 1240 if len(bundledata) > bundlesizelimit:
1241 1241 error_msg = (
1242 1242 b'bundle is too big: %d bytes. '
1243 1243 + b'max allowed size is 100 MB'
1244 1244 )
1245 1245 raise error.Abort(error_msg % (len(bundledata),))
1246 1246 key = store.write(bundledata)
1247 1247
1248 1248 with logservicecall(log, b'index', newheadscount=newheadscount), index:
1249 1249 if key:
1250 1250 index.addbundle(key, nodesctx)
1251 1251 if bookmark:
1252 1252 index.addbookmark(bookmark, bookmarknode)
1253 1253 _maybeaddpushbackpart(
1254 1254 op, bookmark, bookmarknode, bookprevnode, params
1255 1255 )
1256 1256 log(
1257 1257 scratchbranchparttype,
1258 1258 eventtype=b'success',
1259 1259 elapsedms=(time.time() - parthandlerstart) * 1000,
1260 1260 )
1261 1261
1262 1262 except Exception as e:
1263 1263 log(
1264 1264 scratchbranchparttype,
1265 1265 eventtype=b'failure',
1266 1266 elapsedms=(time.time() - parthandlerstart) * 1000,
1267 1267 errormsg=stringutil.forcebytestr(e),
1268 1268 )
1269 1269 raise
1270 1270 finally:
1271 1271 if bundle:
1272 1272 bundle.close()
1273 1273
1274 1274
1275 1275 @bundle2.parthandler(
1276 1276 scratchbranchparttype,
1277 1277 (
1278 1278 b'bookmark',
1279 1279 b'bookprevnode',
1280 1280 b'force',
1281 1281 b'pushbackbookmarks',
1282 1282 b'cgversion',
1283 1283 ),
1284 1284 )
1285 1285 def bundle2scratchbranch(op, part):
1286 1286 '''unbundle a bundle2 part containing a changegroup to store'''
1287 1287
1288 1288 bundler = bundle2.bundle20(op.repo.ui)
1289 1289 cgversion = part.params.get(b'cgversion', b'01')
1290 1290 cgpart = bundle2.bundlepart(b'changegroup', data=part.read())
1291 1291 cgpart.addparam(b'version', cgversion)
1292 1292 bundler.addpart(cgpart)
1293 1293 buf = util.chunkbuffer(bundler.getchunks())
1294 1294
1295 1295 fd, bundlefile = pycompat.mkstemp()
1296 1296 try:
1297 1297 try:
1298 1298 fp = os.fdopen(fd, 'wb')
1299 1299 fp.write(buf.read())
1300 1300 finally:
1301 1301 fp.close()
1302 1302 storebundle(op, part.params, bundlefile)
1303 1303 finally:
1304 1304 try:
1305 1305 os.unlink(bundlefile)
1306 1306 except OSError as e:
1307 1307 if e.errno != errno.ENOENT:
1308 1308 raise
1309 1309
1310 1310 return 1
1311 1311
1312 1312
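# reply with a bundle2 'pushkey' part so the client can mirror the scratch
# bookmark locally (requires the 'pushback' reply capability)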
1313 1313 def _maybeaddpushbackpart(op, bookmark, newnode, oldnode, params):
1314 1314 if params.get(b'pushbackbookmarks'):
1315 1315 if op.reply and b'pushback' in op.reply.capabilities:
1316 1316 params = {
1317 1317 b'namespace': b'bookmarks',
1318 1318 b'key': bookmark,
1319 1319 b'new': newnode,
1320 1320 b'old': oldnode,
1321 1321 }
1322 1322 op.reply.newpart(
1323 1323 b'pushkey', mandatoryparams=pycompat.iteritems(params)
1324 1324 )
1325 1325
1326 1326
1327 1327 def bundle2pushkey(orig, op, part):
1328 1328 """Wrapper of bundle2.handlepushkey()
1329 1329
1330 1330 The only goal is to skip calling the original function if the flag is set.
1331 1331 It's set when an infinitepush push is happening.
1332 1332 """
1333 1333 if op.records[scratchbranchparttype + b'_skippushkey']:
1334 1334 if op.reply is not None:
1335 1335 rpart = op.reply.newpart(b'reply:pushkey')
1336 1336 rpart.addparam(b'in-reply-to', str(part.id), mandatory=False)
1337 1337 rpart.addparam(b'return', b'1', mandatory=False)
1338 1338 return 1
1339 1339
1340 1340 return orig(op, part)
1341 1341
1342 1342
1343 1343 def bundle2handlephases(orig, op, part):
1344 1344 """Wrapper of bundle2.handlephases()
1345 1345
1346 1346 The only goal is to skip calling the original function if the flag is set.
1347 1347 It's set when an infinitepush push is happening.
1348 1348 """
1349 1349
1350 1350 if op.records[scratchbranchparttype + b'_skipphaseheads']:
1351 1351 return
1352 1352
1353 1353 return orig(op, part)
1354 1354
1355 1355
1356 1356 def _asyncsavemetadata(root, nodes):
1357 1357 """starts a separate process that fills metadata for the nodes
1358 1358
1359 1359 This function creates a separate process and doesn't wait for its
1360 1360 completion. This was done to avoid slowing down pushes.
1361 1361 """
1362 1362
1363 1363 maxnodes = 50
1364 1364 if len(nodes) > maxnodes:
1365 1365 return
1366 1366 nodesargs = []
1367 1367 for node in nodes:
1368 1368 nodesargs.append(b'--node')
1369 1369 nodesargs.append(node)
1370 1370 with open(os.devnull, b'w+b') as devnull:
1371 1371 cmdline = [
1372 1372 util.hgexecutable(),
1373 1373 b'debugfillinfinitepushmetadata',
1374 1374 b'-R',
1375 1375 root,
1376 1376 ] + nodesargs
1377 1377 # Process will run in the background. We don't care about the return code
1378 1378 subprocess.Popen(
1379 1379 pycompat.rapply(procutil.tonativestr, cmdline),
1380 1380 close_fds=True,
1381 1381 shell=False,
1382 1382 stdin=devnull,
1383 1383 stdout=devnull,
1384 1384 stderr=devnull,
1385 1385 )