##// END OF EJS Templates
streamclone: move code out of exchange.py...
Gregory Szorc -
r26443:d947086d default
parent child Browse files
Show More
@@ -1,1598 +1,1469 b''
1 # exchange.py - utility to exchange data between repos.
1 # exchange.py - utility to exchange data between repos.
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import time
9 from i18n import _
8 from i18n import _
10 from node import hex, nullid
9 from node import hex, nullid
11 import errno, urllib
10 import errno, urllib
12 import util, scmutil, changegroup, base85, error, store
11 import util, scmutil, changegroup, base85, error
13 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
12 import discovery, phases, obsolete, bookmarks as bookmod, bundle2, pushkey
14 import lock as lockmod
13 import lock as lockmod
15 import tags
14 import tags
16
15
17 def readbundle(ui, fh, fname, vfs=None):
16 def readbundle(ui, fh, fname, vfs=None):
18 header = changegroup.readexactly(fh, 4)
17 header = changegroup.readexactly(fh, 4)
19
18
20 alg = None
19 alg = None
21 if not fname:
20 if not fname:
22 fname = "stream"
21 fname = "stream"
23 if not header.startswith('HG') and header.startswith('\0'):
22 if not header.startswith('HG') and header.startswith('\0'):
24 fh = changegroup.headerlessfixup(fh, header)
23 fh = changegroup.headerlessfixup(fh, header)
25 header = "HG10"
24 header = "HG10"
26 alg = 'UN'
25 alg = 'UN'
27 elif vfs:
26 elif vfs:
28 fname = vfs.join(fname)
27 fname = vfs.join(fname)
29
28
30 magic, version = header[0:2], header[2:4]
29 magic, version = header[0:2], header[2:4]
31
30
32 if magic != 'HG':
31 if magic != 'HG':
33 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
32 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
34 if version == '10':
33 if version == '10':
35 if alg is None:
34 if alg is None:
36 alg = changegroup.readexactly(fh, 2)
35 alg = changegroup.readexactly(fh, 2)
37 return changegroup.cg1unpacker(fh, alg)
36 return changegroup.cg1unpacker(fh, alg)
38 elif version.startswith('2'):
37 elif version.startswith('2'):
39 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
38 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
40 else:
39 else:
41 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
40 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
42
41
43 def buildobsmarkerspart(bundler, markers):
42 def buildobsmarkerspart(bundler, markers):
44 """add an obsmarker part to the bundler with <markers>
43 """add an obsmarker part to the bundler with <markers>
45
44
46 No part is created if markers is empty.
45 No part is created if markers is empty.
47 Raises ValueError if the bundler doesn't support any known obsmarker format.
46 Raises ValueError if the bundler doesn't support any known obsmarker format.
48 """
47 """
49 if markers:
48 if markers:
50 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
49 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
51 version = obsolete.commonversion(remoteversions)
50 version = obsolete.commonversion(remoteversions)
52 if version is None:
51 if version is None:
53 raise ValueError('bundler do not support common obsmarker format')
52 raise ValueError('bundler do not support common obsmarker format')
54 stream = obsolete.encodemarkers(markers, True, version=version)
53 stream = obsolete.encodemarkers(markers, True, version=version)
55 return bundler.newpart('obsmarkers', data=stream)
54 return bundler.newpart('obsmarkers', data=stream)
56 return None
55 return None
57
56
58 def _canusebundle2(op):
57 def _canusebundle2(op):
59 """return true if a pull/push can use bundle2
58 """return true if a pull/push can use bundle2
60
59
61 Feel free to nuke this function when we drop the experimental option"""
60 Feel free to nuke this function when we drop the experimental option"""
62 return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
61 return (op.repo.ui.configbool('experimental', 'bundle2-exp', True)
63 and op.remote.capable('bundle2'))
62 and op.remote.capable('bundle2'))
64
63
65
64
66 class pushoperation(object):
65 class pushoperation(object):
67 """A object that represent a single push operation
66 """A object that represent a single push operation
68
67
69 It purpose is to carry push related state and very common operation.
68 It purpose is to carry push related state and very common operation.
70
69
71 A new should be created at the beginning of each push and discarded
70 A new should be created at the beginning of each push and discarded
72 afterward.
71 afterward.
73 """
72 """
74
73
75 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
74 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
76 bookmarks=()):
75 bookmarks=()):
77 # repo we push from
76 # repo we push from
78 self.repo = repo
77 self.repo = repo
79 self.ui = repo.ui
78 self.ui = repo.ui
80 # repo we push to
79 # repo we push to
81 self.remote = remote
80 self.remote = remote
82 # force option provided
81 # force option provided
83 self.force = force
82 self.force = force
84 # revs to be pushed (None is "all")
83 # revs to be pushed (None is "all")
85 self.revs = revs
84 self.revs = revs
86 # bookmark explicitly pushed
85 # bookmark explicitly pushed
87 self.bookmarks = bookmarks
86 self.bookmarks = bookmarks
88 # allow push of new branch
87 # allow push of new branch
89 self.newbranch = newbranch
88 self.newbranch = newbranch
90 # did a local lock get acquired?
89 # did a local lock get acquired?
91 self.locallocked = None
90 self.locallocked = None
92 # step already performed
91 # step already performed
93 # (used to check what steps have been already performed through bundle2)
92 # (used to check what steps have been already performed through bundle2)
94 self.stepsdone = set()
93 self.stepsdone = set()
95 # Integer version of the changegroup push result
94 # Integer version of the changegroup push result
96 # - None means nothing to push
95 # - None means nothing to push
97 # - 0 means HTTP error
96 # - 0 means HTTP error
98 # - 1 means we pushed and remote head count is unchanged *or*
97 # - 1 means we pushed and remote head count is unchanged *or*
99 # we have outgoing changesets but refused to push
98 # we have outgoing changesets but refused to push
100 # - other values as described by addchangegroup()
99 # - other values as described by addchangegroup()
101 self.cgresult = None
100 self.cgresult = None
102 # Boolean value for the bookmark push
101 # Boolean value for the bookmark push
103 self.bkresult = None
102 self.bkresult = None
104 # discover.outgoing object (contains common and outgoing data)
103 # discover.outgoing object (contains common and outgoing data)
105 self.outgoing = None
104 self.outgoing = None
106 # all remote heads before the push
105 # all remote heads before the push
107 self.remoteheads = None
106 self.remoteheads = None
108 # testable as a boolean indicating if any nodes are missing locally.
107 # testable as a boolean indicating if any nodes are missing locally.
109 self.incoming = None
108 self.incoming = None
110 # phases changes that must be pushed along side the changesets
109 # phases changes that must be pushed along side the changesets
111 self.outdatedphases = None
110 self.outdatedphases = None
112 # phases changes that must be pushed if changeset push fails
111 # phases changes that must be pushed if changeset push fails
113 self.fallbackoutdatedphases = None
112 self.fallbackoutdatedphases = None
114 # outgoing obsmarkers
113 # outgoing obsmarkers
115 self.outobsmarkers = set()
114 self.outobsmarkers = set()
116 # outgoing bookmarks
115 # outgoing bookmarks
117 self.outbookmarks = []
116 self.outbookmarks = []
118 # transaction manager
117 # transaction manager
119 self.trmanager = None
118 self.trmanager = None
120 # map { pushkey partid -> callback handling failure}
119 # map { pushkey partid -> callback handling failure}
121 # used to handle exception from mandatory pushkey part failure
120 # used to handle exception from mandatory pushkey part failure
122 self.pkfailcb = {}
121 self.pkfailcb = {}
123
122
124 @util.propertycache
123 @util.propertycache
125 def futureheads(self):
124 def futureheads(self):
126 """future remote heads if the changeset push succeeds"""
125 """future remote heads if the changeset push succeeds"""
127 return self.outgoing.missingheads
126 return self.outgoing.missingheads
128
127
129 @util.propertycache
128 @util.propertycache
130 def fallbackheads(self):
129 def fallbackheads(self):
131 """future remote heads if the changeset push fails"""
130 """future remote heads if the changeset push fails"""
132 if self.revs is None:
131 if self.revs is None:
133 # not target to push, all common are relevant
132 # not target to push, all common are relevant
134 return self.outgoing.commonheads
133 return self.outgoing.commonheads
135 unfi = self.repo.unfiltered()
134 unfi = self.repo.unfiltered()
136 # I want cheads = heads(::missingheads and ::commonheads)
135 # I want cheads = heads(::missingheads and ::commonheads)
137 # (missingheads is revs with secret changeset filtered out)
136 # (missingheads is revs with secret changeset filtered out)
138 #
137 #
139 # This can be expressed as:
138 # This can be expressed as:
140 # cheads = ( (missingheads and ::commonheads)
139 # cheads = ( (missingheads and ::commonheads)
141 # + (commonheads and ::missingheads))"
140 # + (commonheads and ::missingheads))"
142 # )
141 # )
143 #
142 #
144 # while trying to push we already computed the following:
143 # while trying to push we already computed the following:
145 # common = (::commonheads)
144 # common = (::commonheads)
146 # missing = ((commonheads::missingheads) - commonheads)
145 # missing = ((commonheads::missingheads) - commonheads)
147 #
146 #
148 # We can pick:
147 # We can pick:
149 # * missingheads part of common (::commonheads)
148 # * missingheads part of common (::commonheads)
150 common = self.outgoing.common
149 common = self.outgoing.common
151 nm = self.repo.changelog.nodemap
150 nm = self.repo.changelog.nodemap
152 cheads = [node for node in self.revs if nm[node] in common]
151 cheads = [node for node in self.revs if nm[node] in common]
153 # and
152 # and
154 # * commonheads parents on missing
153 # * commonheads parents on missing
155 revset = unfi.set('%ln and parents(roots(%ln))',
154 revset = unfi.set('%ln and parents(roots(%ln))',
156 self.outgoing.commonheads,
155 self.outgoing.commonheads,
157 self.outgoing.missing)
156 self.outgoing.missing)
158 cheads.extend(c.node() for c in revset)
157 cheads.extend(c.node() for c in revset)
159 return cheads
158 return cheads
160
159
161 @property
160 @property
162 def commonheads(self):
161 def commonheads(self):
163 """set of all common heads after changeset bundle push"""
162 """set of all common heads after changeset bundle push"""
164 if self.cgresult:
163 if self.cgresult:
165 return self.futureheads
164 return self.futureheads
166 else:
165 else:
167 return self.fallbackheads
166 return self.fallbackheads
168
167
169 # mapping of message used when pushing bookmark
168 # mapping of message used when pushing bookmark
170 bookmsgmap = {'update': (_("updating bookmark %s\n"),
169 bookmsgmap = {'update': (_("updating bookmark %s\n"),
171 _('updating bookmark %s failed!\n')),
170 _('updating bookmark %s failed!\n')),
172 'export': (_("exporting bookmark %s\n"),
171 'export': (_("exporting bookmark %s\n"),
173 _('exporting bookmark %s failed!\n')),
172 _('exporting bookmark %s failed!\n')),
174 'delete': (_("deleting remote bookmark %s\n"),
173 'delete': (_("deleting remote bookmark %s\n"),
175 _('deleting remote bookmark %s failed!\n')),
174 _('deleting remote bookmark %s failed!\n')),
176 }
175 }
177
176
178
177
179 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
178 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=()):
180 '''Push outgoing changesets (limited by revs) from a local
179 '''Push outgoing changesets (limited by revs) from a local
181 repository to remote. Return an integer:
180 repository to remote. Return an integer:
182 - None means nothing to push
181 - None means nothing to push
183 - 0 means HTTP error
182 - 0 means HTTP error
184 - 1 means we pushed and remote head count is unchanged *or*
183 - 1 means we pushed and remote head count is unchanged *or*
185 we have outgoing changesets but refused to push
184 we have outgoing changesets but refused to push
186 - other values as described by addchangegroup()
185 - other values as described by addchangegroup()
187 '''
186 '''
188 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
187 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks)
189 if pushop.remote.local():
188 if pushop.remote.local():
190 missing = (set(pushop.repo.requirements)
189 missing = (set(pushop.repo.requirements)
191 - pushop.remote.local().supported)
190 - pushop.remote.local().supported)
192 if missing:
191 if missing:
193 msg = _("required features are not"
192 msg = _("required features are not"
194 " supported in the destination:"
193 " supported in the destination:"
195 " %s") % (', '.join(sorted(missing)))
194 " %s") % (', '.join(sorted(missing)))
196 raise util.Abort(msg)
195 raise util.Abort(msg)
197
196
198 # there are two ways to push to remote repo:
197 # there are two ways to push to remote repo:
199 #
198 #
200 # addchangegroup assumes local user can lock remote
199 # addchangegroup assumes local user can lock remote
201 # repo (local filesystem, old ssh servers).
200 # repo (local filesystem, old ssh servers).
202 #
201 #
203 # unbundle assumes local user cannot lock remote repo (new ssh
202 # unbundle assumes local user cannot lock remote repo (new ssh
204 # servers, http servers).
203 # servers, http servers).
205
204
206 if not pushop.remote.canpush():
205 if not pushop.remote.canpush():
207 raise util.Abort(_("destination does not support push"))
206 raise util.Abort(_("destination does not support push"))
208 # get local lock as we might write phase data
207 # get local lock as we might write phase data
209 localwlock = locallock = None
208 localwlock = locallock = None
210 try:
209 try:
211 # bundle2 push may receive a reply bundle touching bookmarks or other
210 # bundle2 push may receive a reply bundle touching bookmarks or other
212 # things requiring the wlock. Take it now to ensure proper ordering.
211 # things requiring the wlock. Take it now to ensure proper ordering.
213 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
212 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
214 if _canusebundle2(pushop) and maypushback:
213 if _canusebundle2(pushop) and maypushback:
215 localwlock = pushop.repo.wlock()
214 localwlock = pushop.repo.wlock()
216 locallock = pushop.repo.lock()
215 locallock = pushop.repo.lock()
217 pushop.locallocked = True
216 pushop.locallocked = True
218 except IOError as err:
217 except IOError as err:
219 pushop.locallocked = False
218 pushop.locallocked = False
220 if err.errno != errno.EACCES:
219 if err.errno != errno.EACCES:
221 raise
220 raise
222 # source repo cannot be locked.
221 # source repo cannot be locked.
223 # We do not abort the push, but just disable the local phase
222 # We do not abort the push, but just disable the local phase
224 # synchronisation.
223 # synchronisation.
225 msg = 'cannot lock source repository: %s\n' % err
224 msg = 'cannot lock source repository: %s\n' % err
226 pushop.ui.debug(msg)
225 pushop.ui.debug(msg)
227 try:
226 try:
228 if pushop.locallocked:
227 if pushop.locallocked:
229 pushop.trmanager = transactionmanager(repo,
228 pushop.trmanager = transactionmanager(repo,
230 'push-response',
229 'push-response',
231 pushop.remote.url())
230 pushop.remote.url())
232 pushop.repo.checkpush(pushop)
231 pushop.repo.checkpush(pushop)
233 lock = None
232 lock = None
234 unbundle = pushop.remote.capable('unbundle')
233 unbundle = pushop.remote.capable('unbundle')
235 if not unbundle:
234 if not unbundle:
236 lock = pushop.remote.lock()
235 lock = pushop.remote.lock()
237 try:
236 try:
238 _pushdiscovery(pushop)
237 _pushdiscovery(pushop)
239 if _canusebundle2(pushop):
238 if _canusebundle2(pushop):
240 _pushbundle2(pushop)
239 _pushbundle2(pushop)
241 _pushchangeset(pushop)
240 _pushchangeset(pushop)
242 _pushsyncphase(pushop)
241 _pushsyncphase(pushop)
243 _pushobsolete(pushop)
242 _pushobsolete(pushop)
244 _pushbookmark(pushop)
243 _pushbookmark(pushop)
245 finally:
244 finally:
246 if lock is not None:
245 if lock is not None:
247 lock.release()
246 lock.release()
248 if pushop.trmanager:
247 if pushop.trmanager:
249 pushop.trmanager.close()
248 pushop.trmanager.close()
250 finally:
249 finally:
251 if pushop.trmanager:
250 if pushop.trmanager:
252 pushop.trmanager.release()
251 pushop.trmanager.release()
253 if locallock is not None:
252 if locallock is not None:
254 locallock.release()
253 locallock.release()
255 if localwlock is not None:
254 if localwlock is not None:
256 localwlock.release()
255 localwlock.release()
257
256
258 return pushop
257 return pushop
259
258
260 # list of steps to perform discovery before push
259 # list of steps to perform discovery before push
261 pushdiscoveryorder = []
260 pushdiscoveryorder = []
262
261
263 # Mapping between step name and function
262 # Mapping between step name and function
264 #
263 #
265 # This exists to help extensions wrap steps if necessary
264 # This exists to help extensions wrap steps if necessary
266 pushdiscoverymapping = {}
265 pushdiscoverymapping = {}
267
266
268 def pushdiscovery(stepname):
267 def pushdiscovery(stepname):
269 """decorator for function performing discovery before push
268 """decorator for function performing discovery before push
270
269
271 The function is added to the step -> function mapping and appended to the
270 The function is added to the step -> function mapping and appended to the
272 list of steps. Beware that decorated function will be added in order (this
271 list of steps. Beware that decorated function will be added in order (this
273 may matter).
272 may matter).
274
273
275 You can only use this decorator for a new step, if you want to wrap a step
274 You can only use this decorator for a new step, if you want to wrap a step
276 from an extension, change the pushdiscovery dictionary directly."""
275 from an extension, change the pushdiscovery dictionary directly."""
277 def dec(func):
276 def dec(func):
278 assert stepname not in pushdiscoverymapping
277 assert stepname not in pushdiscoverymapping
279 pushdiscoverymapping[stepname] = func
278 pushdiscoverymapping[stepname] = func
280 pushdiscoveryorder.append(stepname)
279 pushdiscoveryorder.append(stepname)
281 return func
280 return func
282 return dec
281 return dec
283
282
284 def _pushdiscovery(pushop):
283 def _pushdiscovery(pushop):
285 """Run all discovery steps"""
284 """Run all discovery steps"""
286 for stepname in pushdiscoveryorder:
285 for stepname in pushdiscoveryorder:
287 step = pushdiscoverymapping[stepname]
286 step = pushdiscoverymapping[stepname]
288 step(pushop)
287 step(pushop)
289
288
290 @pushdiscovery('changeset')
289 @pushdiscovery('changeset')
291 def _pushdiscoverychangeset(pushop):
290 def _pushdiscoverychangeset(pushop):
292 """discover the changeset that need to be pushed"""
291 """discover the changeset that need to be pushed"""
293 fci = discovery.findcommonincoming
292 fci = discovery.findcommonincoming
294 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
293 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
295 common, inc, remoteheads = commoninc
294 common, inc, remoteheads = commoninc
296 fco = discovery.findcommonoutgoing
295 fco = discovery.findcommonoutgoing
297 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
296 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
298 commoninc=commoninc, force=pushop.force)
297 commoninc=commoninc, force=pushop.force)
299 pushop.outgoing = outgoing
298 pushop.outgoing = outgoing
300 pushop.remoteheads = remoteheads
299 pushop.remoteheads = remoteheads
301 pushop.incoming = inc
300 pushop.incoming = inc
302
301
303 @pushdiscovery('phase')
302 @pushdiscovery('phase')
304 def _pushdiscoveryphase(pushop):
303 def _pushdiscoveryphase(pushop):
305 """discover the phase that needs to be pushed
304 """discover the phase that needs to be pushed
306
305
307 (computed for both success and failure case for changesets push)"""
306 (computed for both success and failure case for changesets push)"""
308 outgoing = pushop.outgoing
307 outgoing = pushop.outgoing
309 unfi = pushop.repo.unfiltered()
308 unfi = pushop.repo.unfiltered()
310 remotephases = pushop.remote.listkeys('phases')
309 remotephases = pushop.remote.listkeys('phases')
311 publishing = remotephases.get('publishing', False)
310 publishing = remotephases.get('publishing', False)
312 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
311 if (pushop.ui.configbool('ui', '_usedassubrepo', False)
313 and remotephases # server supports phases
312 and remotephases # server supports phases
314 and not pushop.outgoing.missing # no changesets to be pushed
313 and not pushop.outgoing.missing # no changesets to be pushed
315 and publishing):
314 and publishing):
316 # When:
315 # When:
317 # - this is a subrepo push
316 # - this is a subrepo push
318 # - and remote support phase
317 # - and remote support phase
319 # - and no changeset are to be pushed
318 # - and no changeset are to be pushed
320 # - and remote is publishing
319 # - and remote is publishing
321 # We may be in issue 3871 case!
320 # We may be in issue 3871 case!
322 # We drop the possible phase synchronisation done by
321 # We drop the possible phase synchronisation done by
323 # courtesy to publish changesets possibly locally draft
322 # courtesy to publish changesets possibly locally draft
324 # on the remote.
323 # on the remote.
325 remotephases = {'publishing': 'True'}
324 remotephases = {'publishing': 'True'}
326 ana = phases.analyzeremotephases(pushop.repo,
325 ana = phases.analyzeremotephases(pushop.repo,
327 pushop.fallbackheads,
326 pushop.fallbackheads,
328 remotephases)
327 remotephases)
329 pheads, droots = ana
328 pheads, droots = ana
330 extracond = ''
329 extracond = ''
331 if not publishing:
330 if not publishing:
332 extracond = ' and public()'
331 extracond = ' and public()'
333 revset = 'heads((%%ln::%%ln) %s)' % extracond
332 revset = 'heads((%%ln::%%ln) %s)' % extracond
334 # Get the list of all revs draft on remote by public here.
333 # Get the list of all revs draft on remote by public here.
335 # XXX Beware that revset break if droots is not strictly
334 # XXX Beware that revset break if droots is not strictly
336 # XXX root we may want to ensure it is but it is costly
335 # XXX root we may want to ensure it is but it is costly
337 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
336 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
338 if not outgoing.missing:
337 if not outgoing.missing:
339 future = fallback
338 future = fallback
340 else:
339 else:
341 # adds changeset we are going to push as draft
340 # adds changeset we are going to push as draft
342 #
341 #
343 # should not be necessary for publishing server, but because of an
342 # should not be necessary for publishing server, but because of an
344 # issue fixed in xxxxx we have to do it anyway.
343 # issue fixed in xxxxx we have to do it anyway.
345 fdroots = list(unfi.set('roots(%ln + %ln::)',
344 fdroots = list(unfi.set('roots(%ln + %ln::)',
346 outgoing.missing, droots))
345 outgoing.missing, droots))
347 fdroots = [f.node() for f in fdroots]
346 fdroots = [f.node() for f in fdroots]
348 future = list(unfi.set(revset, fdroots, pushop.futureheads))
347 future = list(unfi.set(revset, fdroots, pushop.futureheads))
349 pushop.outdatedphases = future
348 pushop.outdatedphases = future
350 pushop.fallbackoutdatedphases = fallback
349 pushop.fallbackoutdatedphases = fallback
351
350
352 @pushdiscovery('obsmarker')
351 @pushdiscovery('obsmarker')
353 def _pushdiscoveryobsmarkers(pushop):
352 def _pushdiscoveryobsmarkers(pushop):
354 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
353 if (obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
355 and pushop.repo.obsstore
354 and pushop.repo.obsstore
356 and 'obsolete' in pushop.remote.listkeys('namespaces')):
355 and 'obsolete' in pushop.remote.listkeys('namespaces')):
357 repo = pushop.repo
356 repo = pushop.repo
358 # very naive computation, that can be quite expensive on big repo.
357 # very naive computation, that can be quite expensive on big repo.
359 # However: evolution is currently slow on them anyway.
358 # However: evolution is currently slow on them anyway.
360 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
359 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
361 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
360 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
362
361
363 @pushdiscovery('bookmarks')
362 @pushdiscovery('bookmarks')
364 def _pushdiscoverybookmarks(pushop):
363 def _pushdiscoverybookmarks(pushop):
365 ui = pushop.ui
364 ui = pushop.ui
366 repo = pushop.repo.unfiltered()
365 repo = pushop.repo.unfiltered()
367 remote = pushop.remote
366 remote = pushop.remote
368 ui.debug("checking for updated bookmarks\n")
367 ui.debug("checking for updated bookmarks\n")
369 ancestors = ()
368 ancestors = ()
370 if pushop.revs:
369 if pushop.revs:
371 revnums = map(repo.changelog.rev, pushop.revs)
370 revnums = map(repo.changelog.rev, pushop.revs)
372 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
371 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
373 remotebookmark = remote.listkeys('bookmarks')
372 remotebookmark = remote.listkeys('bookmarks')
374
373
375 explicit = set(pushop.bookmarks)
374 explicit = set(pushop.bookmarks)
376
375
377 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
376 comp = bookmod.compare(repo, repo._bookmarks, remotebookmark, srchex=hex)
378 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
377 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
379 for b, scid, dcid in advsrc:
378 for b, scid, dcid in advsrc:
380 if b in explicit:
379 if b in explicit:
381 explicit.remove(b)
380 explicit.remove(b)
382 if not ancestors or repo[scid].rev() in ancestors:
381 if not ancestors or repo[scid].rev() in ancestors:
383 pushop.outbookmarks.append((b, dcid, scid))
382 pushop.outbookmarks.append((b, dcid, scid))
384 # search added bookmark
383 # search added bookmark
385 for b, scid, dcid in addsrc:
384 for b, scid, dcid in addsrc:
386 if b in explicit:
385 if b in explicit:
387 explicit.remove(b)
386 explicit.remove(b)
388 pushop.outbookmarks.append((b, '', scid))
387 pushop.outbookmarks.append((b, '', scid))
389 # search for overwritten bookmark
388 # search for overwritten bookmark
390 for b, scid, dcid in advdst + diverge + differ:
389 for b, scid, dcid in advdst + diverge + differ:
391 if b in explicit:
390 if b in explicit:
392 explicit.remove(b)
391 explicit.remove(b)
393 pushop.outbookmarks.append((b, dcid, scid))
392 pushop.outbookmarks.append((b, dcid, scid))
394 # search for bookmark to delete
393 # search for bookmark to delete
395 for b, scid, dcid in adddst:
394 for b, scid, dcid in adddst:
396 if b in explicit:
395 if b in explicit:
397 explicit.remove(b)
396 explicit.remove(b)
398 # treat as "deleted locally"
397 # treat as "deleted locally"
399 pushop.outbookmarks.append((b, dcid, ''))
398 pushop.outbookmarks.append((b, dcid, ''))
400 # identical bookmarks shouldn't get reported
399 # identical bookmarks shouldn't get reported
401 for b, scid, dcid in same:
400 for b, scid, dcid in same:
402 if b in explicit:
401 if b in explicit:
403 explicit.remove(b)
402 explicit.remove(b)
404
403
405 if explicit:
404 if explicit:
406 explicit = sorted(explicit)
405 explicit = sorted(explicit)
407 # we should probably list all of them
406 # we should probably list all of them
408 ui.warn(_('bookmark %s does not exist on the local '
407 ui.warn(_('bookmark %s does not exist on the local '
409 'or remote repository!\n') % explicit[0])
408 'or remote repository!\n') % explicit[0])
410 pushop.bkresult = 2
409 pushop.bkresult = 2
411
410
412 pushop.outbookmarks.sort()
411 pushop.outbookmarks.sort()
413
412
414 def _pushcheckoutgoing(pushop):
413 def _pushcheckoutgoing(pushop):
415 outgoing = pushop.outgoing
414 outgoing = pushop.outgoing
416 unfi = pushop.repo.unfiltered()
415 unfi = pushop.repo.unfiltered()
417 if not outgoing.missing:
416 if not outgoing.missing:
418 # nothing to push
417 # nothing to push
419 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
418 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
420 return False
419 return False
421 # something to push
420 # something to push
422 if not pushop.force:
421 if not pushop.force:
423 # if repo.obsstore == False --> no obsolete
422 # if repo.obsstore == False --> no obsolete
424 # then, save the iteration
423 # then, save the iteration
425 if unfi.obsstore:
424 if unfi.obsstore:
426 # this message are here for 80 char limit reason
425 # this message are here for 80 char limit reason
427 mso = _("push includes obsolete changeset: %s!")
426 mso = _("push includes obsolete changeset: %s!")
428 mst = {"unstable": _("push includes unstable changeset: %s!"),
427 mst = {"unstable": _("push includes unstable changeset: %s!"),
429 "bumped": _("push includes bumped changeset: %s!"),
428 "bumped": _("push includes bumped changeset: %s!"),
430 "divergent": _("push includes divergent changeset: %s!")}
429 "divergent": _("push includes divergent changeset: %s!")}
431 # If we are to push if there is at least one
430 # If we are to push if there is at least one
432 # obsolete or unstable changeset in missing, at
431 # obsolete or unstable changeset in missing, at
433 # least one of the missinghead will be obsolete or
432 # least one of the missinghead will be obsolete or
434 # unstable. So checking heads only is ok
433 # unstable. So checking heads only is ok
435 for node in outgoing.missingheads:
434 for node in outgoing.missingheads:
436 ctx = unfi[node]
435 ctx = unfi[node]
437 if ctx.obsolete():
436 if ctx.obsolete():
438 raise util.Abort(mso % ctx)
437 raise util.Abort(mso % ctx)
439 elif ctx.troubled():
438 elif ctx.troubled():
440 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
439 raise util.Abort(mst[ctx.troubles()[0]] % ctx)
441
440
442 # internal config: bookmarks.pushing
441 # internal config: bookmarks.pushing
443 newbm = pushop.ui.configlist('bookmarks', 'pushing')
442 newbm = pushop.ui.configlist('bookmarks', 'pushing')
444 discovery.checkheads(unfi, pushop.remote, outgoing,
443 discovery.checkheads(unfi, pushop.remote, outgoing,
445 pushop.remoteheads,
444 pushop.remoteheads,
446 pushop.newbranch,
445 pushop.newbranch,
447 bool(pushop.incoming),
446 bool(pushop.incoming),
448 newbm)
447 newbm)
449 return True
448 return True
450
449
# List of names of steps to perform for an outgoing bundle2, order matters.
# Populated by the @b2partsgenerator decorator below; each entry is a key
# into b2partsgenmapping.
b2partsgenorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
b2partsgenmapping = {}
458
457
def b2partsgenerator(stepname, idx=None):
    """decorator registering a function generating a bundle2 part

    The decorated function is recorded in the step -> function mapping and
    the step name is added to the ordered list of steps. Beware that
    decoration order is significant. When ``idx`` is given, the step is
    inserted at that position instead of being appended.

    Only use this decorator for brand new steps; to wrap an existing step
    from an extension, modify the b2partsgenmapping dictionary directly."""
    def register(func):
        # registering the same step twice is a programming error
        assert stepname not in b2partsgenmapping
        b2partsgenmapping[stepname] = func
        if idx is not None:
            b2partsgenorder.insert(idx, stepname)
        else:
            b2partsgenorder.append(stepname)
        return func
    return register
