branchmap: move branch cache code out of streamclone.py...
Gregory Szorc
r26460:79ef8675 default
--- a/mercurial/branchmap.py
+++ b/mercurial/branchmap.py
@@ -1,458 +1,490 @@
 # branchmap.py - logic to compute, maintain and store branchmaps for local repos
 #
 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import array
 import struct
 import time

 from .node import (
     bin,
     hex,
     nullid,
     nullrev,
 )
 from . import (
     encoding,
     scmutil,
     util,
 )

 array = array.array
 calcsize = struct.calcsize
 pack = struct.pack
 unpack = struct.unpack

 def _filename(repo):
     """name of a branchcache file for a given repo or repoview"""
     filename = "cache/branch2"
     if repo.filtername:
         filename = '%s-%s' % (filename, repo.filtername)
     return filename

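As a concrete illustration (editorial note, not part of the patch), the cache file names this produces:

# unfiltered repo          -> "cache/branch2"
# repo.filtered('served')  -> "cache/branch2-served"
# repo.filtered('visible') -> "cache/branch2-visible"
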
 def read(repo):
     try:
         f = repo.vfs(_filename(repo))
         lines = f.read().split('\n')
         f.close()
     except (IOError, OSError):
         return None

     try:
         cachekey = lines.pop(0).split(" ", 2)
         last, lrev = cachekey[:2]
         last, lrev = bin(last), int(lrev)
         filteredhash = None
         if len(cachekey) > 2:
             filteredhash = bin(cachekey[2])
         partial = branchcache(tipnode=last, tiprev=lrev,
                               filteredhash=filteredhash)
         if not partial.validfor(repo):
             # invalidate the cache
             raise ValueError('tip differs')
         for l in lines:
             if not l:
                 continue
             node, state, label = l.split(" ", 2)
             if state not in 'oc':
                 raise ValueError('invalid branch state')
             label = encoding.tolocal(label.strip())
             if not node in repo:
                 raise ValueError('node %s does not exist' % node)
             node = bin(node)
             partial.setdefault(label, []).append(node)
             if state == 'c':
                 partial._closednodes.add(node)
     except KeyboardInterrupt:
         raise
     except Exception as inst:
         if repo.ui.debugflag:
             msg = 'invalid branchheads cache'
             if repo.filtername is not None:
                 msg += ' (%s)' % repo.filtername
             msg += ': %s\n'
             repo.ui.debug(msg % inst)
         partial = None
     return partial

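For orientation, a file in the format read() parses might look like this for an unfiltered repo (all hash values invented for illustration; the optional third field on the first line only appears for filtered repo views):

a1b2c3d4e5f60718293a4b5c6d7e8f9001122334 4023
9f8e7d6c5b4a39281706f5e4d3c2b1a098877665 o default
0123456789abcdef0123456789abcdef01234567 c stable
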
 ### Nearest subset relation
 # Nearest subset of filter X is a filter Y so that:
 # * Y is included in X,
 # * X - Y is as small as possible.
 # This creates an ordering used for branchmap purposes.
 # The ordering may be partial.
 subsettable = {None: 'visible',
                'visible': 'served',
                'served': 'immutable',
                'immutable': 'base'}

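To make the lookup order concrete, here is a small self-contained sketch (a hypothetical helper, not part of the module) of the chain that updatecache() below effectively follows by recursing through subset.branchmap():

subsettable = {None: 'visible',
               'visible': 'served',
               'served': 'immutable',
               'immutable': 'base'}

def subsetchain(filtername):
    """Yield candidate filter levels, nearest subset first."""
    name = subsettable.get(filtername)
    while name is not None:
        yield name
        name = subsettable.get(name)

assert list(subsetchain(None)) == ['visible', 'served', 'immutable', 'base']
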
 def updatecache(repo):
     cl = repo.changelog
     filtername = repo.filtername
     partial = repo._branchcaches.get(filtername)

     revs = []
     if partial is None or not partial.validfor(repo):
         partial = read(repo)
         if partial is None:
             subsetname = subsettable.get(filtername)
             if subsetname is None:
                 partial = branchcache()
             else:
                 subset = repo.filtered(subsetname)
                 partial = subset.branchmap().copy()
                 extrarevs = subset.changelog.filteredrevs - cl.filteredrevs
                 revs.extend(r for r in extrarevs if r <= partial.tiprev)
     revs.extend(cl.revs(start=partial.tiprev + 1))
     if revs:
         partial.update(repo, revs)
         partial.write(repo)

     assert partial.validfor(repo), filtername
     repo._branchcaches[repo.filtername] = partial

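A hedged sketch of how these pieces fit together at runtime (editorial commentary, not code from the patch; repo stands for any localrepository object):

# 1. repo.branchmap() ends up in updatecache(repo) for the repo's filter.
# 2. updatecache() tries the in-memory repo._branchcaches entry, then
#    read() from disk, then seeds a copy of the nearest subset's
#    branchmap, and finally computes only the missing revisions with
#    branchcache.update().
# 3. The result is persisted via branchcache.write() and memoized in
#    repo._branchcaches[filtername].
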
+def replacecache(repo, bm):
+    """Replace the branchmap cache for a repo with a branch mapping.
+
+    This is likely only called during clone with a branch map from a remote.
+    """
+    rbheads = []
+    closed = []
+    for bheads in bm.itervalues():
+        rbheads.extend(bheads)
+        for h in bheads:
+            r = repo.changelog.rev(h)
+            b, c = repo.changelog.branchinfo(r)
+            if c:
+                closed.append(h)
+
+    if rbheads:
+        rtiprev = max((int(repo.changelog.rev(node))
+                       for node in rbheads))
+        cache = branchcache(bm,
+                            repo[rtiprev].node(),
+                            rtiprev,
+                            closednodes=closed)
+
+        # Try to stick it as low as possible
+        # filters above served are unlikely to be fetched from a clone
+        for candidate in ('base', 'immutable', 'served'):
+            rview = repo.filtered(candidate)
+            if cache.validfor(rview):
+                repo._branchcaches[candidate] = cache
+                cache.write(rview)
+                break
+
 class branchcache(dict):
     """A dict-like object that holds the branch heads cache.

     This cache is used to avoid costly computations to determine all the
     branch heads of a repo.

     The cache is serialized on disk in the following format:

     <tip hex node> <tip rev number> [optional filtered repo hex hash]
     <branch head hex node> <open/closed state> <branch name>
     <branch head hex node> <open/closed state> <branch name>
     ...

     The first line is used to check if the cache is still valid. If the
     branch cache is for a filtered repo view, an optional third hash is
     included that hashes the hashes of all filtered revisions.

     The open/closed state is represented by a single letter 'o' or 'c'.
     This field can be used to avoid changelog reads when determining if a
     branch head closes a branch or not.
     """

     def __init__(self, entries=(), tipnode=nullid, tiprev=nullrev,
                  filteredhash=None, closednodes=None):
         super(branchcache, self).__init__(entries)
         self.tipnode = tipnode
         self.tiprev = tiprev
         self.filteredhash = filteredhash
         # closednodes is a set of nodes that close their branch. If the branch
         # cache has been updated, it may contain nodes that are no longer
         # heads.
         if closednodes is None:
             self._closednodes = set()
         else:
             self._closednodes = closednodes

     def validfor(self, repo):
         """Is the cache content valid regarding a repo

         - False when cached tipnode is unknown or if we detect a strip.
         - True when cache is up to date or a subset of current repo."""
         try:
             return ((self.tipnode == repo.changelog.node(self.tiprev))
                     and (self.filteredhash ==
                          scmutil.filteredhash(repo, self.tiprev)))
         except IndexError:
             return False

     def _branchtip(self, heads):
         '''Return tuple with last open head in heads and false,
         otherwise return last closed head and true.'''
         tip = heads[-1]
         closed = True
         for h in reversed(heads):
             if h not in self._closednodes:
                 tip = h
                 closed = False
                 break
         return tip, closed

     def branchtip(self, branch):
         '''Return the tipmost open head on branch, otherwise return the
         tipmost closed head on branch.
         Raise KeyError for unknown branch.'''
         return self._branchtip(self[branch])[0]

     def branchheads(self, branch, closed=False):
         heads = self[branch]
         if not closed:
             heads = [h for h in heads if h not in self._closednodes]
         return heads

     def iterbranches(self):
         for bn, heads in self.iteritems():
             yield (bn, heads) + self._branchtip(heads)

     def copy(self):
         """return a deep copy of the branchcache object"""
         return branchcache(self, self.tipnode, self.tiprev, self.filteredhash,
                            self._closednodes)

     def write(self, repo):
         try:
             f = repo.vfs(_filename(repo), "w", atomictemp=True)
             cachekey = [hex(self.tipnode), str(self.tiprev)]
             if self.filteredhash is not None:
                 cachekey.append(hex(self.filteredhash))
             f.write(" ".join(cachekey) + '\n')
             nodecount = 0
             for label, nodes in sorted(self.iteritems()):
                 for node in nodes:
                     nodecount += 1
                     if node in self._closednodes:
                         state = 'c'
                     else:
                         state = 'o'
                     f.write("%s %s %s\n" % (hex(node), state,
                                             encoding.fromlocal(label)))
             f.close()
             repo.ui.log('branchcache',
                         'wrote %s branch cache with %d labels and %d nodes\n',
                         repo.filtername, len(self), nodecount)
         except (IOError, OSError, util.Abort) as inst:
             repo.ui.debug("couldn't write branch cache: %s\n" % inst)
             # Abort may be raised by a read-only opener
             pass

     def update(self, repo, revgen):
         """Given a branchhead cache, self, that may have extra nodes or be
         missing heads, and a generator of nodes that are strictly a superset
         of the missing heads, this function updates self to be correct.
         """
         starttime = time.time()
         cl = repo.changelog
         # collect new branch entries
         newbranches = {}
         getbranchinfo = repo.revbranchcache().branchinfo
         for r in revgen:
             branch, closesbranch = getbranchinfo(r)
             newbranches.setdefault(branch, []).append(r)
             if closesbranch:
                 self._closednodes.add(cl.node(r))

         # fetch current topological heads to speed up filtering
         topoheads = set(cl.headrevs())

         # if older branchheads are reachable from new ones, they aren't
         # really branchheads. Note checking parents is insufficient:
         # 1 (branch a) -> 2 (branch b) -> 3 (branch a)
         for branch, newheadrevs in newbranches.iteritems():
             bheads = self.setdefault(branch, [])
             bheadset = set(cl.rev(node) for node in bheads)

             # This has been tested True on all internal usages of this
             # function. Run it again in case of doubt.
             # assert not (set(bheadrevs) & set(newheadrevs))
             newheadrevs.sort()
             bheadset.update(newheadrevs)

             # This prunes out two kinds of heads - heads that are superseded
             # by a head in newheadrevs, and newheadrevs that are not heads
             # because an existing head is their descendant.
             uncertain = bheadset - topoheads
             if uncertain:
                 floorrev = min(uncertain)
                 ancestors = set(cl.ancestors(newheadrevs, floorrev))
                 bheadset -= ancestors
             bheadrevs = sorted(bheadset)
             self[branch] = [cl.node(rev) for rev in bheadrevs]
             tiprev = bheadrevs[-1]
             if tiprev > self.tiprev:
                 self.tipnode = cl.node(tiprev)
                 self.tiprev = tiprev

         if not self.validfor(repo):
             # cache keys are not valid anymore
             self.tipnode = nullid
             self.tiprev = nullrev
             for heads in self.values():
                 tiprev = max(cl.rev(node) for node in heads)
                 if tiprev > self.tiprev:
                     self.tipnode = cl.node(tiprev)
                     self.tiprev = tiprev
         self.filteredhash = scmutil.filteredhash(repo, self.tiprev)

         duration = time.time() - starttime
         repo.ui.log('branchcache', 'updated %s branch cache in %.4f seconds\n',
                     repo.filtername, duration)

 # Revision branch info cache

 _rbcversion = '-v1'
 _rbcnames = 'cache/rbc-names' + _rbcversion
 _rbcrevs = 'cache/rbc-revs' + _rbcversion
 # [4 byte hash prefix][4 byte branch name number with sign bit indicating open]
 _rbcrecfmt = '>4sI'
 _rbcrecsize = calcsize(_rbcrecfmt)
 _rbcnodelen = 4
 _rbcbranchidxmask = 0x7fffffff
 _rbccloseflag = 0x80000000

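The record layout is easy to verify with the standard library alone; a minimal, self-contained illustration (node bytes and branch index invented; the constants are redeclared locally so the snippet runs on its own):

import struct

_rbcrecfmt = '>4sI'
_rbccloseflag = 0x80000000
_rbcbranchidxmask = 0x7fffffff

# Pack one record: 4-byte node hash prefix, branch index 5, close bit set.
rec = struct.pack(_rbcrecfmt, b'\xde\xad\xbe\xef', 5 | _rbccloseflag)
assert len(rec) == struct.calcsize(_rbcrecfmt) == 8  # 8 bytes per revision

node, branchidx = struct.unpack(_rbcrecfmt, rec)
close = bool(branchidx & _rbccloseflag)
branchidx &= _rbcbranchidxmask
assert (node, branchidx, close) == (b'\xde\xad\xbe\xef', 5, True)
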
 class revbranchcache(object):
     """Persistent cache, mapping from revision number to branch name and close.
     This is a low level cache, independent of filtering.

     Branch names are stored in rbc-names in internal encoding separated by 0.
     rbc-names is append-only, and each branch name is only stored once and will
     thus have a unique index.

     The branch info for each revision is stored in rbc-revs as constant size
     records. The whole file is read into memory, but it is only 'parsed' on
     demand. The file is usually append-only but will be truncated if repo
     modification is detected.
     The record for each revision contains the first 4 bytes of the
     corresponding node hash, and the record is only used if it still matches.
     Even a completely trashed rbc-revs file will thus still give the right
     result while converging towards full recovery ... assuming no incorrectly
     matching node hashes.
     The record also contains 4 bytes where 31 bits contain the index of the
     branch and the last bit indicates that it is a branch close commit.
     The usage pattern for rbc-revs is thus somewhat similar to 00changelog.i
     and will grow with it but be 1/8th of its size.
     """

     def __init__(self, repo, readonly=True):
         assert repo.filtername is None
         self._repo = repo
         self._names = [] # branch names in local encoding with static index
         self._rbcrevs = array('c') # structs of type _rbcrecfmt
         self._rbcsnameslen = 0
         try:
             bndata = repo.vfs.read(_rbcnames)
             self._rbcsnameslen = len(bndata) # for verification before writing
             self._names = [encoding.tolocal(bn) for bn in bndata.split('\0')]
         except (IOError, OSError) as inst:
             if readonly:
                 # don't try to use cache - fall back to the slow path
                 self.branchinfo = self._branchinfo

         if self._names:
             try:
                 data = repo.vfs.read(_rbcrevs)
                 self._rbcrevs.fromstring(data)
             except (IOError, OSError) as inst:
                 repo.ui.debug("couldn't read revision branch cache: %s\n" %
                               inst)
         # remember number of good records on disk
         self._rbcrevslen = min(len(self._rbcrevs) // _rbcrecsize,
                                len(repo.changelog))
         if self._rbcrevslen == 0:
             self._names = []
         self._rbcnamescount = len(self._names) # number of good names on disk
         self._namesreverse = dict((b, r) for r, b in enumerate(self._names))

     def branchinfo(self, rev):
         """Return branch name and close flag for rev, using and updating
         persistent cache."""
         changelog = self._repo.changelog
         rbcrevidx = rev * _rbcrecsize

         # avoid negative index, changelog.read(nullrev) is fast without cache
         if rev == nullrev:
             return changelog.branchinfo(rev)

         # if requested rev is missing, add and populate all missing revs
         if len(self._rbcrevs) < rbcrevidx + _rbcrecsize:
             self._rbcrevs.extend('\0' * (len(changelog) * _rbcrecsize -
                                          len(self._rbcrevs)))

         # fast path: extract data from cache, use it if node is matching
         reponode = changelog.node(rev)[:_rbcnodelen]
         cachenode, branchidx = unpack(
             _rbcrecfmt, buffer(self._rbcrevs, rbcrevidx, _rbcrecsize))
         close = bool(branchidx & _rbccloseflag)
         if close:
             branchidx &= _rbcbranchidxmask
         if cachenode == '\0\0\0\0':
             pass
         elif cachenode == reponode:
             return self._names[branchidx], close
         else:
             # rev/node map has changed, invalidate the cache from here up
             truncate = rbcrevidx + _rbcrecsize
             del self._rbcrevs[truncate:]
             self._rbcrevslen = min(self._rbcrevslen, truncate)

         # fall back to slow path and make sure it will be written to disk
         return self._branchinfo(rev)

     def _branchinfo(self, rev):
         """Retrieve branch info from changelog and update _rbcrevs"""
         changelog = self._repo.changelog
         b, close = changelog.branchinfo(rev)
         if b in self._namesreverse:
             branchidx = self._namesreverse[b]
         else:
             branchidx = len(self._names)
             self._names.append(b)
             self._namesreverse[b] = branchidx
         reponode = changelog.node(rev)
         if close:
             branchidx |= _rbccloseflag
         self._setcachedata(rev, reponode, branchidx)
         return b, close

     def _setcachedata(self, rev, node, branchidx):
         """Writes the node's branch data to the in-memory cache data."""
         rbcrevidx = rev * _rbcrecsize
         rec = array('c')
         rec.fromstring(pack(_rbcrecfmt, node, branchidx))
         self._rbcrevs[rbcrevidx:rbcrevidx + _rbcrecsize] = rec
         self._rbcrevslen = min(self._rbcrevslen, rev)

         tr = self._repo.currenttransaction()
         if tr:
             tr.addfinalize('write-revbranchcache', self.write)

     def write(self, tr=None):
         """Save branch cache if it is dirty."""
         repo = self._repo
         if self._rbcnamescount < len(self._names):
             try:
                 if self._rbcnamescount != 0:
                     f = repo.vfs.open(_rbcnames, 'ab')
                     if f.tell() == self._rbcsnameslen:
                         f.write('\0')
                     else:
                         f.close()
                         repo.ui.debug("%s changed - rewriting it\n" % _rbcnames)
                         self._rbcnamescount = 0
                         self._rbcrevslen = 0
                 if self._rbcnamescount == 0:
                     f = repo.vfs.open(_rbcnames, 'wb')
                 f.write('\0'.join(encoding.fromlocal(b)
                                   for b in self._names[self._rbcnamescount:]))
                 self._rbcsnameslen = f.tell()
                 f.close()
             except (IOError, OSError, util.Abort) as inst:
                 repo.ui.debug("couldn't write revision branch cache names: "
                               "%s\n" % inst)
                 return
             self._rbcnamescount = len(self._names)

         start = self._rbcrevslen * _rbcrecsize
         if start != len(self._rbcrevs):
             revs = min(len(repo.changelog), len(self._rbcrevs) // _rbcrecsize)
             try:
                 f = repo.vfs.open(_rbcrevs, 'ab')
                 if f.tell() != start:
                     repo.ui.debug("truncating %s to %s\n" % (_rbcrevs, start))
                     f.seek(start)
                     f.truncate()
                 end = revs * _rbcrecsize
                 f.write(self._rbcrevs[start:end])
                 f.close()
             except (IOError, OSError, util.Abort) as inst:
                 repo.ui.debug("couldn't write revision branch cache: %s\n" %
                               inst)
                 return
             self._rbcrevslen = revs

--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -1,283 +1,260 @@
 # streamclone.py - producing and consuming streaming repository data
 #
 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import time

 from .i18n import _
 from . import (
     branchmap,
     error,
     store,
     util,
 )

 def canperformstreamclone(repo, remote, heads, streamrequested=None):
     """Whether it is possible to perform a streaming clone as part of pull.

     Returns a tuple of (supported, requirements). ``supported`` is True if
     streaming clone is supported and False otherwise. ``requirements`` is
     a set of repo requirements from the remote, or ``None`` if stream clone
     isn't supported.
     """
     # Streaming clone only works on empty repositories.
     if len(repo):
         return False, None

     # Streaming clone only works if all data is being requested.
     if heads:
         return False, None

     # If we don't have a preference, let the server decide for us. This
     # likely only comes into play in LANs.
     if streamrequested is None:
         # The server can advertise whether to prefer streaming clone.
         streamrequested = remote.capable('stream-preferred')

     if not streamrequested:
         return False, None

     # In order for stream clone to work, the client has to support all the
     # requirements advertised by the server.
     #
     # The server advertises its requirements via the "stream" and "streamreqs"
     # capability. "stream" (a value-less capability) is advertised if and only
     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
     # is advertised and contains a comma-delimited list of requirements.
     requirements = set()
     if remote.capable('stream'):
         requirements.add('revlogv1')
     else:
         streamreqs = remote.capable('streamreqs')
         # This is weird and shouldn't happen with modern servers.
         if not streamreqs:
             return False, None

         streamreqs = set(streamreqs.split(','))
         # Server requires something we don't support. Bail.
         if streamreqs - repo.supportedformats:
             return False, None
         requirements = streamreqs

     return True, requirements

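For illustration, the server side of this negotiation can be sketched as follows (a hypothetical helper, not Mercurial API; the capability strings follow the comment above):

def streamcaps(requirements):
    """Return the capability strings a server would advertise for its
    repo requirements."""
    if requirements == set(['revlogv1']):
        return ['stream']
    return ['streamreqs=%s' % ','.join(sorted(requirements))]

assert streamcaps(set(['revlogv1'])) == ['stream']
assert streamcaps(set(['revlogv1', 'generaldelta'])) == [
    'streamreqs=generaldelta,revlogv1']
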
 def maybeperformstreamclone(pullop):
     repo = pullop.repo
     remote = pullop.remote

     r = canperformstreamclone(repo, remote, pullop.heads,
                               streamrequested=pullop.streamclonerequested)
     supported, requirements = r

     if not supported:
         return

     # Save remote branchmap. We will use it later to speed up branchcache
     # creation.
     rbranchmap = None
     if remote.capable('branchmap'):
         rbranchmap = remote.branchmap()

     fp = remote.stream_out()
     l = fp.readline()
     try:
         resp = int(l)
     except ValueError:
         raise error.ResponseError(
             _('unexpected response from remote server:'), l)
     if resp == 1:
         raise util.Abort(_('operation forbidden by server'))
     elif resp == 2:
         raise util.Abort(_('locking the remote repository failed'))
     elif resp != 0:
         raise util.Abort(_('the server sent an unknown error code'))

     applyremotedata(repo, requirements, rbranchmap, fp)

 def allowservergeneration(ui):
     """Whether streaming clones are allowed from the server."""
     return ui.configbool('server', 'uncompressed', True, untrusted=True)

 # This is its own function so extensions can override it.
 def _walkstreamfiles(repo):
     return repo.store.walk()

 def generatev1(repo):
     """Emit content for version 1 of a streaming clone.

     This is a generator of raw chunks that constitute a streaming clone.

     The stream begins with a line of 2 space-delimited integers containing the
     number of entries and total bytes size.

     Next are N entries for each file being transferred. Each file entry starts
     as a line with the file name and integer size delimited by a null byte.
     The raw file data follows. Following the raw file data is the next file
     entry, or EOF.

     When used on the wire protocol, an additional line indicating protocol
     success will be prepended to the stream. This function is not responsible
     for adding it.

     This function will obtain a repository lock to ensure a consistent view of
     the store is captured. It therefore may raise LockError.
     """
     entries = []
     total_bytes = 0
     # Get consistent snapshot of repo, lock during scan.
     lock = repo.lock()
     try:
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
                 entries.append((name, size))
                 total_bytes += size
     finally:
         lock.release()

     repo.ui.debug('%d files, %d bytes to transfer\n' %
                   (len(entries), total_bytes))
     yield '%d %d\n' % (len(entries), total_bytes)

     svfs = repo.svfs
     oldaudit = svfs.mustaudit
     debugflag = repo.ui.debugflag
     svfs.mustaudit = False

     try:
         for name, size in entries:
             if debugflag:
                 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
             # partially encode name over the wire for backwards compat
             yield '%s\0%d\n' % (store.encodedir(name), size)
             if size <= 65536:
                 fp = svfs(name)
                 try:
                     data = fp.read(size)
                 finally:
                     fp.close()
                 yield data
             else:
                 for chunk in util.filechunkiter(svfs(name), limit=size):
                     yield chunk
     finally:
         svfs.mustaudit = oldaudit

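Concretely, a tiny version 1 stream for two files might look like this on the wire (names and sizes invented; \x00 is a literal null byte, and <N bytes> stands for raw file contents, which are not newline-terminated):

2 77
data/foo.i\x0071
<71 bytes of raw data>data/bar.i\x006
<6 bytes of raw data>
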
 def consumev1(repo, fp):
     """Apply the contents from version 1 of a streaming clone file handle.

     This takes the output from "streamout" and applies it to the specified
     repository.

     Like "streamout," the status line added by the wire protocol is not
     handled by this function.
     """
     lock = repo.lock()
     try:
         repo.ui.status(_('streaming all changes\n'))
         l = fp.readline()
         try:
             total_files, total_bytes = map(int, l.split(' ', 1))
         except (ValueError, TypeError):
             raise error.ResponseError(
                 _('unexpected response from remote server:'), l)
         repo.ui.status(_('%d files to transfer, %s of data\n') %
                        (total_files, util.bytecount(total_bytes)))
         handled_bytes = 0
         repo.ui.progress(_('clone'), 0, total=total_bytes)
         start = time.time()

         tr = repo.transaction(_('clone'))
         try:
             for i in xrange(total_files):
                 # XXX doesn't support '\n' or '\r' in filenames
                 l = fp.readline()
                 try:
                     name, size = l.split('\0', 1)
                     size = int(size)
                 except (ValueError, TypeError):
                     raise error.ResponseError(
                         _('unexpected response from remote server:'), l)
                 if repo.ui.debugflag:
                     repo.ui.debug('adding %s (%s)\n' %
                                   (name, util.bytecount(size)))
                 # for backwards compat, name was partially encoded
                 ofp = repo.svfs(store.decodedir(name), 'w')
                 for chunk in util.filechunkiter(fp, limit=size):
                     handled_bytes += len(chunk)
                     repo.ui.progress(_('clone'), handled_bytes,
                                      total=total_bytes)
                     ofp.write(chunk)
                 ofp.close()
             tr.close()
         finally:
             tr.release()

         # Writing straight to files circumvented the in-memory caches
         repo.invalidate()

         elapsed = time.time() - start
         if elapsed <= 0:
             elapsed = 0.001
         repo.ui.progress(_('clone'), None)
         repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                        (util.bytecount(total_bytes), elapsed,
                         util.bytecount(total_bytes / elapsed)))
     finally:
         lock.release()

 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
     """Apply stream clone data to a repository.

     "remotereqs" is a set of requirements to handle the incoming data.
     "remotebranchmap" is the result of a branchmap lookup on the remote. It
     can be None.
     "fp" is a file object containing the raw stream data, suitable for
     feeding into consumev1().
     """
     lock = repo.lock()
     try:
         consumev1(repo, fp)

         # new requirements = old non-format requirements +
         #                    new format-related remote requirements
         # requirements from the streamed-in repository
         repo.requirements = remotereqs | (
             repo.requirements - repo.supportedformats)
         repo._applyopenerreqs()
         repo._writerequirements()

         if remotebranchmap:
-            rbheads = []
-            closed = []
-            for bheads in remotebranchmap.itervalues():
-                rbheads.extend(bheads)
-                for h in bheads:
-                    r = repo.changelog.rev(h)
-                    b, c = repo.changelog.branchinfo(r)
-                    if c:
-                        closed.append(h)
-
-            if rbheads:
-                rtiprev = max((int(repo.changelog.rev(node))
-                               for node in rbheads))
-                cache = branchmap.branchcache(remotebranchmap,
-                                              repo[rtiprev].node(),
-                                              rtiprev,
-                                              closednodes=closed)
-                # Try to stick it as low as possible
-                # filter above served are unlikely to be fetch from a clone
-                for candidate in ('base', 'immutable', 'served'):
-                    rview = repo.filtered(candidate)
-                    if cache.validfor(rview):
-                        repo._branchcaches[candidate] = cache
-                        cache.write(rview)
-                        break
+            branchmap.replacecache(repo, remotebranchmap)

         repo.invalidate()
     finally:
         lock.release()