477
476
478 def _pushb2ctxcheckheads(pushop, bundler):
477 def _pushb2ctxcheckheads(pushop, bundler):
479 """Generate race condition checking parts
478 """Generate race condition checking parts
480
479
481 Exists as an indepedent function to aid extensions
480 Exists as an indepedent function to aid extensions
482 """
481 """
483 if not pushop.force:
482 if not pushop.force:
484 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
483 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
485
484
@b2partsgenerator('changeset')
def _pushb2ctx(pushop, bundler):
    """handle changegroup push through bundle2

    addchangegroup result is stored in the ``pushop.cgresult`` attribute.
    """
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    # Send known heads to the server for race detection.
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)

    # add the race-condition checking part unless this is a forced push
    _pushb2ctxcheckheads(pushop, bundler)

    b2caps = bundle2.bundle2caps(pushop.remote)
    version = None
    cgversions = b2caps.get('changegroup')
    if not cgversions:  # 3.1 and 3.2 ship with an empty value
        # no version advertised: emit the implicit (version "01") format
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing)
    else:
        # pick the highest changegroup version both sides understand
        cgversions = [v for v in cgversions if v in changegroup.packermap]
        if not cgversions:
            raise ValueError(_('no common changegroup version'))
        version = max(cgversions)
        cg = changegroup.getlocalchangegroupraw(pushop.repo, 'push',
                                                pushop.outgoing,
                                                version=version)
    cgpart = bundler.newpart('changegroup', data=cg)
    if version is not None:
        cgpart.addparam('version', version)
    def handlereply(op):
        """extract addchangegroup returns from server reply"""
        # the closure over cgpart.id links this reply to the part added above
        cgreplies = op.records.getreplies(cgpart.id)
        assert len(cgreplies['changegroup']) == 1
        pushop.cgresult = cgreplies['changegroup'][0]['return']
    return handlereply
527
526
@b2partsgenerator('phase')
def _pushb2phases(pushop, bundler):
    """handle phase push through bundle2

    One pushkey part is added per outdated remote head that must be turned
    public. Returns a reply handler that warns about heads the server
    ignored or failed to publish.
    """
    if 'phases' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('phases')
    # (part id, node) pairs mapping a pushkey part back to the head it moves
    part2node = []

    def handlefailure(pushop, exc):
        # invoked through pushop.pkfailcb when the server reports a
        # pushkey failure for one of the parts generated below
        targetid = int(exc.partid)
        for partid, node in part2node:
            if partid == targetid:
                raise error.Abort(_('updating %s to public failed') % node)

    enc = pushkey.encode
    for newremotehead in pushop.outdatedphases:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('phases'))
        part.addparam('key', enc(newremotehead.hex()))
        part.addparam('old', enc(str(phases.draft)))
        part.addparam('new', enc(str(phases.public)))
        part2node.append((part.id, newremotehead))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        # warn about every head the server ignored or failed to publish
        for partid, node in part2node:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            msg = None
            if not results:
                msg = _('server ignored update of %s to public!\n') % node
            elif not int(results[0]['return']):
                msg = _('updating %s to public failed!\n') % node
            if msg is not None:
                pushop.ui.warn(msg)
    return handlereply
568
567
@b2partsgenerator('obsmarkers')
def _pushb2obsmarkers(pushop, bundler):
    """add an obsolescence-markers part to the bundle2 push when possible"""
    if 'obsmarkers' in pushop.stepsdone:
        return
    # only emit markers if both sides share a marker format version
    supported = bundle2.obsmarkersversion(bundler.capabilities)
    if obsolete.commonversion(supported) is None:
        return
    pushop.stepsdone.add('obsmarkers')
    if pushop.outobsmarkers:
        buildobsmarkerspart(bundler, sorted(pushop.outobsmarkers))
580
579
@b2partsgenerator('bookmarks')
def _pushb2bookmarks(pushop, bundler):
    """handle bookmark push through bundle2"""
    if 'bookmarks' in pushop.stepsdone:
        return
    b2caps = bundle2.bundle2caps(pushop.remote)
    if 'pushkey' not in b2caps:
        return
    pushop.stepsdone.add('bookmarks')
    # (part id, bookmark name, action) triples; 'action' selects the
    # status/error message pair from bookmsgmap
    part2book = []
    enc = pushkey.encode

    def handlefailure(pushop, exc):
        # invoked through pushop.pkfailcb when the server reports a
        # pushkey failure for one of the parts generated below
        targetid = int(exc.partid)
        for partid, book, action in part2book:
            if partid == targetid:
                raise error.Abort(bookmsgmap[action][1].rstrip() % book)
        # we should not be called for parts we did not generate
        assert False

    for book, old, new in pushop.outbookmarks:
        part = bundler.newpart('pushkey')
        part.addparam('namespace', enc('bookmarks'))
        part.addparam('key', enc(book))
        part.addparam('old', enc(old))
        part.addparam('new', enc(new))
        # empty 'old' means the bookmark is new on the remote; empty 'new'
        # means it is being deleted there
        action = 'update'
        if not old:
            action = 'export'
        elif not new:
            action = 'delete'
        part2book.append((part.id, book, action))
        pushop.pkfailcb[part.id] = handlefailure

    def handlereply(op):
        ui = pushop.ui
        for partid, book, action in part2book:
            partrep = op.records.getreplies(partid)
            results = partrep['pushkey']
            assert len(results) <= 1
            if not results:
                pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
            else:
                ret = int(results[0]['return'])
                if ret:
                    ui.status(bookmsgmap[action][0] % book)
                else:
                    ui.warn(bookmsgmap[action][1] % book)
        if pushop.bkresult is not None:
            pushop.bkresult = 1
    return handlereply
632
631
633
632
def _pushbundle2(pushop):
    """push data to the remote using bundle2

    The parts included in the bundle are produced by the generators
    registered in ``b2partsgenorder``/``b2partsgenmapping`` (changesets,
    phases, obsmarkers, bookmarks, ...), in registration order."""
    bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
    pushback = (pushop.trmanager
                and pushop.ui.configbool('experimental', 'bundle2.pushback'))

    # create reply capability
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
                                                      allowpushback=pushback))
    bundler.newpart('replycaps', data=capsblob)
    replyhandlers = []
    for partgenname in b2partsgenorder:
        partgen = b2partsgenmapping[partgenname]
        ret = partgen(pushop, bundler)
        # a generator may return a callable to process the server reply
        if callable(ret):
            replyhandlers.append(ret)
    # do not push if nothing to push
    # (the only part so far is the 'replycaps' one added above)
    if bundler.nbparts <= 1:
        return
    stream = util.chunkbuffer(bundler.getchunks())
    try:
        try:
            reply = pushop.remote.unbundle(stream, ['force'], 'push')
        except error.BundleValueError as exc:
            raise util.Abort('missing support for %s' % exc)
        try:
            trgetter = None
            if pushback:
                trgetter = pushop.trmanager.transaction
            op = bundle2.processbundle(pushop.repo, reply, trgetter)
        except error.BundleValueError as exc:
            raise util.Abort('missing support for %s' % exc)
    except error.PushkeyFailed as exc:
        # dispatch the failure to the callback registered by the part
        # generator, if any; otherwise let it propagate
        partid = int(exc.partid)
        if partid not in pushop.pkfailcb:
            raise
        pushop.pkfailcb[partid](pushop, exc)
    for rephand in replyhandlers:
        rephand(op)
676
675
def _pushchangeset(pushop):
    """Make the actual push of changeset bundle to remote repo

    Non-bundle2 path: builds a changegroup locally and applies it on the
    remote via ``unbundle`` (preferred) or ``addchangegroup``. The result
    is stored in ``pushop.cgresult``. No-op when the 'changesets' step was
    already handled (e.g. by the bundle2 path)."""
    if 'changesets' in pushop.stepsdone:
        return
    pushop.stepsdone.add('changesets')
    if not _pushcheckoutgoing(pushop):
        return
    pushop.repo.prepushoutgoinghooks(pushop.repo,
                                     pushop.remote,
                                     pushop.outgoing)
    outgoing = pushop.outgoing
    unbundle = pushop.remote.capable('unbundle')
    # TODO: get bundlecaps from remote
    bundlecaps = None
    # create a changegroup from local
    if pushop.revs is None and not (outgoing.excluded
                                    or pushop.repo.changelog.filteredrevs):
        # push everything,
        # use the fast path, no race possible on push
        bundler = changegroup.cg1packer(pushop.repo, bundlecaps)
        cg = changegroup.getsubset(pushop.repo,
                                   outgoing,
                                   bundler,
                                   'push',
                                   fastpath=True)
    else:
        cg = changegroup.getlocalchangegroup(pushop.repo, 'push', outgoing,
                                             bundlecaps)

    # apply changegroup to remote
    if unbundle:
        # local repo finds heads on server, finds out what
        # revs it must push. once revs transferred, if server
        # finds it has different heads (someone else won
        # commit/push race), server aborts.
        if pushop.force:
            remoteheads = ['force']
        else:
            remoteheads = pushop.remoteheads
        # ssh: return remote's addchangegroup()
        # http: return remote's addchangegroup() or 0 for error
        pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
                                                 pushop.repo.url())
    else:
        # we return an integer indicating remote head count
        # change
        pushop.cgresult = pushop.remote.addchangegroup(cg, 'push',
                                                       pushop.repo.url())
725
724
def _pushsyncphase(pushop):
    """synchronise phase information locally and remotely"""
    cheads = pushop.commonheads
    # even when we don't push, exchanging phase data is useful
    remotephases = pushop.remote.listkeys('phases')
    if (pushop.ui.configbool('ui', '_usedassubrepo', False)
        and remotephases    # server supports phases
        and pushop.cgresult is None # nothing was pushed
        and remotephases.get('publishing', False)):
        # When:
        # - this is a subrepo push
        # - and remote support phase
        # - and no changeset was pushed
        # - and remote is publishing
        # We may be in issue 3871 case!
        # We drop the possible phase synchronisation done by
        # courtesy to publish changesets possibly locally draft
        # on the remote.
        remotephases = {'publishing': 'True'}
    if not remotephases: # old server or public only reply from non-publishing
        _localphasemove(pushop, cheads)
        # don't push any phase data as there is nothing to push
    else:
        ana = phases.analyzeremotephases(pushop.repo, cheads,
                                         remotephases)
        # pheads: heads known public remotely; droots: remote draft roots
        pheads, droots = ana
        ### Apply remote phase on local
        if remotephases.get('publishing', False):
            # publishing server: everything common is public
            _localphasemove(pushop, cheads)
        else: # publish = False
            _localphasemove(pushop, pheads)
            _localphasemove(pushop, cheads, phases.draft)
        ### Apply local phase on remote

        if pushop.cgresult:
            if 'phases' in pushop.stepsdone:
                # phases already pushed though bundle2
                return
            outdated = pushop.outdatedphases
        else:
            outdated = pushop.fallbackoutdatedphases

        pushop.stepsdone.add('phases')

        # filter heads already turned public by the push
        outdated = [c for c in outdated if c.node() not in pheads]
        # fallback to independent pushkey command
        for newremotehead in outdated:
            r = pushop.remote.pushkey('phases',
                                      newremotehead.hex(),
                                      str(phases.draft),
                                      str(phases.public))
            if not r:
                pushop.ui.warn(_('updating %s to public failed!\n')
                               % newremotehead)
781
780
def _localphasemove(pushop, nodes, phase=phases.public):
    """move <nodes> to <phase> in the local source repo"""
    if not pushop.trmanager:
        # Without a transaction manager the repo is not locked, so we must
        # not change any phases! Just tell the user which updates would
        # have applied.
        repo = pushop.repo
        skipped = [n for n in nodes if phase < repo[n].phase()]
        if skipped:
            pushop.ui.status(_('cannot lock source repo, skipping '
                               'local %s phase update\n')
                             % phases.phasenames[phase])
        return
    phases.advanceboundary(pushop.repo, pushop.trmanager.transaction(),
                           phase, nodes)
798
797
799 def _pushobsolete(pushop):
798 def _pushobsolete(pushop):
800 """utility function to push obsolete markers to a remote"""
799 """utility function to push obsolete markers to a remote"""
801 if 'obsmarkers' in pushop.stepsdone:
800 if 'obsmarkers' in pushop.stepsdone:
802 return
801 return
803 repo = pushop.repo
802 repo = pushop.repo
804 remote = pushop.remote
803 remote = pushop.remote
805 pushop.stepsdone.add('obsmarkers')
804 pushop.stepsdone.add('obsmarkers')
806 if pushop.outobsmarkers:
805 if pushop.outobsmarkers:
807 pushop.ui.debug('try to push obsolete markers to remote\n')
806 pushop.ui.debug('try to push obsolete markers to remote\n')
808 rslts = []
807 rslts = []
809 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
808 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
810 for key in sorted(remotedata, reverse=True):
809 for key in sorted(remotedata, reverse=True):
811 # reverse sort to ensure we end with dump0
810 # reverse sort to ensure we end with dump0
812 data = remotedata[key]
811 data = remotedata[key]
813 rslts.append(remote.pushkey('obsolete', key, '', data))
812 rslts.append(remote.pushkey('obsolete', key, '', data))
814 if [r for r in rslts if not r]:
813 if [r for r in rslts if not r]:
815 msg = _('failed to push some obsolete markers!\n')
814 msg = _('failed to push some obsolete markers!\n')
816 repo.ui.warn(msg)
815 repo.ui.warn(msg)
817
816
818 def _pushbookmark(pushop):
817 def _pushbookmark(pushop):
819 """Update bookmark position on remote"""
818 """Update bookmark position on remote"""
820 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
819 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
821 return
820 return
822 pushop.stepsdone.add('bookmarks')
821 pushop.stepsdone.add('bookmarks')
823 ui = pushop.ui
822 ui = pushop.ui
824 remote = pushop.remote
823 remote = pushop.remote
825
824
826 for b, old, new in pushop.outbookmarks:
825 for b, old, new in pushop.outbookmarks:
827 action = 'update'
826 action = 'update'
828 if not old:
827 if not old:
829 action = 'export'
828 action = 'export'
830 elif not new:
829 elif not new:
831 action = 'delete'
830 action = 'delete'
832 if remote.pushkey('bookmarks', b, old, new):
831 if remote.pushkey('bookmarks', b, old, new):
833 ui.status(bookmsgmap[action][0] % b)
832 ui.status(bookmsgmap[action][0] % b)
834 else:
833 else:
835 ui.warn(bookmsgmap[action][1] % b)
834 ui.warn(bookmsgmap[action][1] % b)
836 # discovery can have set the value form invalid entry
835 # discovery can have set the value form invalid entry
837 if pushop.bkresult is not None:
836 if pushop.bkresult is not None:
838 pushop.bkresult = 1
837 pushop.bkresult = 1
839
838
class pulloperation(object):
    """An object that represents a single pull operation

    Its purpose is to carry pull-related state and very common operations.

    A new pulloperation should be created at the beginning of each pull and
    discarded afterward.
    """

    def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
                 remotebookmarks=None):
        # repo we pull into
        self.repo = repo
        # repo we pull from
        self.remote = remote
        # revisions we try to pull (None is "all")
        self.heads = heads
        # bookmarks pulled explicitly
        self.explicitbookmarks = bookmarks
        # do we force pull?
        self.force = force
        # transaction manager (set by pull(), None until then)
        self.trmanager = None
        # set of common changesets between local and remote before pull
        self.common = None
        # set of pulled heads
        self.rheads = None
        # list of missing changesets to fetch remotely
        self.fetch = None
        # remote bookmarks data
        self.remotebookmarks = remotebookmarks
        # result of changegroup pulling (used as return code by pull)
        self.cgresult = None
        # set of steps already done
        self.stepsdone = set()

    @util.propertycache
    def pulledsubset(self):
        """heads of the set of changesets targeted by the pull"""
        # compute target subset
        if self.heads is None:
            # We pulled every thing possible
            # sync on everything common
            c = set(self.common)
            ret = list(self.common)
            for n in self.rheads:
                if n not in c:
                    ret.append(n)
            return ret
        else:
            # We pulled a specific subset
            # sync on this subset
            return self.heads

    def gettransaction(self):
        # deprecated; talk to trmanager directly
        return self.trmanager.transaction()
897
896
class transactionmanager(object):
    """An object to manage the life cycle of a transaction

    It creates the transaction on demand and calls the appropriate hooks when
    closing the transaction."""
    def __init__(self, repo, source, url):
        # repo the transaction is opened on
        self.repo = repo
        # operation source (e.g. 'pull'), recorded in the hook arguments
        self.source = source
        # remote url, recorded in the hook arguments and transaction name
        self.url = url
        # lazily-created transaction object (None until first use)
        self._tr = None

    def transaction(self):
        """Return an open transaction object, constructing if necessary"""
        if not self._tr:
            # strip the password from the url before it lands in logs
            trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
            self._tr = self.repo.transaction(trname)
            self._tr.hookargs['source'] = self.source
            self._tr.hookargs['url'] = self.url
        return self._tr

    def close(self):
        """close transaction if created"""
        if self._tr is not None:
            self._tr.close()

    def release(self):
        """release transaction if created"""
        if self._tr is not None:
            self._tr.release()
927
926
def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None):
    """Fetch repository data from a remote.

    This is the main function used to retrieve data from a remote repository.

    ``repo`` is the local repository to clone into.
    ``remote`` is a peer instance.
    ``heads`` is an iterable of revisions we want to pull. ``None`` (the
    default) means to pull everything from the remote.
    ``bookmarks`` is an iterable of bookmarks requesting to be pulled. By
    default, all remote bookmarks are pulled.
    ``opargs`` are additional keyword arguments to pass to ``pulloperation``
    initialization.

    Returns the ``pulloperation`` created for this pull.
    """
    if opargs is None:
        opargs = {}
    pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
                           **opargs)
    if pullop.remote.local():
        # refuse to pull from a local repo whose requirements we can't honor
        missing = set(pullop.remote.requirements) - pullop.repo.supported
        if missing:
            msg = _("required features are not"
                    " supported in the destination:"
                    " %s") % (', '.join(sorted(missing)))
            raise util.Abort(msg)

    lock = pullop.repo.lock()
    try:
        pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
        _pulldiscovery(pullop)
        if _canusebundle2(pullop):
            _pullbundle2(pullop)
        # each step checks pullop.stepsdone, so steps already handled by
        # bundle2 above become no-ops here
        _pullchangeset(pullop)
        _pullphase(pullop)
        _pullbookmarks(pullop)
        _pullobsolete(pullop)
        pullop.trmanager.close()
    finally:
        pullop.trmanager.release()
        lock.release()

    return pullop
972
971
# list of steps to perform discovery before pull
pulldiscoveryorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
pulldiscoverymapping = {}
980
979
def pulldiscovery(stepname):
    """decorator for function performing discovery before pull

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated function will be added in order (this
    may matter).

    You can only use this decorator for a new step, if you want to wrap a step
    from an extension, change the pulldiscovery dictionary directly."""
    def dec(func):
        assert stepname not in pulldiscoverymapping
        pulldiscoverymapping[stepname] = func
        pulldiscoveryorder.append(stepname)
        return func
    return dec
996
995
def _pulldiscovery(pullop):
    """Run all discovery steps"""
    for stepname in pulldiscoveryorder:
        step = pulldiscoverymapping[stepname]
        step(pullop)
1002
1001
@pulldiscovery('b1:bookmarks')
def _pullbookmarkbundle1(pullop):
    """fetch bookmark data in bundle1 case

    If not using bundle2, we have to fetch bookmarks before changeset
    discovery to reduce the chance and impact of race conditions."""
    if pullop.remotebookmarks is not None:
        return
    if (_canusebundle2(pullop)
            and 'listkeys' in bundle2.bundle2caps(pullop.remote)):
        # all known bundle2 servers now support listkeys, but lets be nice with
        # new implementation.
        return
    pullop.remotebookmarks = pullop.remote.listkeys('bookmarks')
1017
1016
1018
1017
@pulldiscovery('changegroup')
def _pulldiscoverychangegroup(pullop):
    """discovery phase for the pull

    Current handle changeset discovery only, will change handle all discovery
    at some point."""
    tmp = discovery.findcommonincoming(pullop.repo,
                                       pullop.remote,
                                       heads=pullop.heads,
                                       force=pullop.force)
    common, fetch, rheads = tmp
    nm = pullop.repo.unfiltered().changelog.nodemap
    if fetch and rheads:
        # If a remote heads is filtered locally, lets drop it from the unknown
        # remote heads and put it back in common.
        #
        # This is a hackish solution to catch most of "common but locally
        # hidden situation". We do not perform discovery on the unfiltered
        # repository because it ends up doing a pathological amount of round
        # trips for a huge amount of changesets we do not care about.
        #
        # If a set of such "common but filtered" changesets exist on the
        # server but are not including a remote head, we'll not be able to
        # detect it,
        scommon = set(common)
        filteredrheads = []
        for n in rheads:
            if n in nm:
                # remote head exists locally: common unless not yet known so
                if n not in scommon:
                    common.append(n)
            else:
                filteredrheads.append(n)
        if not filteredrheads:
            fetch = []
        rheads = filteredrheads
    pullop.common = common
    pullop.fetch = fetch
    pullop.rheads = rheads
1056
1055
def _pullbundle2(pullop):
    """pull data using bundle2

    For now, the only supported data are changegroup."""
    remotecaps = bundle2.bundle2caps(pullop.remote)
    kwargs = {'bundlecaps': caps20to10(pullop.repo)}
    # pulling changegroup
    pullop.stepsdone.add('changegroup')

    kwargs['common'] = pullop.common
    kwargs['heads'] = pullop.heads or pullop.rheads
    kwargs['cg'] = pullop.fetch
    if 'listkeys' in remotecaps:
        kwargs['listkeys'] = ['phase']
        if pullop.remotebookmarks is None:
            # make sure to always includes bookmark data when migrating
            # `hg incoming --bundle` to using this function.
            kwargs['listkeys'].append('bookmarks')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
    else:
        if pullop.heads is None and list(pullop.common) == [nullid]:
            pullop.repo.ui.status(_("requesting all changes\n"))
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        remoteversions = bundle2.obsmarkersversion(remotecaps)
        if obsolete.commonversion(remoteversions) is not None:
            kwargs['obsmarkers'] = True
            pullop.stepsdone.add('obsmarkers')
    _pullbundle2extraprepare(pullop, kwargs)
    bundle = pullop.remote.getbundle('pull', **kwargs)
    try:
        op = bundle2.processbundle(pullop.repo, bundle, pullop.gettransaction)
    except error.BundleValueError as exc:
        raise util.Abort('missing support for %s' % exc)

    if pullop.fetch:
        results = [cg['return'] for cg in op.records['changegroup']]
        pullop.cgresult = changegroup.combineresults(results)

    # processing phases change
    for namespace, value in op.records['listkeys']:
        if namespace == 'phases':
            _pullapplyphases(pullop, value)

    # processing bookmark update
    for namespace, value in op.records['listkeys']:
        if namespace == 'bookmarks':
            pullop.remotebookmarks = value

    # bookmark data were either already there or pulled in the bundle
    if pullop.remotebookmarks is not None:
        _pullbookmarks(pullop)
1110
1109
1111 def _pullbundle2extraprepare(pullop, kwargs):
1110 def _pullbundle2extraprepare(pullop, kwargs):
1112 """hook function so that extensions can extend the getbundle call"""
1111 """hook function so that extensions can extend the getbundle call"""
1113 pass
1112 pass
1114
1113
def _pullchangeset(pullop):
    """pull changeset from unbundle into the local repo"""
    # We delay the open of the transaction as late as possible so we
    # don't open transaction for nothing or you break future useful
    # rollback call
    if 'changegroup' in pullop.stepsdone:
        return
    pullop.stepsdone.add('changegroup')
    if not pullop.fetch:
        pullop.repo.ui.status(_("no changes found\n"))
        pullop.cgresult = 0
        return
    pullop.gettransaction()
    if pullop.heads is None and list(pullop.common) == [nullid]:
        pullop.repo.ui.status(_("requesting all changes\n"))
    elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
        # issue1320, avoid a race if remote changed after discovery
        pullop.heads = pullop.rheads

    if pullop.remote.capable('getbundle'):
        # TODO: get bundlecaps from remote
        cg = pullop.remote.getbundle('pull', common=pullop.common,
                                     heads=pullop.heads or pullop.rheads)
    elif pullop.heads is None:
        cg = pullop.remote.changegroup(pullop.fetch, 'pull')
    elif not pullop.remote.capable('changegroupsubset'):
        raise util.Abort(_("partial pull cannot be done because "
                           "other repository doesn't support "
                           "changegroupsubset."))
    else:
        cg = pullop.remote.changegroupsubset(pullop.fetch, pullop.heads, 'pull')
    pullop.cgresult = changegroup.addchangegroup(pullop.repo, cg, 'pull',
                                                 pullop.remote.url())
1148
1147
def _pullphase(pullop):
    """Fetch phase data from the remote and apply it locally."""
    # Get remote phases data from remote
    if 'phases' in pullop.stepsdone:
        return
    remotephases = pullop.remote.listkeys('phases')
    _pullapplyphases(pullop, remotephases)
1155
1154
def _pullapplyphases(pullop, remotephases):
    """apply phase movement from observed remote state"""
    if 'phases' in pullop.stepsdone:
        return
    pullop.stepsdone.add('phases')
    publishing = bool(remotephases.get('publishing', False))
    if remotephases and not publishing:
        # remote is new and unpublishing
        pheads, _dr = phases.analyzeremotephases(pullop.repo,
                                                 pullop.pulledsubset,
                                                 remotephases)
        dheads = pullop.pulledsubset
    else:
        # Remote is old or publishing all common changesets
        # should be seen as public
        pheads = pullop.pulledsubset
        dheads = []
    unfi = pullop.repo.unfiltered()
    phase = unfi._phasecache.phase
    rev = unfi.changelog.nodemap.get
    public = phases.public
    draft = phases.draft

    # exclude changesets already public locally and update the others
    pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
    if pheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, public, pheads)

    # exclude changesets already draft locally and update the others
    dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
    if dheads:
        tr = pullop.gettransaction()
        phases.advanceboundary(pullop.repo, tr, draft, dheads)
1190
1189
def _pullbookmarks(pullop):
    """process the remote bookmark information to update the local one"""
    if 'bookmarks' in pullop.stepsdone:
        return
    pullop.stepsdone.add('bookmarks')
    repo = pullop.repo
    remotebookmarks = pullop.remotebookmarks
    bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
                             pullop.remote.url(),
                             pullop.gettransaction,
                             explicit=pullop.explicitbookmarks)
1202
1201
def _pullobsolete(pullop):
    """utility function to pull obsolete markers from a remote

    The `gettransaction` is function that return the pull transaction, creating
    one if necessary. We return the transaction to inform the calling code that
    a new transaction have been created (when applicable).

    Exists mostly to allow overriding for experimentation purpose"""
    if 'obsmarkers' in pullop.stepsdone:
        return
    pullop.stepsdone.add('obsmarkers')
    tr = None
    if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
        pullop.repo.ui.debug('fetching remote obsolete markers\n')
        remoteobs = pullop.remote.listkeys('obsolete')
        if 'dump0' in remoteobs:
            tr = pullop.gettransaction()
            for key in sorted(remoteobs, reverse=True):
                if key.startswith('dump'):
                    data = base85.b85decode(remoteobs[key])
                    pullop.repo.obsstore.mergemarkers(tr, data)
            pullop.repo.invalidatevolatilesets()
    return tr
1226
1225
def caps20to10(repo):
    """return a set with appropriate options to use bundle20 during getbundle"""
    caps = set(['HG20'])
    capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
    # bundle2 capabilities are transported URL-quoted inside bundlecaps
    caps.add('bundle2=' + urllib.quote(capsblob))
    return caps
1233
1232
# List of names of steps to perform for a bundle2 for getbundle, order matters.
getbundle2partsorder = []

# Mapping between step name and function
#
# This exists to help extensions wrap steps if necessary
getbundle2partsmapping = {}
1241
1240
def getbundle2partsgenerator(stepname, idx=None):
    """decorator for function generating bundle2 part for getbundle

    The function is added to the step -> function mapping and appended to the
    list of steps. Beware that decorated functions will be added in order
    (this may matter).

    You can only use this decorator for new steps, if you want to wrap a step
    from an extension, attack the getbundle2partsmapping dictionary directly."""
    def dec(func):
        assert stepname not in getbundle2partsmapping
        getbundle2partsmapping[stepname] = func
        if idx is None:
            getbundle2partsorder.append(stepname)
        else:
            getbundle2partsorder.insert(idx, stepname)
        return func
    return dec
1260
1259
def getbundle(repo, source, heads=None, common=None, bundlecaps=None,
              **kwargs):
    """return a full bundle (with potentially multiple kind of parts)

    Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
    passed. For now, the bundle can contain only changegroup, but this will
    changes when more part type will be available for bundle2.

    This is different from changegroup.getchangegroup that only returns an HG10
    changegroup bundle. They may eventually get reunited in the future when we
    have a clearer idea of the API we what to query different data.

    The implementation is at a very early stage and will get massive rework
    when the API of bundle is refined.
    """
    # bundle10 case
    usebundle2 = False
    if bundlecaps is not None:
        usebundle2 = any((cap.startswith('HG2') for cap in bundlecaps))
    if not usebundle2:
        if bundlecaps and not kwargs.get('cg', True):
            raise ValueError(_('request for bundle10 must include changegroup'))

        if kwargs:
            raise ValueError(_('unsupported getbundle arguments: %s')
                             % ', '.join(sorted(kwargs.keys())))
        return changegroup.getchangegroup(repo, source, heads=heads,
                                          common=common, bundlecaps=bundlecaps)

    # bundle20 case
    b2caps = {}
    for bcaps in bundlecaps:
        if bcaps.startswith('bundle2='):
            blob = urllib.unquote(bcaps[len('bundle2='):])
            b2caps.update(bundle2.decodecaps(blob))
    bundler = bundle2.bundle20(repo.ui, b2caps)

    kwargs['heads'] = heads
    kwargs['common'] = common

    for name in getbundle2partsorder:
        func = getbundle2partsmapping[name]
        func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
             **kwargs)

    return util.chunkbuffer(bundler.getchunks())
1307
1306
@getbundle2partsgenerator('changegroup')
def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, heads=None, common=None, **kwargs):
    """add a changegroup part to the requested bundle"""
    cg = None
    if kwargs.get('cg', True):
        # build changegroup bundle here.
        version = None
        cgversions = b2caps.get('changegroup')
        getcgkwargs = {}
        if cgversions:  # 3.1 and 3.2 ship with an empty value
            cgversions = [v for v in cgversions if v in changegroup.packermap]
            if not cgversions:
                raise ValueError(_('no common changegroup version'))
            version = getcgkwargs['version'] = max(cgversions)
        outgoing = changegroup.computeoutgoing(repo, heads, common)
        cg = changegroup.getlocalchangegroupraw(repo, source, outgoing,
                                                bundlecaps=bundlecaps,
                                                **getcgkwargs)

    if cg:
        part = bundler.newpart('changegroup', data=cg)
        if version is not None:
            part.addparam('version', version)
        part.addparam('nbchanges', str(len(outgoing.missing)), mandatory=False)
1333
1332
@getbundle2partsgenerator('listkeys')
def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
                            b2caps=None, **kwargs):
    """add parts containing listkeys namespaces to the requested bundle"""
    listkeys = kwargs.get('listkeys', ())
    for namespace in listkeys:
        # one 'listkeys' part per requested namespace
        part = bundler.newpart('listkeys')
        part.addparam('namespace', namespace)
        keys = repo.listkeys(namespace).items()
        part.data = pushkey.encodekeys(keys)
1344
1343
@getbundle2partsgenerator('obsmarkers')
def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
                            b2caps=None, heads=None, **kwargs):
    """add an obsolescence markers part to the requested bundle"""
    if kwargs.get('obsmarkers', False):
        if heads is None:
            heads = repo.heads()
        # only markers relevant to the ancestors of the requested heads
        subset = [c.node() for c in repo.set('::%ln', heads)]
        markers = repo.obsstore.relevantmarkers(subset)
        markers = sorted(markers)
        buildobsmarkerspart(bundler, markers)
1356
1355
1357 @getbundle2partsgenerator('hgtagsfnodes')
1356 @getbundle2partsgenerator('hgtagsfnodes')
1358 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1357 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
1359 b2caps=None, heads=None, common=None,
1358 b2caps=None, heads=None, common=None,
1360 **kwargs):
1359 **kwargs):
1361 """Transfer the .hgtags filenodes mapping.
1360 """Transfer the .hgtags filenodes mapping.
1362
1361
1363 Only values for heads in this bundle will be transferred.
1362 Only values for heads in this bundle will be transferred.
1364
1363
1365 The part data consists of pairs of 20 byte changeset node and .hgtags
1364 The part data consists of pairs of 20 byte changeset node and .hgtags
1366 filenodes raw values.
1365 filenodes raw values.
1367 """
1366 """
1368 # Don't send unless:
1367 # Don't send unless:
1369 # - changeset are being exchanged,
1368 # - changeset are being exchanged,
1370 # - the client supports it.
1369 # - the client supports it.
1371 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1370 if not (kwargs.get('cg', True) and 'hgtagsfnodes' in b2caps):
1372 return
1371 return
1373
1372
1374 outgoing = changegroup.computeoutgoing(repo, heads, common)
1373 outgoing = changegroup.computeoutgoing(repo, heads, common)
1375
1374
1376 if not outgoing.missingheads:
1375 if not outgoing.missingheads:
1377 return
1376 return
1378
1377
1379 cache = tags.hgtagsfnodescache(repo.unfiltered())
1378 cache = tags.hgtagsfnodescache(repo.unfiltered())
1380 chunks = []
1379 chunks = []
1381
1380
1382 # .hgtags fnodes are only relevant for head changesets. While we could
1381 # .hgtags fnodes are only relevant for head changesets. While we could
1383 # transfer values for all known nodes, there will likely be little to
1382 # transfer values for all known nodes, there will likely be little to
1384 # no benefit.
1383 # no benefit.
1385 #
1384 #
1386 # We don't bother using a generator to produce output data because
1385 # We don't bother using a generator to produce output data because
1387 # a) we only have 40 bytes per head and even esoteric numbers of heads
1386 # a) we only have 40 bytes per head and even esoteric numbers of heads
1388 # consume little memory (1M heads is 40MB) b) we don't want to send the
1387 # consume little memory (1M heads is 40MB) b) we don't want to send the
1389 # part if we don't have entries and knowing if we have entries requires
1388 # part if we don't have entries and knowing if we have entries requires
1390 # cache lookups.
1389 # cache lookups.
1391 for node in outgoing.missingheads:
1390 for node in outgoing.missingheads:
1392 # Don't compute missing, as this may slow down serving.
1391 # Don't compute missing, as this may slow down serving.
1393 fnode = cache.getfnode(node, computemissing=False)
1392 fnode = cache.getfnode(node, computemissing=False)
1394 if fnode is not None:
1393 if fnode is not None:
1395 chunks.extend([node, fnode])
1394 chunks.extend([node, fnode])
1396
1395
1397 if chunks:
1396 if chunks:
1398 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1397 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1399
1398
1400 def check_heads(repo, their_heads, context):
1399 def check_heads(repo, their_heads, context):
1401 """check if the heads of a repo have been modified
1400 """check if the heads of a repo have been modified
1402
1401
1403 Used by peer for unbundling.
1402 Used by peer for unbundling.
1404 """
1403 """
1405 heads = repo.heads()
1404 heads = repo.heads()
1406 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1405 heads_hash = util.sha1(''.join(sorted(heads))).digest()
1407 if not (their_heads == ['force'] or their_heads == heads or
1406 if not (their_heads == ['force'] or their_heads == heads or
1408 their_heads == ['hashed', heads_hash]):
1407 their_heads == ['hashed', heads_hash]):
1409 # someone else committed/pushed/unbundled while we
1408 # someone else committed/pushed/unbundled while we
1410 # were transferring data
1409 # were transferring data
1411 raise error.PushRaced('repository changed while %s - '
1410 raise error.PushRaced('repository changed while %s - '
1412 'please try again' % context)
1411 'please try again' % context)
1413
1412
1414 def unbundle(repo, cg, heads, source, url):
1413 def unbundle(repo, cg, heads, source, url):
1415 """Apply a bundle to a repo.
1414 """Apply a bundle to a repo.
1416
1415
1417 this function makes sure the repo is locked during the application and have
1416 this function makes sure the repo is locked during the application and have
1418 mechanism to check that no push race occurred between the creation of the
1417 mechanism to check that no push race occurred between the creation of the
1419 bundle and its application.
1418 bundle and its application.
1420
1419
1421 If the push was raced as PushRaced exception is raised."""
1420 If the push was raced as PushRaced exception is raised."""
1422 r = 0
1421 r = 0
1423 # need a transaction when processing a bundle2 stream
1422 # need a transaction when processing a bundle2 stream
1424 wlock = lock = tr = None
1423 wlock = lock = tr = None
1425 recordout = None
1424 recordout = None
1426 # quick fix for output mismatch with bundle2 in 3.4
1425 # quick fix for output mismatch with bundle2 in 3.4
1427 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1426 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture',
1428 False)
1427 False)
1429 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1428 if url.startswith('remote:http:') or url.startswith('remote:https:'):
1430 captureoutput = True
1429 captureoutput = True
1431 try:
1430 try:
1432 check_heads(repo, heads, 'uploading changes')
1431 check_heads(repo, heads, 'uploading changes')
1433 # push can proceed
1432 # push can proceed
1434 if util.safehasattr(cg, 'params'):
1433 if util.safehasattr(cg, 'params'):
1435 r = None
1434 r = None
1436 try:
1435 try:
1437 wlock = repo.wlock()
1436 wlock = repo.wlock()
1438 lock = repo.lock()
1437 lock = repo.lock()
1439 tr = repo.transaction(source)
1438 tr = repo.transaction(source)
1440 tr.hookargs['source'] = source
1439 tr.hookargs['source'] = source
1441 tr.hookargs['url'] = url
1440 tr.hookargs['url'] = url
1442 tr.hookargs['bundle2'] = '1'
1441 tr.hookargs['bundle2'] = '1'
1443 op = bundle2.bundleoperation(repo, lambda: tr,
1442 op = bundle2.bundleoperation(repo, lambda: tr,
1444 captureoutput=captureoutput)
1443 captureoutput=captureoutput)
1445 try:
1444 try:
1446 op = bundle2.processbundle(repo, cg, op=op)
1445 op = bundle2.processbundle(repo, cg, op=op)
1447 finally:
1446 finally:
1448 r = op.reply
1447 r = op.reply
1449 if captureoutput and r is not None:
1448 if captureoutput and r is not None:
1450 repo.ui.pushbuffer(error=True, subproc=True)
1449 repo.ui.pushbuffer(error=True, subproc=True)
1451 def recordout(output):
1450 def recordout(output):
1452 r.newpart('output', data=output, mandatory=False)
1451 r.newpart('output', data=output, mandatory=False)
1453 tr.close()
1452 tr.close()
1454 except BaseException as exc:
1453 except BaseException as exc:
1455 exc.duringunbundle2 = True
1454 exc.duringunbundle2 = True
1456 if captureoutput and r is not None:
1455 if captureoutput and r is not None:
1457 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1456 parts = exc._bundle2salvagedoutput = r.salvageoutput()
1458 def recordout(output):
1457 def recordout(output):
1459 part = bundle2.bundlepart('output', data=output,
1458 part = bundle2.bundlepart('output', data=output,
1460 mandatory=False)
1459 mandatory=False)
1461 parts.append(part)
1460 parts.append(part)
1462 raise
1461 raise
1463 else:
1462 else:
1464 lock = repo.lock()
1463 lock = repo.lock()
1465 r = changegroup.addchangegroup(repo, cg, source, url)
1464 r = changegroup.addchangegroup(repo, cg, source, url)
1466 finally:
1465 finally:
1467 lockmod.release(tr, lock, wlock)
1466 lockmod.release(tr, lock, wlock)
1468 if recordout is not None:
1467 if recordout is not None:
1469 recordout(repo.ui.popbuffer())
1468 recordout(repo.ui.popbuffer())
1470 return r
1469 return r
1471
1472 # This is it's own function so extensions can override it.
1473 def _walkstreamfiles(repo):
1474 return repo.store.walk()
1475
1476 def generatestreamclone(repo):
1477 """Emit content for a streaming clone.
1478
1479 This is a generator of raw chunks that constitute a streaming clone.
1480
1481 The stream begins with a line of 2 space-delimited integers containing the
1482 number of entries and total bytes size.
1483
1484 Next, are N entries for each file being transferred. Each file entry starts
1485 as a line with the file name and integer size delimited by a null byte.
1486 The raw file data follows. Following the raw file data is the next file
1487 entry, or EOF.
1488
1489 When used on the wire protocol, an additional line indicating protocol
1490 success will be prepended to the stream. This function is not responsible
1491 for adding it.
1492
1493 This function will obtain a repository lock to ensure a consistent view of
1494 the store is captured. It therefore may raise LockError.
1495 """
1496 entries = []
1497 total_bytes = 0
1498 # Get consistent snapshot of repo, lock during scan.
1499 lock = repo.lock()
1500 try:
1501 repo.ui.debug('scanning\n')
1502 for name, ename, size in _walkstreamfiles(repo):
1503 if size:
1504 entries.append((name, size))
1505 total_bytes += size
1506 finally:
1507 lock.release()
1508
1509 repo.ui.debug('%d files, %d bytes to transfer\n' %
1510 (len(entries), total_bytes))
1511 yield '%d %d\n' % (len(entries), total_bytes)
1512
1513 svfs = repo.svfs
1514 oldaudit = svfs.mustaudit
1515 debugflag = repo.ui.debugflag
1516 svfs.mustaudit = False
1517
1518 try:
1519 for name, size in entries:
1520 if debugflag:
1521 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
1522 # partially encode name over the wire for backwards compat
1523 yield '%s\0%d\n' % (store.encodedir(name), size)
1524 if size <= 65536:
1525 fp = svfs(name)
1526 try:
1527 data = fp.read(size)
1528 finally:
1529 fp.close()
1530 yield data
1531 else:
1532 for chunk in util.filechunkiter(svfs(name), limit=size):
1533 yield chunk
1534 finally:
1535 svfs.mustaudit = oldaudit
1536
1537 def consumestreamclone(repo, fp):
1538 """Apply the contents from a streaming clone file.
1539
1540 This takes the output from "streamout" and applies it to the specified
1541 repository.
1542
1543 Like "streamout," the status line added by the wire protocol is not handled
1544 by this function.
1545 """
1546 lock = repo.lock()
1547 try:
1548 repo.ui.status(_('streaming all changes\n'))
1549 l = fp.readline()
1550 try:
1551 total_files, total_bytes = map(int, l.split(' ', 1))
1552 except (ValueError, TypeError):
1553 raise error.ResponseError(
1554 _('unexpected response from remote server:'), l)
1555 repo.ui.status(_('%d files to transfer, %s of data\n') %
1556 (total_files, util.bytecount(total_bytes)))
1557 handled_bytes = 0
1558 repo.ui.progress(_('clone'), 0, total=total_bytes)
1559 start = time.time()
1560
1561 tr = repo.transaction(_('clone'))
1562 try:
1563 for i in xrange(total_files):
1564 # XXX doesn't support '\n' or '\r' in filenames
1565 l = fp.readline()
1566 try:
1567 name, size = l.split('\0', 1)
1568 size = int(size)
1569 except (ValueError, TypeError):
1570 raise error.ResponseError(
1571 _('unexpected response from remote server:'), l)
1572 if repo.ui.debugflag:
1573 repo.ui.debug('adding %s (%s)\n' %
1574 (name, util.bytecount(size)))
1575 # for backwards compat, name was partially encoded
1576 ofp = repo.svfs(store.decodedir(name), 'w')
1577 for chunk in util.filechunkiter(fp, limit=size):
1578 handled_bytes += len(chunk)
1579 repo.ui.progress(_('clone'), handled_bytes,
1580 total=total_bytes)
1581 ofp.write(chunk)
1582 ofp.close()
1583 tr.close()
1584 finally:
1585 tr.release()
1586
1587 # Writing straight to files circumvented the inmemory caches
1588 repo.invalidate()
1589
1590 elapsed = time.time() - start
1591 if elapsed <= 0:
1592 elapsed = 0.001
1593 repo.ui.progress(_('clone'), None)
1594 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
1595 (util.bytecount(total_bytes), elapsed,
1596 util.bytecount(total_bytes / elapsed)))
1597 finally:
1598 lock.release()
@@ -1,91 +1,221 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import time
11
10 from .i18n import _
12 from .i18n import _
11 from . import (
13 from . import (
12 branchmap,
14 branchmap,
13 error,
15 error,
14 exchange,
16 store,
15 util,
17 util,
16 )
18 )
17
19
20 # This is it's own function so extensions can override it.
21 def _walkstreamfiles(repo):
22 return repo.store.walk()
23
24 def generatev1(repo):
25 """Emit content for version 1 of a streaming clone.
26
27 This is a generator of raw chunks that constitute a streaming clone.
28
29 The stream begins with a line of 2 space-delimited integers containing the
30 number of entries and total bytes size.
31
32 Next, are N entries for each file being transferred. Each file entry starts
33 as a line with the file name and integer size delimited by a null byte.
34 The raw file data follows. Following the raw file data is the next file
35 entry, or EOF.
36
37 When used on the wire protocol, an additional line indicating protocol
38 success will be prepended to the stream. This function is not responsible
39 for adding it.
40
41 This function will obtain a repository lock to ensure a consistent view of
42 the store is captured. It therefore may raise LockError.
43 """
44 entries = []
45 total_bytes = 0
46 # Get consistent snapshot of repo, lock during scan.
47 lock = repo.lock()
48 try:
49 repo.ui.debug('scanning\n')
50 for name, ename, size in _walkstreamfiles(repo):
51 if size:
52 entries.append((name, size))
53 total_bytes += size
54 finally:
55 lock.release()
56
57 repo.ui.debug('%d files, %d bytes to transfer\n' %
58 (len(entries), total_bytes))
59 yield '%d %d\n' % (len(entries), total_bytes)
60
61 svfs = repo.svfs
62 oldaudit = svfs.mustaudit
63 debugflag = repo.ui.debugflag
64 svfs.mustaudit = False
65
66 try:
67 for name, size in entries:
68 if debugflag:
69 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
70 # partially encode name over the wire for backwards compat
71 yield '%s\0%d\n' % (store.encodedir(name), size)
72 if size <= 65536:
73 fp = svfs(name)
74 try:
75 data = fp.read(size)
76 finally:
77 fp.close()
78 yield data
79 else:
80 for chunk in util.filechunkiter(svfs(name), limit=size):
81 yield chunk
82 finally:
83 svfs.mustaudit = oldaudit
84
85 def consumev1(repo, fp):
86 """Apply the contents from version 1 of a streaming clone file handle.
87
88 This takes the output from "streamout" and applies it to the specified
89 repository.
90
91 Like "streamout," the status line added by the wire protocol is not handled
92 by this function.
93 """
94 lock = repo.lock()
95 try:
96 repo.ui.status(_('streaming all changes\n'))
97 l = fp.readline()
98 try:
99 total_files, total_bytes = map(int, l.split(' ', 1))
100 except (ValueError, TypeError):
101 raise error.ResponseError(
102 _('unexpected response from remote server:'), l)
103 repo.ui.status(_('%d files to transfer, %s of data\n') %
104 (total_files, util.bytecount(total_bytes)))
105 handled_bytes = 0
106 repo.ui.progress(_('clone'), 0, total=total_bytes)
107 start = time.time()
108
109 tr = repo.transaction(_('clone'))
110 try:
111 for i in xrange(total_files):
112 # XXX doesn't support '\n' or '\r' in filenames
113 l = fp.readline()
114 try:
115 name, size = l.split('\0', 1)
116 size = int(size)
117 except (ValueError, TypeError):
118 raise error.ResponseError(
119 _('unexpected response from remote server:'), l)
120 if repo.ui.debugflag:
121 repo.ui.debug('adding %s (%s)\n' %
122 (name, util.bytecount(size)))
123 # for backwards compat, name was partially encoded
124 ofp = repo.svfs(store.decodedir(name), 'w')
125 for chunk in util.filechunkiter(fp, limit=size):
126 handled_bytes += len(chunk)
127 repo.ui.progress(_('clone'), handled_bytes,
128 total=total_bytes)
129 ofp.write(chunk)
130 ofp.close()
131 tr.close()
132 finally:
133 tr.release()
134
135 # Writing straight to files circumvented the inmemory caches
136 repo.invalidate()
137
138 elapsed = time.time() - start
139 if elapsed <= 0:
140 elapsed = 0.001
141 repo.ui.progress(_('clone'), None)
142 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
143 (util.bytecount(total_bytes), elapsed,
144 util.bytecount(total_bytes / elapsed)))
145 finally:
146 lock.release()
147
18 def streamin(repo, remote, remotereqs):
148 def streamin(repo, remote, remotereqs):
19 # Save remote branchmap. We will use it later
149 # Save remote branchmap. We will use it later
20 # to speed up branchcache creation
150 # to speed up branchcache creation
21 rbranchmap = None
151 rbranchmap = None
22 if remote.capable("branchmap"):
152 if remote.capable("branchmap"):
23 rbranchmap = remote.branchmap()
153 rbranchmap = remote.branchmap()
24
154
25 fp = remote.stream_out()
155 fp = remote.stream_out()
26 l = fp.readline()
156 l = fp.readline()
27 try:
157 try:
28 resp = int(l)
158 resp = int(l)
29 except ValueError:
159 except ValueError:
30 raise error.ResponseError(
160 raise error.ResponseError(
31 _('unexpected response from remote server:'), l)
161 _('unexpected response from remote server:'), l)
32 if resp == 1:
162 if resp == 1:
33 raise util.Abort(_('operation forbidden by server'))
163 raise util.Abort(_('operation forbidden by server'))
34 elif resp == 2:
164 elif resp == 2:
35 raise util.Abort(_('locking the remote repository failed'))
165 raise util.Abort(_('locking the remote repository failed'))
36 elif resp != 0:
166 elif resp != 0:
37 raise util.Abort(_('the server sent an unknown error code'))
167 raise util.Abort(_('the server sent an unknown error code'))
38
168
39 applyremotedata(repo, remotereqs, rbranchmap, fp)
169 applyremotedata(repo, remotereqs, rbranchmap, fp)
40 return len(repo.heads()) + 1
170 return len(repo.heads()) + 1
41
171
42 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
172 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
43 """Apply stream clone data to a repository.
173 """Apply stream clone data to a repository.
44
174
45 "remotereqs" is a set of requirements to handle the incoming data.
175 "remotereqs" is a set of requirements to handle the incoming data.
46 "remotebranchmap" is the result of a branchmap lookup on the remote. It
176 "remotebranchmap" is the result of a branchmap lookup on the remote. It
47 can be None.
177 can be None.
48 "fp" is a file object containing the raw stream data, suitable for
178 "fp" is a file object containing the raw stream data, suitable for
49 feeding into exchange.consumestreamclone.
179 feeding into consumev1().
50 """
180 """
51 lock = repo.lock()
181 lock = repo.lock()
52 try:
182 try:
53 exchange.consumestreamclone(repo, fp)
183 consumev1(repo, fp)
54
184
55 # new requirements = old non-format requirements +
185 # new requirements = old non-format requirements +
56 # new format-related remote requirements
186 # new format-related remote requirements
57 # requirements from the streamed-in repository
187 # requirements from the streamed-in repository
58 repo.requirements = remotereqs | (
188 repo.requirements = remotereqs | (
59 repo.requirements - repo.supportedformats)
189 repo.requirements - repo.supportedformats)
60 repo._applyopenerreqs()
190 repo._applyopenerreqs()
61 repo._writerequirements()
191 repo._writerequirements()
62
192
63 if remotebranchmap:
193 if remotebranchmap:
64 rbheads = []
194 rbheads = []
65 closed = []
195 closed = []
66 for bheads in remotebranchmap.itervalues():
196 for bheads in remotebranchmap.itervalues():
67 rbheads.extend(bheads)
197 rbheads.extend(bheads)
68 for h in bheads:
198 for h in bheads:
69 r = repo.changelog.rev(h)
199 r = repo.changelog.rev(h)
70 b, c = repo.changelog.branchinfo(r)
200 b, c = repo.changelog.branchinfo(r)
71 if c:
201 if c:
72 closed.append(h)
202 closed.append(h)
73
203
74 if rbheads:
204 if rbheads:
75 rtiprev = max((int(repo.changelog.rev(node))
205 rtiprev = max((int(repo.changelog.rev(node))
76 for node in rbheads))
206 for node in rbheads))
77 cache = branchmap.branchcache(remotebranchmap,
207 cache = branchmap.branchcache(remotebranchmap,
78 repo[rtiprev].node(),
208 repo[rtiprev].node(),
79 rtiprev,
209 rtiprev,
80 closednodes=closed)
210 closednodes=closed)
81 # Try to stick it as low as possible
211 # Try to stick it as low as possible
82 # filter above served are unlikely to be fetch from a clone
212 # filter above served are unlikely to be fetch from a clone
83 for candidate in ('base', 'immutable', 'served'):
213 for candidate in ('base', 'immutable', 'served'):
84 rview = repo.filtered(candidate)
214 rview = repo.filtered(candidate)
85 if cache.validfor(rview):
215 if cache.validfor(rview):
86 repo._branchcaches[candidate] = cache
216 repo._branchcaches[candidate] = cache
87 cache.write(rview)
217 cache.write(rview)
88 break
218 break
89 repo.invalidate()
219 repo.invalidate()
90 finally:
220 finally:
91 lock.release()
221 lock.release()
@@ -1,812 +1,813 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11 import sys
11 import sys
12 import tempfile
12 import tempfile
13 import urllib
13 import urllib
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 streamclone,
29 util,
30 util,
30 )
31 )
31
32
32 class abstractserverproto(object):
33 class abstractserverproto(object):
33 """abstract class that summarizes the protocol API
34 """abstract class that summarizes the protocol API
34
35
35 Used as reference and documentation.
36 Used as reference and documentation.
36 """
37 """
37
38
38 def getargs(self, args):
39 def getargs(self, args):
39 """return the value for arguments in <args>
40 """return the value for arguments in <args>
40
41
41 returns a list of values (same order as <args>)"""
42 returns a list of values (same order as <args>)"""
42 raise NotImplementedError()
43 raise NotImplementedError()
43
44
44 def getfile(self, fp):
45 def getfile(self, fp):
45 """write the whole content of a file into a file like object
46 """write the whole content of a file into a file like object
46
47
47 The file is in the form::
48 The file is in the form::
48
49
49 (<chunk-size>\n<chunk>)+0\n
50 (<chunk-size>\n<chunk>)+0\n
50
51
51 chunk size is the ascii version of the int.
52 chunk size is the ascii version of the int.
52 """
53 """
53 raise NotImplementedError()
54 raise NotImplementedError()
54
55
55 def redirect(self):
56 def redirect(self):
56 """may setup interception for stdout and stderr
57 """may setup interception for stdout and stderr
57
58
58 See also the `restore` method."""
59 See also the `restore` method."""
59 raise NotImplementedError()
60 raise NotImplementedError()
60
61
61 # If the `redirect` function does install interception, the `restore`
62 # If the `redirect` function does install interception, the `restore`
62 # function MUST be defined. If interception is not used, this function
63 # function MUST be defined. If interception is not used, this function
63 # MUST NOT be defined.
64 # MUST NOT be defined.
64 #
65 #
65 # left commented here on purpose
66 # left commented here on purpose
66 #
67 #
67 #def restore(self):
68 #def restore(self):
68 # """reinstall previous stdout and stderr and return intercepted stdout
69 # """reinstall previous stdout and stderr and return intercepted stdout
69 # """
70 # """
70 # raise NotImplementedError()
71 # raise NotImplementedError()
71
72
72 def groupchunks(self, cg):
73 def groupchunks(self, cg):
73 """return 4096 chunks from a changegroup object
74 """return 4096 chunks from a changegroup object
74
75
75 Some protocols may have compressed the contents."""
76 Some protocols may have compressed the contents."""
76 raise NotImplementedError()
77 raise NotImplementedError()
77
78
78 class remotebatch(peer.batcher):
79 class remotebatch(peer.batcher):
79 '''batches the queued calls; uses as few roundtrips as possible'''
80 '''batches the queued calls; uses as few roundtrips as possible'''
80 def __init__(self, remote):
81 def __init__(self, remote):
81 '''remote must support _submitbatch(encbatch) and
82 '''remote must support _submitbatch(encbatch) and
82 _submitone(op, encargs)'''
83 _submitone(op, encargs)'''
83 peer.batcher.__init__(self)
84 peer.batcher.__init__(self)
84 self.remote = remote
85 self.remote = remote
85 def submit(self):
86 def submit(self):
86 req, rsp = [], []
87 req, rsp = [], []
87 for name, args, opts, resref in self.calls:
88 for name, args, opts, resref in self.calls:
88 mtd = getattr(self.remote, name)
89 mtd = getattr(self.remote, name)
89 batchablefn = getattr(mtd, 'batchable', None)
90 batchablefn = getattr(mtd, 'batchable', None)
90 if batchablefn is not None:
91 if batchablefn is not None:
91 batchable = batchablefn(mtd.im_self, *args, **opts)
92 batchable = batchablefn(mtd.im_self, *args, **opts)
92 encargsorres, encresref = batchable.next()
93 encargsorres, encresref = batchable.next()
93 if encresref:
94 if encresref:
94 req.append((name, encargsorres,))
95 req.append((name, encargsorres,))
95 rsp.append((batchable, encresref, resref,))
96 rsp.append((batchable, encresref, resref,))
96 else:
97 else:
97 resref.set(encargsorres)
98 resref.set(encargsorres)
98 else:
99 else:
99 if req:
100 if req:
100 self._submitreq(req, rsp)
101 self._submitreq(req, rsp)
101 req, rsp = [], []
102 req, rsp = [], []
102 resref.set(mtd(*args, **opts))
103 resref.set(mtd(*args, **opts))
103 if req:
104 if req:
104 self._submitreq(req, rsp)
105 self._submitreq(req, rsp)
105 def _submitreq(self, req, rsp):
106 def _submitreq(self, req, rsp):
106 encresults = self.remote._submitbatch(req)
107 encresults = self.remote._submitbatch(req)
107 for encres, r in zip(encresults, rsp):
108 for encres, r in zip(encresults, rsp):
108 batchable, encresref, resref = r
109 batchable, encresref, resref = r
109 encresref.set(encres)
110 encresref.set(encres)
110 resref.set(batchable.next())
111 resref.set(batchable.next())
111
112
112 # Forward a couple of names from peer to make wireproto interactions
113 # Forward a couple of names from peer to make wireproto interactions
113 # slightly more sensible.
114 # slightly more sensible.
114 batchable = peer.batchable
115 batchable = peer.batchable
115 future = peer.future
116 future = peer.future
116
117
117 # list of nodes encoding / decoding
118 # list of nodes encoding / decoding
118
119
119 def decodelist(l, sep=' '):
120 def decodelist(l, sep=' '):
120 if l:
121 if l:
121 return map(bin, l.split(sep))
122 return map(bin, l.split(sep))
122 return []
123 return []
123
124
124 def encodelist(l, sep=' '):
125 def encodelist(l, sep=' '):
125 try:
126 try:
126 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
127 except TypeError:
128 except TypeError:
128 raise
129 raise
129
130
130 # batched call argument encoding
131 # batched call argument encoding
131
132
132 def escapearg(plain):
133 def escapearg(plain):
133 return (plain
134 return (plain
134 .replace(':', ':c')
135 .replace(':', ':c')
135 .replace(',', ':o')
136 .replace(',', ':o')
136 .replace(';', ':s')
137 .replace(';', ':s')
137 .replace('=', ':e'))
138 .replace('=', ':e'))
138
139
139 def unescapearg(escaped):
140 def unescapearg(escaped):
140 return (escaped
141 return (escaped
141 .replace(':e', '=')
142 .replace(':e', '=')
142 .replace(':s', ';')
143 .replace(':s', ';')
143 .replace(':o', ',')
144 .replace(':o', ',')
144 .replace(':c', ':'))
145 .replace(':c', ':'))
145
146
146 # mapping of options accepted by getbundle and their types
147 # mapping of options accepted by getbundle and their types
147 #
148 #
148 # Meant to be extended by extensions. It is extensions responsibility to ensure
149 # Meant to be extended by extensions. It is extensions responsibility to ensure
149 # such options are properly processed in exchange.getbundle.
150 # such options are properly processed in exchange.getbundle.
150 #
151 #
151 # supported types are:
152 # supported types are:
152 #
153 #
153 # :nodes: list of binary nodes
154 # :nodes: list of binary nodes
154 # :csv: list of comma-separated values
155 # :csv: list of comma-separated values
155 # :scsv: list of comma-separated values return as set
156 # :scsv: list of comma-separated values return as set
156 # :plain: string with no transformation needed.
157 # :plain: string with no transformation needed.
157 gboptsmap = {'heads': 'nodes',
158 gboptsmap = {'heads': 'nodes',
158 'common': 'nodes',
159 'common': 'nodes',
159 'obsmarkers': 'boolean',
160 'obsmarkers': 'boolean',
160 'bundlecaps': 'scsv',
161 'bundlecaps': 'scsv',
161 'listkeys': 'csv',
162 'listkeys': 'csv',
162 'cg': 'boolean'}
163 'cg': 'boolean'}
163
164
164 # client side
165 # client side
165
166
class wirepeer(peer.peerrepository):
    # Client-side proxy for a repository reached over the wire protocol.
    # Concrete transports implement the abstract _call* methods at the
    # bottom of this class. Methods decorated with @batchable are
    # generators cooperating with the batching framework: they first yield
    # (encoded arguments, future) and later yield the decoded result once
    # the future's value has been filled in.

    def batch(self):
        # prefer server-side batching when the 'batch' capability is
        # advertised; otherwise run the calls one by one locally
        if self.capable('batch'):
            return remotebatch(self)
        else:
            return peer.localbatch(self)
    def _submitbatch(self, req):
        # req is an iterable of (op, argsdict); encode everything into a
        # single 'batch' command and split the reply back into results
        cmds = []
        for op, argsdict in req:
            args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
                            for k, v in argsdict.iteritems())
            cmds.append('%s %s' % (op, args))
        rsp = self._call("batch", cmds=';'.join(cmds))
        return [unescapearg(r) for r in rsp.split(';')]
    def _submitone(self, op, args):
        # non-batched fallback: issue the command directly
        return self._call(op, **args)

    @batchable
    def lookup(self, key):
        self.requirecap('lookup', _('look up remote revision'))
        f = future()
        yield {'key': encoding.fromlocal(key)}, f
        d = f.value
        # reply format: "<success> <data>\n"
        success, data = d[:-1].split(" ", 1)
        if int(success):
            yield bin(data)
        # NOTE(review): presumably the generator is not resumed after the
        # success yield above, so this abort runs only on failure — confirm
        # against the batchable framework
        self._abort(error.RepoError(data))

    @batchable
    def heads(self):
        f = future()
        yield {}, f
        d = f.value
        try:
            yield decodelist(d[:-1])
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def known(self, nodes):
        f = future()
        yield {'nodes': encodelist(nodes)}, f
        d = f.value
        try:
            # reply is one '0'/'1' character per queried node
            yield [bool(int(b)) for b in d]
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    @batchable
    def branchmap(self):
        f = future()
        yield {}, f
        d = f.value
        try:
            branchmap = {}
            # reply is one "quoted-branchname heads" line per branch
            for branchpart in d.splitlines():
                branchname, branchheads = branchpart.split(' ', 1)
                branchname = encoding.tolocal(urllib.unquote(branchname))
                branchheads = decodelist(branchheads)
                branchmap[branchname] = branchheads
            yield branchmap
        except TypeError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def branches(self, nodes):
        n = encodelist(nodes)
        d = self._call("branches", nodes=n)
        try:
            br = [tuple(decodelist(b)) for b in d.splitlines()]
            return br
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), d))

    def between(self, pairs):
        batch = 8 # avoid giant requests
        r = []
        # issue the query in slices of `batch` pairs per round trip
        for i in xrange(0, len(pairs), batch):
            n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
            d = self._call("between", pairs=n)
            try:
                r.extend(l and decodelist(l) or [] for l in d.splitlines())
            except ValueError:
                self._abort(error.ResponseError(_("unexpected response:"), d))
        return r

    @batchable
    def pushkey(self, namespace, key, old, new):
        if not self.capable('pushkey'):
            yield False, None
        f = future()
        self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
        yield {'namespace': encoding.fromlocal(namespace),
               'key': encoding.fromlocal(key),
               'old': encoding.fromlocal(old),
               'new': encoding.fromlocal(new)}, f
        d = f.value
        # reply is "<result>\n<server output>"
        d, output = d.split('\n', 1)
        try:
            d = bool(int(d))
        except ValueError:
            raise error.ResponseError(
                _('push failed (unexpected response):'), d)
        for l in output.splitlines(True):
            self.ui.status(_('remote: '), l)
        yield d

    @batchable
    def listkeys(self, namespace):
        if not self.capable('pushkey'):
            yield {}, None
        f = future()
        self.ui.debug('preparing listkeys for "%s"\n' % namespace)
        yield {'namespace': encoding.fromlocal(namespace)}, f
        d = f.value
        self.ui.debug('received listkey for "%s": %i bytes\n'
                      % (namespace, len(d)))
        yield pushkeymod.decodekeys(d)

    def stream_out(self):
        return self._callstream('stream_out')

    def changegroup(self, nodes, kind):
        n = encodelist(nodes)
        f = self._callcompressable("changegroup", roots=n)
        return changegroupmod.cg1unpacker(f, 'UN')

    def changegroupsubset(self, bases, heads, kind):
        self.requirecap('changegroupsubset', _('look up remote changes'))
        bases = encodelist(bases)
        heads = encodelist(heads)
        f = self._callcompressable("changegroupsubset",
                                   bases=bases, heads=heads)
        return changegroupmod.cg1unpacker(f, 'UN')

    def getbundle(self, source, **kwargs):
        self.requirecap('getbundle', _('look up remote changes'))
        opts = {}
        bundlecaps = kwargs.get('bundlecaps')
        if bundlecaps is not None:
            kwargs['bundlecaps'] = sorted(bundlecaps)
        else:
            bundlecaps = () # kwargs could have it to None
        # encode each argument according to its declared type in gboptsmap
        for key, value in kwargs.iteritems():
            if value is None:
                continue
            keytype = gboptsmap.get(key)
            if keytype is None:
                assert False, 'unexpected'
            elif keytype == 'nodes':
                value = encodelist(value)
            elif keytype in ('csv', 'scsv'):
                value = ','.join(value)
            elif keytype == 'boolean':
                value = '%i' % bool(value)
            elif keytype != 'plain':
                raise KeyError('unknown getbundle option type %s'
                               % keytype)
            opts[key] = value
        f = self._callcompressable("getbundle", **opts)
        # an HG2x capability means the reply is a bundle2 stream
        if any((cap.startswith('HG2') for cap in bundlecaps)):
            return bundle2.getunbundler(self.ui, f)
        else:
            return changegroupmod.cg1unpacker(f, 'UN')

    def unbundle(self, cg, heads, source):
        '''Send cg (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle.

        When pushing a bundle10 stream, return an integer indicating the
        result of the push (see localrepository.addchangegroup()).

        When pushing a bundle20 stream, return a bundle20 stream.'''

        if heads != ['force'] and self.capable('unbundlehash'):
            # send a digest of the heads instead of the full list
            heads = encodelist(['hashed',
                                util.sha1(''.join(sorted(heads))).digest()])
        else:
            heads = encodelist(heads)

        if util.safehasattr(cg, 'deltaheader'):
            # this a bundle10, do the old style call sequence
            ret, output = self._callpush("unbundle", cg, heads=heads)
            if ret == "":
                raise error.ResponseError(
                    _('push failed:'), output)
            try:
                ret = int(ret)
            except ValueError:
                raise error.ResponseError(
                    _('push failed (unexpected response):'), ret)

            for l in output.splitlines(True):
                self.ui.status(_('remote: '), l)
        else:
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream('unbundle', cg, heads=heads)
            ret = bundle2.getunbundler(self.ui, stream)
        return ret

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        # don't pass optional arguments left at their default value
        opts = {}
        if three is not None:
            opts['three'] = three
        if four is not None:
            opts['four'] = four
        return self._call('debugwireargs', one=one, two=two, **opts)

    def _call(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a simple string.

        returns the server reply as a string."""
        raise NotImplementedError()

    def _callstream(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        returns the server reply as a file like object."""
        raise NotImplementedError()

    def _callcompressable(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        The stream may have been compressed in some implementations. This
        function takes care of the decompression. This is the only difference
        with _callstream.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callpush(self, cmd, fp, **args):
        """execute a <cmd> on server

        The command is expected to be related to a push. Push has a special
        return method.

        returns the server reply as a (ret, output) tuple. ret is either
        empty (error) or a stringified int.
        """
        raise NotImplementedError()

    def _calltwowaystream(self, cmd, fp, **args):
        """execute <cmd> on server

        The command will send a stream to the server and get a stream in reply.
        """
        raise NotImplementedError()

    def _abort(self, exception):
        """clearly abort the wire protocol connection and raise the exception
        """
        raise NotImplementedError()
428 # server side
429 # server side
429
430
# wire protocol command can either return a string or one of these classes.
class streamres(object):
    """wireproto reply: binary stream

    Wraps a successful call whose result is a stream; consumers iterate
    over the `self.gen` attribute to obtain the chunks.
    """

    def __init__(self, gen):
        self.gen = gen
class pushres(object):
    """wireproto reply: success with simple integer return

    Wraps a successful call whose integer result is stored in `self.res`.
    """

    def __init__(self, res):
        self.res = res
class pusherr(object):
    """wireproto reply: failure

    Wraps a failed call; the error message is stored in `self.res`.
    """

    def __init__(self, res):
        self.res = res
class ooberror(object):
    """wireproto reply: failure of a batch of operation

    Something went wrong during a batch call; the error message is kept in
    `self.message`.
    """

    def __init__(self, message):
        self.message = message
def dispatch(repo, proto, command):
    """Look up *command* in the command table and invoke its handler.

    The repository is narrowed to its "served" view before dispatching.
    """
    repo = repo.filtered("served")
    handler, argspec = commands[command]
    arguments = proto.getargs(argspec)
    return handler(repo, proto, *arguments)
def options(cmd, keys, others):
    """Pop the entries named in *keys* out of *others* and return them.

    *others* is mutated in place. Any entries left over are unexpected for
    *cmd*, and a warning naming them is written to stderr.
    """
    opts = {}
    for key in keys:
        if key in others:
            opts[key] = others.pop(key)
    if others:
        sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
                         % (cmd, ",".join(others)))
    return opts
# list of commands
commands = {}

def wireprotocommand(name, args=''):
    """Decorator registering a function as the wire protocol command *name*.

    *args* is the space-separated argument specification for the command.
    """
    def _register(func):
        commands[name] = (func, args)
        return func
    return _register
@wireprotocommand('batch', 'cmds *')
def batch(repo, proto, cmds, others):
    """Run several wire protocol commands in a single round trip.

    *cmds* is a ';'-separated list of "op arg=val,arg=val" entries. The
    individual results are escaped and joined with ';'. An out-of-band
    error from any sub-command aborts the whole batch and is returned
    directly.
    """
    repo = repo.filtered("served")
    results = []
    for pair in cmds.split(';'):
        op, args = pair.split(' ', 1)
        argmap = {}
        for chunk in args.split(','):
            if chunk:
                name, value = chunk.split('=')
                argmap[name] = unescapearg(value)
        func, spec = commands[op]
        if spec:
            keys = spec.split()
            data = {}
            for k in keys:
                if k == '*':
                    # collect every argument not claimed by a named key
                    data['*'] = dict((name, argmap[name])
                                     for name in argmap.keys()
                                     if name not in keys)
                else:
                    data[k] = argmap[k]
            result = func(repo, proto, *[data[k] for k in keys])
        else:
            result = func(repo, proto)
        if isinstance(result, ooberror):
            return result
        results.append(escapearg(result))
    return ';'.join(results)
@wireprotocommand('between', 'pairs')
def between(repo, proto, pairs):
    """Decode '-'-separated node pairs and report the nodes between them."""
    decoded = [decodelist(p, '-') for p in pairs.split(" ")]
    lines = [encodelist(b) + "\n" for b in repo.between(decoded)]
    return "".join(lines)
@wireprotocommand('branchmap')
def branchmap(repo, proto):
    """Serialize the branch map as one 'quoted-name heads' line per branch."""
    entries = []
    for branch, nodes in repo.branchmap().iteritems():
        quoted = urllib.quote(encoding.fromlocal(branch))
        entries.append('%s %s' % (quoted, encodelist(nodes)))
    return '\n'.join(entries)
@wireprotocommand('branches', 'nodes')
def branches(repo, proto, nodes):
    """Return branch information lines for each requested node."""
    lines = [encodelist(b) + "\n"
             for b in repo.branches(decodelist(nodes))]
    return "".join(lines)
wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
                 'known', 'getbundle', 'unbundlehash', 'batch']

def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

    - returns a lists: easy to alter
    - change done here will be propagated to both `capabilities` and `hello`
      command without any other action needed.
    """
    # copy to prevent modification of the global list
    caps = list(wireprotocaps)
    ui = repo.ui
    if _allowstream(ui):
        if ui.configbool('server', 'preferuncompressed', False):
            caps.append('stream-preferred')
        requiredformats = repo.requirements & repo.supportedformats
        if not requiredformats - set(('revlogv1',)):
            # if our local revlogs are just revlogv1, add 'stream' cap
            caps.append('stream')
        else:
            # otherwise, add 'streamreqs' detailing our local revlog format
            caps.append('streamreqs=%s' % ','.join(requiredformats))
    if ui.configbool('experimental', 'bundle2-advertise', True):
        capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
        caps.append('bundle2=' + urllib.quote(capsblob))
    caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
    caps.append(
        'httpheader=%d' % ui.configint('server', 'maxhttpheaderlen', 1024))
    return caps
# If you are writing an extension and consider wrapping this function. Wrap
# `_capabilities` instead.
@wireprotocommand('capabilities')
def capabilities(repo, proto):
    """Return the space-separated capability string sent to clients."""
    return ' '.join(_capabilities(repo, proto))
@wireprotocommand('changegroup', 'roots')
def changegroup(repo, proto, roots):
    """Stream a changegroup of everything not reachable from *roots*."""
    cg = changegroupmod.changegroup(repo, decodelist(roots), 'serve')
    return streamres(proto.groupchunks(cg))
@wireprotocommand('changegroupsubset', 'bases heads')
def changegroupsubset(repo, proto, bases, heads):
    """Stream a changegroup for the revisions between *bases* and *heads*."""
    cg = changegroupmod.changegroupsubset(repo, decodelist(bases),
                                          decodelist(heads), 'serve')
    return streamres(proto.groupchunks(cg))
@wireprotocommand('debugwireargs', 'one two *')
def debugwireargs(repo, proto, one, two, others):
    """Echo arguments through the repository; exercises argument passing."""
    # only accept optional args from the known set
    opts = options('debugwireargs', ['three', 'four'], others)
    return repo.debugwireargs(one, two, **opts)
# List of options accepted by getbundle.
#
# Meant to be extended by extensions. It is the extension's responsibility to
# ensure such options are properly processed in exchange.getbundle.
gboptslist = ['heads', 'common', 'bundlecaps']

@wireprotocommand('getbundle', '*')
def getbundle(repo, proto, others):
    """Decode getbundle arguments per gboptsmap and stream the bundle back."""
    opts = options('getbundle', gboptsmap.keys(), others)
    for name, value in opts.iteritems():
        keytype = gboptsmap[name]
        if keytype == 'nodes':
            opts[name] = decodelist(value)
        elif keytype == 'csv':
            opts[name] = list(value.split(','))
        elif keytype == 'scsv':
            opts[name] = set(value.split(','))
        elif keytype == 'boolean':
            opts[name] = bool(value)
        elif keytype != 'plain':
            raise KeyError('unknown getbundle option type %s'
                           % keytype)
    cg = exchange.getbundle(repo, 'serve', **opts)
    return streamres(proto.groupchunks(cg))
@wireprotocommand('heads')
def heads(repo, proto):
    """Return the encoded list of repository heads."""
    return encodelist(repo.heads()) + "\n"
@wireprotocommand('hello')
def hello(repo, proto):
    '''the hello command returns a set of lines describing various
    interesting things about the server, in an RFC822-like format.
    Currently the only one defined is "capabilities", which
    consists of a line in the form:

    capabilities: space separated list of tokens
    '''
    return "capabilities: %s\n" % capabilities(repo, proto)
@wireprotocommand('listkeys', 'namespace')
def listkeys(repo, proto, namespace):
    """Serialize all pushkey entries in *namespace*."""
    entries = repo.listkeys(encoding.tolocal(namespace)).items()
    return pushkeymod.encodekeys(entries)
@wireprotocommand('lookup', 'key')
def lookup(repo, proto, key):
    """Resolve *key* to a hex node; reply "1 <hex>" or "0 <error>"."""
    try:
        result = repo[encoding.tolocal(key)].hex()
        success = 1
    except Exception as inst:
        result = str(inst)
        success = 0
    return "%s %s\n" % (success, result)
@wireprotocommand('known', 'nodes *')
def known(repo, proto, nodes, others):
    """Return a string of '0'/'1' flags marking which nodes are known."""
    return ''.join("1" if flag else "0"
                   for flag in repo.known(decodelist(nodes)))
@wireprotocommand('pushkey', 'namespace key old new')
def pushkey(repo, proto, namespace, key, old, new):
    """Set a pushkey entry; reply with the result and any captured output."""
    # compatibility with pre-1.8 clients which were accidentally
    # sending raw binary nodes rather than utf-8-encoded hex
    if len(new) == 20 and new.encode('string-escape') != new:
        # looks like it could be a binary node
        try:
            new.decode('utf-8')
            new = encoding.tolocal(new) # but cleanly decodes as UTF-8
        except UnicodeDecodeError:
            pass # binary, leave unmodified
    else:
        new = encoding.tolocal(new) # normal path

    if util.safehasattr(proto, 'restore'):
        # protocols that buffer output: capture whatever the hooks print
        # and ship it back alongside the result

        proto.redirect()

        try:
            result = repo.pushkey(encoding.tolocal(namespace),
                                  encoding.tolocal(key),
                                  encoding.tolocal(old), new) or False
        except util.Abort:
            result = False

        output = proto.restore()

        return '%s\n%s' % (int(result), output)

    result = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                          encoding.tolocal(old), new)
    return '%s\n' % int(result)
703 def _allowstream(ui):
704 def _allowstream(ui):
704 return ui.configbool('server', 'uncompressed', True, untrusted=True)
705 return ui.configbool('server', 'uncompressed', True, untrusted=True)
705
706
@wireprotocommand('stream_out')
def stream(repo, proto):
    '''If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.
    '''
    # '1\n' tells the client streaming is administratively disabled.
    if not _allowstream(repo.ui):
        return '1\n'

    def stream_with_status(chunks):
        # Success indicator precedes the raw stream payload.
        yield '0\n'
        for chunk in chunks:
            yield chunk

    try:
        # LockError may be raised before the first result is yielded. Don't
        # emit output until we're sure we got the lock successfully.
        chunks = streamclone.generatev1(repo)
        return streamres(stream_with_status(chunks))
    except error.LockError:
        # '2\n' tells the client the repo was locked and to retry later.
        return '2\n'
@wireprotocommand('unbundle', 'heads')
def unbundle(repo, proto, heads):
    """Wire protocol command: apply a bundle pushed by a client.

    The bundle is spooled to a temporary file (it can be big), then fed
    to exchange.unbundle(). For bundle1 the reply is a pushres/pusherr;
    for bundle2 a reply bundle is streamed back, including structured
    error parts when the push fails partway through.
    """
    their_heads = decodelist(heads)

    try:
        proto.redirect()

        exchange.check_heads(repo, their_heads, 'preparing changes')

        # write bundle data to temporary file because it can be big
        fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
        fp = os.fdopen(fd, 'wb+')
        r = 0
        try:
            proto.getfile(fp)
            fp.seek(0)
            gen = exchange.readbundle(repo.ui, fp, None)
            r = exchange.unbundle(repo, gen, their_heads, 'serve',
                                  proto._client())
            if util.safehasattr(r, 'addpart'):
                # The return looks streamable, we are in the bundle2 case
                # and should return a stream.
                return streamres(r.getchunks())
            return pushres(r)

        finally:
            fp.close()
            os.unlink(tempname)

    except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
        # handle non-bundle2 case first
        if not getattr(exc, 'duringunbundle2', False):
            try:
                raise
            except util.Abort:
                # The old code we moved used sys.stderr directly.
                # We did not change it to minimise code change.
                # This need to be moved to something proper.
                # Feel free to do it.
                sys.stderr.write("abort: %s\n" % exc)
                return pushres(0)
            except error.PushRaced:
                return pusherr(str(exc))

        # bundle2: report the failure as error parts in a reply bundle,
        # preserving any server output salvaged from the failed push.
        bundler = bundle2.bundle20(repo.ui)
        for out in getattr(exc, '_bundle2salvagedoutput', ()):
            bundler.addpart(out)
        try:
            try:
                raise
            except error.PushkeyFailed as exc:
                # check client caps
                remotecaps = getattr(exc, '_replycaps', None)
                if (remotecaps is not None
                    and 'pushkey' not in remotecaps.get('error', ())):
                    # no support remote side, fallback to Abort handler.
                    raise
                part = bundler.newpart('error:pushkey')
                part.addparam('in-reply-to', exc.partid)
                if exc.namespace is not None:
                    part.addparam('namespace', exc.namespace, mandatory=False)
                if exc.key is not None:
                    part.addparam('key', exc.key, mandatory=False)
                if exc.new is not None:
                    part.addparam('new', exc.new, mandatory=False)
                if exc.old is not None:
                    part.addparam('old', exc.old, mandatory=False)
                if exc.ret is not None:
                    part.addparam('ret', exc.ret, mandatory=False)
        except error.BundleValueError as exc:
            # client sent a part/param the server cannot handle
            errpart = bundler.newpart('error:unsupportedcontent')
            if exc.parttype is not None:
                errpart.addparam('parttype', exc.parttype)
            if exc.params:
                errpart.addparam('params', '\0'.join(exc.params))
        except util.Abort as exc:
            manargs = [('message', str(exc))]
            advargs = []
            if exc.hint is not None:
                advargs.append(('hint', exc.hint))
            bundler.addpart(bundle2.bundlepart('error:abort',
                                               manargs, advargs))
        except error.PushRaced as exc:
            bundler.newpart('error:pushraced', [('message', str(exc))])
        return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now