branchmap: move branch cache code out of streamclone.py...
Gregory Szorc
r26460:79ef8675 default
@@ -1,458 +1,490 @@ branchmap.py
1 1 # branchmap.py - logic to compute, maintain and store the branchmap for a local repo
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import array
11 11 import struct
12 12 import time
13 13
14 14 from .node import (
15 15 bin,
16 16 hex,
17 17 nullid,
18 18 nullrev,
19 19 )
20 20 from . import (
21 21 encoding,
22 22 scmutil,
23 23 util,
24 24 )
25 25
26 26 array = array.array
27 27 calcsize = struct.calcsize
28 28 pack = struct.pack
29 29 unpack = struct.unpack
30 30
31 31 def _filename(repo):
32 32 """name of a branchcache file for a given repo or repoview"""
33 33 filename = "cache/branch2"
34 34 if repo.filtername:
35 35 filename = '%s-%s' % (filename, repo.filtername)
36 36 return filename
37 37
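# Illustration (hypothetical filter names, not part of this change):
# _filename() above yields 'cache/branch2' for an unfiltered repo and
# 'cache/branch2-served' for a repoview using the 'served' filter.
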
38 38 def read(repo):
39 39 try:
40 40 f = repo.vfs(_filename(repo))
41 41 lines = f.read().split('\n')
42 42 f.close()
43 43 except (IOError, OSError):
44 44 return None
45 45
46 46 try:
47 47 cachekey = lines.pop(0).split(" ", 2)
48 48 last, lrev = cachekey[:2]
49 49 last, lrev = bin(last), int(lrev)
50 50 filteredhash = None
51 51 if len(cachekey) > 2:
52 52 filteredhash = bin(cachekey[2])
53 53 partial = branchcache(tipnode=last, tiprev=lrev,
54 54 filteredhash=filteredhash)
55 55 if not partial.validfor(repo):
56 56 # invalidate the cache
57 57 raise ValueError('tip differs')
58 58 for l in lines:
59 59 if not l:
60 60 continue
61 61 node, state, label = l.split(" ", 2)
62 62 if state not in 'oc':
63 63 raise ValueError('invalid branch state')
64 64 label = encoding.tolocal(label.strip())
65 65 if node not in repo:
66 66 raise ValueError('node %s does not exist' % node)
67 67 node = bin(node)
68 68 partial.setdefault(label, []).append(node)
69 69 if state == 'c':
70 70 partial._closednodes.add(node)
71 71 except KeyboardInterrupt:
72 72 raise
73 73 except Exception as inst:
74 74 if repo.ui.debugflag:
75 75 msg = 'invalid branchheads cache'
76 76 if repo.filtername is not None:
77 77 msg += ' (%s)' % repo.filtername
78 78 msg += ': %s\n'
79 79 repo.ui.debug(msg % inst)
80 80 partial = None
81 81 return partial
82 82
83 83 ### Nearest subset relation
84 84 # Nearest subset of filter X is a filter Y so that:
85 85 # * Y is included in X,
86 86 # * X - Y is as small as possible.
87 87 # This creates an ordering used for branchmap purposes;
88 88 # the ordering may be partial.
89 89 subsettable = {None: 'visible',
90 90 'visible': 'served',
91 91 'served': 'immutable',
92 92 'immutable': 'base'}
93 93
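# A minimal sketch (not part of this change) of how the subsettable
# mapping above is used: updatecache() below seeds a filter's branchmap
# from its nearest subset, so the full fallback chain for a filter name
# can be resolved like this.

def _subsetchain(filtername):
    """Return the nearest-subset chain for a filter name, e.g.
    _subsetchain(None) -> ['visible', 'served', 'immutable', 'base']."""
    chain = []
    while filtername in subsettable:
        filtername = subsettable[filtername]
        chain.append(filtername)
    return chain
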
94 94 def updatecache(repo):
95 95 cl = repo.changelog
96 96 filtername = repo.filtername
97 97 partial = repo._branchcaches.get(filtername)
98 98
99 99 revs = []
100 100 if partial is None or not partial.validfor(repo):
101 101 partial = read(repo)
102 102 if partial is None:
103 103 subsetname = subsettable.get(filtername)
104 104 if subsetname is None:
105 105 partial = branchcache()
106 106 else:
107 107 subset = repo.filtered(subsetname)
108 108 partial = subset.branchmap().copy()
109 109 extrarevs = subset.changelog.filteredrevs - cl.filteredrevs
110 110 revs.extend(r for r in extrarevs if r <= partial.tiprev)
111 111 revs.extend(cl.revs(start=partial.tiprev + 1))
112 112 if revs:
113 113 partial.update(repo, revs)
114 114 partial.write(repo)
115 115
116 116 assert partial.validfor(repo), filtername
117 117 repo._branchcaches[repo.filtername] = partial
118 118
119 def replacecache(repo, bm):
120 """Replace the branchmap cache for a repo with a branch mapping.
121
122 This is likely only called during clone with a branch map from a remote.
123 """
124 rbheads = []
125 closed = []
126 for bheads in bm.itervalues():
127 rbheads.extend(bheads)
128 for h in bheads:
129 r = repo.changelog.rev(h)
130 b, c = repo.changelog.branchinfo(r)
131 if c:
132 closed.append(h)
133
134 if rbheads:
135 rtiprev = max((int(repo.changelog.rev(node))
136 for node in rbheads))
137 cache = branchcache(bm,
138 repo[rtiprev].node(),
139 rtiprev,
140 closednodes=closed)
141
142 # Try to stick it as low as possible
143 # filters above served are unlikely to be fetched from a clone
144 for candidate in ('base', 'immutable', 'served'):
145 rview = repo.filtered(candidate)
146 if cache.validfor(rview):
147 repo._branchcaches[candidate] = cache
148 cache.write(rview)
149 break
150
119 151 class branchcache(dict):
120 152 """A dict like object that hold branches heads cache.
121 153
122 154 This cache is used to avoid costly computations to determine all the
123 155 branch heads of a repo.
124 156
125 157 The cache is serialized on disk in the following format:
126 158
127 159 <tip hex node> <tip rev number> [optional filtered repo hex hash]
128 160 <branch head hex node> <open/closed state> <branch name>
129 161 <branch head hex node> <open/closed state> <branch name>
130 162 ...
131 163
132 164 The first line is used to check if the cache is still valid. If the
133 165 branch cache is for a filtered repo view, an optional third hash is
134 166 included that hashes the hashes of all filtered revisions.
135 167
136 168 The open/closed state is represented by a single letter 'o' or 'c'.
137 169 This field can be used to avoid changelog reads when determining if a
138 170 branch head closes a branch or not.
139 171 """
140 172
141 173 def __init__(self, entries=(), tipnode=nullid, tiprev=nullrev,
142 174 filteredhash=None, closednodes=None):
143 175 super(branchcache, self).__init__(entries)
144 176 self.tipnode = tipnode
145 177 self.tiprev = tiprev
146 178 self.filteredhash = filteredhash
147 179 # closednodes is a set of nodes that close their branch. If the branch
148 180 # cache has been updated, it may contain nodes that are no longer
149 181 # heads.
150 182 if closednodes is None:
151 183 self._closednodes = set()
152 184 else:
153 185 self._closednodes = closednodes
154 186
155 187 def validfor(self, repo):
156 188 """Is the cache content valid regarding a repo
157 189
158 190 - False when cached tipnode is unknown or if we detect a strip.
159 191 - True when cache is up to date or a subset of current repo."""
160 192 try:
161 193 return ((self.tipnode == repo.changelog.node(self.tiprev))
162 194 and (self.filteredhash ==
163 195 scmutil.filteredhash(repo, self.tiprev)))
164 196 except IndexError:
165 197 return False
166 198
167 199 def _branchtip(self, heads):
168 200 '''Return a tuple of the last open head in heads and False;
169 201 otherwise return the last closed head and True.'''
170 202 tip = heads[-1]
171 203 closed = True
172 204 for h in reversed(heads):
173 205 if h not in self._closednodes:
174 206 tip = h
175 207 closed = False
176 208 break
177 209 return tip, closed
178 210
179 211 def branchtip(self, branch):
180 212 '''Return the tipmost open head on branch, otherwise return the
181 213 tipmost closed head on branch.
182 214 Raise KeyError for unknown branch.'''
183 215 return self._branchtip(self[branch])[0]
184 216
185 217 def branchheads(self, branch, closed=False):
186 218 heads = self[branch]
187 219 if not closed:
188 220 heads = [h for h in heads if h not in self._closednodes]
189 221 return heads
190 222
191 223 def iterbranches(self):
192 224 for bn, heads in self.iteritems():
193 225 yield (bn, heads) + self._branchtip(heads)
194 226
195 227 def copy(self):
196 228 """return an deep copy of the branchcache object"""
197 229 return branchcache(self, self.tipnode, self.tiprev, self.filteredhash,
198 230 self._closednodes)
199 231
200 232 def write(self, repo):
201 233 try:
202 234 f = repo.vfs(_filename(repo), "w", atomictemp=True)
203 235 cachekey = [hex(self.tipnode), str(self.tiprev)]
204 236 if self.filteredhash is not None:
205 237 cachekey.append(hex(self.filteredhash))
206 238 f.write(" ".join(cachekey) + '\n')
207 239 nodecount = 0
208 240 for label, nodes in sorted(self.iteritems()):
209 241 for node in nodes:
210 242 nodecount += 1
211 243 if node in self._closednodes:
212 244 state = 'c'
213 245 else:
214 246 state = 'o'
215 247 f.write("%s %s %s\n" % (hex(node), state,
216 248 encoding.fromlocal(label)))
217 249 f.close()
218 250 repo.ui.log('branchcache',
219 251 'wrote %s branch cache with %d labels and %d nodes\n',
220 252 repo.filtername, len(self), nodecount)
221 253 except (IOError, OSError, util.Abort) as inst:
222 254 repo.ui.debug("couldn't write branch cache: %s\n" % inst)
223 255 # Abort may be raised by a read-only opener
224 256 pass
225 257
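# An illustrative (entirely hypothetical) cache/branch2 file as written
# by write() above; the first line is the cache key
# '<tipnode> <tiprev> [filteredhash]', and every following line is
# '<head node> <o|c> <branch name>':
#
#   63f2b8b15c34d1d8f3a6e4b2c9d0e1f2a3b4c5d6 42
#   aa11bb22cc33dd44ee55ff660718293a4b5c6d7e o default
#   0102030405060708090a0b0c0d0e0f1011121314 c stable
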
226 258 def update(self, repo, revgen):
227 259 """Given a branchhead cache, self, that may have extra nodes or be
228 260 missing heads, and a generator of nodes that are strictly a superset of
229 261 heads missing, this function updates self to be correct.
230 262 """
231 263 starttime = time.time()
232 264 cl = repo.changelog
233 265 # collect new branch entries
234 266 newbranches = {}
235 267 getbranchinfo = repo.revbranchcache().branchinfo
236 268 for r in revgen:
237 269 branch, closesbranch = getbranchinfo(r)
238 270 newbranches.setdefault(branch, []).append(r)
239 271 if closesbranch:
240 272 self._closednodes.add(cl.node(r))
241 273
242 274 # fetch current topological heads to speed up filtering
243 275 topoheads = set(cl.headrevs())
244 276
245 277 # if older branchheads are reachable from new ones, they aren't
246 278 # really branchheads. Note checking parents is insufficient:
247 279 # 1 (branch a) -> 2 (branch b) -> 3 (branch a)
248 280 for branch, newheadrevs in newbranches.iteritems():
249 281 bheads = self.setdefault(branch, [])
250 282 bheadset = set(cl.rev(node) for node in bheads)
251 283
252 284 # This has been tested True on all internal usages of this function.
253 285 # Run it again in case of doubt:
254 286 # assert not (set(bheadrevs) & set(newheadrevs))
255 287 newheadrevs.sort()
256 288 bheadset.update(newheadrevs)
257 289
258 290 # This prunes out two kinds of heads - heads that are superseded by
259 291 # a head in newheadrevs, and newheadrevs that are not heads because
260 292 # an existing head is their descendant.
261 293 uncertain = bheadset - topoheads
262 294 if uncertain:
263 295 floorrev = min(uncertain)
264 296 ancestors = set(cl.ancestors(newheadrevs, floorrev))
265 297 bheadset -= ancestors
266 298 bheadrevs = sorted(bheadset)
267 299 self[branch] = [cl.node(rev) for rev in bheadrevs]
268 300 tiprev = bheadrevs[-1]
269 301 if tiprev > self.tiprev:
270 302 self.tipnode = cl.node(tiprev)
271 303 self.tiprev = tiprev
272 304
273 305 if not self.validfor(repo):
274 306 # cache key is not valid anymore
275 307 self.tipnode = nullid
276 308 self.tiprev = nullrev
277 309 for heads in self.values():
278 310 tiprev = max(cl.rev(node) for node in heads)
279 311 if tiprev > self.tiprev:
280 312 self.tipnode = cl.node(tiprev)
281 313 self.tiprev = tiprev
282 314 self.filteredhash = scmutil.filteredhash(repo, self.tiprev)
283 315
284 316 duration = time.time() - starttime
285 317 repo.ui.log('branchcache', 'updated %s branch cache in %.4f seconds\n',
286 318 repo.filtername, duration)
287 319
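# A self-contained sketch of the head-pruning step in update() above.
# Plain ints stand in for revision numbers, and 'ancestors_of' is an
# assumed stand-in for cl.ancestors(revs, floor), not Mercurial API.

def _pruneheads(bheadset, newheadrevs, topoheads, ancestors_of):
    """Merge new head candidates into bheadset and drop any candidate
    that a newer head descends from; return the survivors, sorted."""
    bheadset.update(newheadrevs)
    uncertain = bheadset - topoheads
    if uncertain:
        floorrev = min(uncertain)
        bheadset -= set(ancestors_of(newheadrevs, floorrev))
    return sorted(bheadset)
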
288 320 # Revision branch info cache
289 321
290 322 _rbcversion = '-v1'
291 323 _rbcnames = 'cache/rbc-names' + _rbcversion
292 324 _rbcrevs = 'cache/rbc-revs' + _rbcversion
293 325 # [4 byte hash prefix][4 byte branch name number with sign bit indicating open]
294 326 _rbcrecfmt = '>4sI'
295 327 _rbcrecsize = calcsize(_rbcrecfmt)
296 328 _rbcnodelen = 4
297 329 _rbcbranchidxmask = 0x7fffffff
298 330 _rbccloseflag = 0x80000000
299 331
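# Illustration of the record format above (the node prefix and branch
# index are assumed example values): each rbc-revs record is
# calcsize('>4sI') == 8 bytes -- a 4-byte node-hash prefix plus a
# big-endian uint32 whose sign bit marks a branch-closing commit.

def _rbcexample():
    """Demonstrate the record layout (assumed example values only)."""
    rec = pack(_rbcrecfmt, '\x63\xf2\xb8\xb1', 5 | _rbccloseflag)
    node, idx = unpack(_rbcrecfmt, rec)
    assert len(rec) == _rbcrecsize == 8
    assert node == '\x63\xf2\xb8\xb1'
    assert idx & _rbcbranchidxmask == 5   # index into the rbc-names list
    assert bool(idx & _rbccloseflag)      # revision closes its branch
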
300 332 class revbranchcache(object):
301 333 """Persistent cache, mapping from revision number to branch name and close.
302 334 This is a low level cache, independent of filtering.
303 335
304 336 Branch names are stored in rbc-names in internal encoding separated by 0.
305 337 rbc-names is append-only, and each branch name is only stored once and will
306 338 thus have a unique index.
307 339
308 340 The branch info for each revision is stored in rbc-revs as constant size
309 341 records. The whole file is read into memory, but it is only 'parsed' on
310 342 demand. The file is usually append-only but will be truncated if repo
311 343 modification is detected.
312 344 The record for each revision contains the first 4 bytes of the
313 345 corresponding node hash, and the record is only used if it still matches.
314 346 Even a completely trashed rbc-revs file will thus still give the right result
315 347 while converging towards full recovery ... assuming no incorrectly matching
316 348 node hashes.
317 349 The record also contains 4 bytes where 31 bits contain the index of the
318 350 branch and the last bit indicates that it is a branch-closing commit.
319 351 The usage pattern for rbc-revs is thus somewhat similar to 00changelog.i
320 352 and will grow with it but be 1/8th of its size.
321 353 """
322 354
323 355 def __init__(self, repo, readonly=True):
324 356 assert repo.filtername is None
325 357 self._repo = repo
326 358 self._names = [] # branch names in local encoding with static index
327 359 self._rbcrevs = array('c') # structs of type _rbcrecfmt
328 360 self._rbcsnameslen = 0
329 361 try:
330 362 bndata = repo.vfs.read(_rbcnames)
331 363 self._rbcsnameslen = len(bndata) # for verification before writing
332 364 self._names = [encoding.tolocal(bn) for bn in bndata.split('\0')]
333 365 except (IOError, OSError) as inst:
334 366 if readonly:
335 367 # don't try to use cache - fall back to the slow path
336 368 self.branchinfo = self._branchinfo
337 369
338 370 if self._names:
339 371 try:
340 372 data = repo.vfs.read(_rbcrevs)
341 373 self._rbcrevs.fromstring(data)
342 374 except (IOError, OSError) as inst:
343 375 repo.ui.debug("couldn't read revision branch cache: %s\n" %
344 376 inst)
345 377 # remember number of good records on disk
346 378 self._rbcrevslen = min(len(self._rbcrevs) // _rbcrecsize,
347 379 len(repo.changelog))
348 380 if self._rbcrevslen == 0:
349 381 self._names = []
350 382 self._rbcnamescount = len(self._names) # number of good names on disk
351 383 self._namesreverse = dict((b, r) for r, b in enumerate(self._names))
352 384
353 385 def branchinfo(self, rev):
354 386 """Return branch name and close flag for rev, using and updating
355 387 persistent cache."""
356 388 changelog = self._repo.changelog
357 389 rbcrevidx = rev * _rbcrecsize
358 390
359 391 # avoid negative index, changelog.read(nullrev) is fast without cache
360 392 if rev == nullrev:
361 393 return changelog.branchinfo(rev)
362 394
363 395 # if requested rev is missing, add and populate all missing revs
364 396 if len(self._rbcrevs) < rbcrevidx + _rbcrecsize:
365 397 self._rbcrevs.extend('\0' * (len(changelog) * _rbcrecsize -
366 398 len(self._rbcrevs)))
367 399
368 400 # fast path: extract data from cache, use it if node is matching
369 401 reponode = changelog.node(rev)[:_rbcnodelen]
370 402 cachenode, branchidx = unpack(
371 403 _rbcrecfmt, buffer(self._rbcrevs, rbcrevidx, _rbcrecsize))
372 404 close = bool(branchidx & _rbccloseflag)
373 405 if close:
374 406 branchidx &= _rbcbranchidxmask
375 407 if cachenode == '\0\0\0\0':
376 408 pass
377 409 elif cachenode == reponode:
378 410 return self._names[branchidx], close
379 411 else:
380 412 # rev/node map has changed, invalidate the cache from here up
381 413 truncate = rbcrevidx + _rbcrecsize
382 414 del self._rbcrevs[truncate:]
383 415 self._rbcrevslen = min(self._rbcrevslen, truncate)
384 416
385 417 # fall back to slow path and make sure it will be written to disk
386 418 return self._branchinfo(rev)
387 419
388 420 def _branchinfo(self, rev):
389 421 """Retrieve branch info from changelog and update _rbcrevs"""
390 422 changelog = self._repo.changelog
391 423 b, close = changelog.branchinfo(rev)
392 424 if b in self._namesreverse:
393 425 branchidx = self._namesreverse[b]
394 426 else:
395 427 branchidx = len(self._names)
396 428 self._names.append(b)
397 429 self._namesreverse[b] = branchidx
398 430 reponode = changelog.node(rev)
399 431 if close:
400 432 branchidx |= _rbccloseflag
401 433 self._setcachedata(rev, reponode, branchidx)
402 434 return b, close
403 435
404 436 def _setcachedata(self, rev, node, branchidx):
405 437 """Writes the node's branch data to the in-memory cache data."""
406 438 rbcrevidx = rev * _rbcrecsize
407 439 rec = array('c')
408 440 rec.fromstring(pack(_rbcrecfmt, node, branchidx))
409 441 self._rbcrevs[rbcrevidx:rbcrevidx + _rbcrecsize] = rec
410 442 self._rbcrevslen = min(self._rbcrevslen, rev)
411 443
412 444 tr = self._repo.currenttransaction()
413 445 if tr:
414 446 tr.addfinalize('write-revbranchcache', self.write)
415 447
416 448 def write(self, tr=None):
417 449 """Save branch cache if it is dirty."""
418 450 repo = self._repo
419 451 if self._rbcnamescount < len(self._names):
420 452 try:
421 453 if self._rbcnamescount != 0:
422 454 f = repo.vfs.open(_rbcnames, 'ab')
423 455 if f.tell() == self._rbcsnameslen:
424 456 f.write('\0')
425 457 else:
426 458 f.close()
427 459 repo.ui.debug("%s changed - rewriting it\n" % _rbcnames)
428 460 self._rbcnamescount = 0
429 461 self._rbcrevslen = 0
430 462 if self._rbcnamescount == 0:
431 463 f = repo.vfs.open(_rbcnames, 'wb')
432 464 f.write('\0'.join(encoding.fromlocal(b)
433 465 for b in self._names[self._rbcnamescount:]))
434 466 self._rbcsnameslen = f.tell()
435 467 f.close()
436 468 except (IOError, OSError, util.Abort) as inst:
437 469 repo.ui.debug("couldn't write revision branch cache names: "
438 470 "%s\n" % inst)
439 471 return
440 472 self._rbcnamescount = len(self._names)
441 473
442 474 start = self._rbcrevslen * _rbcrecsize
443 475 if start != len(self._rbcrevs):
444 476 revs = min(len(repo.changelog), len(self._rbcrevs) // _rbcrecsize)
445 477 try:
446 478 f = repo.vfs.open(_rbcrevs, 'ab')
447 479 if f.tell() != start:
448 480 repo.ui.debug("truncating %s to %s\n" % (_rbcrevs, start))
449 481 f.seek(start)
450 482 f.truncate()
451 483 end = revs * _rbcrecsize
452 484 f.write(self._rbcrevs[start:end])
453 485 f.close()
454 486 except (IOError, OSError, util.Abort) as inst:
455 487 repo.ui.debug("couldn't write revision branch cache: %s\n" %
456 488 inst)
457 489 return
458 490 self._rbcrevslen = revs
@@ -1,283 +1,260 @@ streamclone.py
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
20 20 def canperformstreamclone(repo, remote, heads, streamrequested=None):
21 21 """Whether it is possible to perform a streaming clone as part of pull.
22 22
23 23 Returns a tuple of (supported, requirements). ``supported`` is True if
24 24 streaming clone is supported and False otherwise. ``requirements`` is
25 25 a set of repo requirements from the remote, or ``None`` if stream clone
26 26 isn't supported.
27 27 """
28 28 # Streaming clone only works on empty repositories.
29 29 if len(repo):
30 30 return False, None
31 31
32 32 # Streaming clone only works if all data is being requested.
33 33 if heads:
34 34 return False, None
35 35
36 36 # If we don't have a preference, let the server decide for us. This
37 37 # likely only comes into play in LANs.
38 38 if streamrequested is None:
39 39 # The server can advertise whether to prefer streaming clone.
40 40 streamrequested = remote.capable('stream-preferred')
41 41
42 42 if not streamrequested:
43 43 return False, None
44 44
45 45 # In order for stream clone to work, the client has to support all the
46 46 # requirements advertised by the server.
47 47 #
48 48 # The server advertises its requirements via the "stream" and "streamreqs"
49 49 # capability. "stream" (a value-less capability) is advertised if and only
50 50 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
51 51 # is advertised and contains a comma-delimited list of requirements.
52 52 requirements = set()
53 53 if remote.capable('stream'):
54 54 requirements.add('revlogv1')
55 55 else:
56 56 streamreqs = remote.capable('streamreqs')
57 57 # This is weird and shouldn't happen with modern servers.
58 58 if not streamreqs:
59 59 return False, None
60 60
61 61 streamreqs = set(streamreqs.split(','))
62 62 # Server requires something we don't support. Bail.
63 63 if streamreqs - repo.supportedformats:
64 64 return False, None
65 65 requirements = streamreqs
66 66
67 67 return True, requirements
68 68
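# A hypothetical example of the negotiation described above: a server
# whose only requirement is revlogv1 advertises the bare 'stream'
# capability; any other server advertises 'streamreqs' as a
# comma-delimited list, which the client checks against its own
# supported formats.
#
#   streamreqs = set('revlogv1,generaldelta'.split(','))
#   supported = set(['revlogv1', 'generaldelta'])  # repo.supportedformats
#   canstream = not (streamreqs - supported)       # True -> proceed
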
69 69 def maybeperformstreamclone(pullop):
70 70 repo = pullop.repo
71 71 remote = pullop.remote
72 72
73 73 r = canperformstreamclone(repo, remote, pullop.heads,
74 74 streamrequested=pullop.streamclonerequested)
75 75 supported, requirements = r
76 76
77 77 if not supported:
78 78 return
79 79
80 80 # Save remote branchmap. We will use it later to speed up branchcache
81 81 # creation.
82 82 rbranchmap = None
83 83 if remote.capable('branchmap'):
84 84 rbranchmap = remote.branchmap()
85 85
86 86 fp = remote.stream_out()
87 87 l = fp.readline()
88 88 try:
89 89 resp = int(l)
90 90 except ValueError:
91 91 raise error.ResponseError(
92 92 _('unexpected response from remote server:'), l)
93 93 if resp == 1:
94 94 raise util.Abort(_('operation forbidden by server'))
95 95 elif resp == 2:
96 96 raise util.Abort(_('locking the remote repository failed'))
97 97 elif resp != 0:
98 98 raise util.Abort(_('the server sent an unknown error code'))
99 99
100 100 applyremotedata(repo, requirements, rbranchmap, fp)
101 101
102 102 def allowservergeneration(ui):
103 103 """Whether streaming clones are allowed from the server."""
104 104 return ui.configbool('server', 'uncompressed', True, untrusted=True)
105 105
106 106 # This is its own function so extensions can override it.
107 107 def _walkstreamfiles(repo):
108 108 return repo.store.walk()
109 109
110 110 def generatev1(repo):
111 111 """Emit content for version 1 of a streaming clone.
112 112
113 113 This is a generator of raw chunks that constitute a streaming clone.
114 114
115 115 The stream begins with a line of 2 space-delimited integers containing the
116 116 number of entries and total bytes size.
117 117
118 118 Next are N entries, one for each file being transferred. Each file entry starts
119 119 as a line with the file name and integer size delimited by a null byte.
120 120 The raw file data follows. Following the raw file data is the next file
121 121 entry, or EOF.
122 122
123 123 When used on the wire protocol, an additional line indicating protocol
124 124 success will be prepended to the stream. This function is not responsible
125 125 for adding it.
126 126
127 127 This function will obtain a repository lock to ensure a consistent view of
128 128 the store is captured. It therefore may raise LockError.
129 129 """
130 130 entries = []
131 131 total_bytes = 0
132 132 # Get consistent snapshot of repo, lock during scan.
133 133 lock = repo.lock()
134 134 try:
135 135 repo.ui.debug('scanning\n')
136 136 for name, ename, size in _walkstreamfiles(repo):
137 137 if size:
138 138 entries.append((name, size))
139 139 total_bytes += size
140 140 finally:
141 141 lock.release()
142 142
143 143 repo.ui.debug('%d files, %d bytes to transfer\n' %
144 144 (len(entries), total_bytes))
145 145 yield '%d %d\n' % (len(entries), total_bytes)
146 146
147 147 svfs = repo.svfs
148 148 oldaudit = svfs.mustaudit
149 149 debugflag = repo.ui.debugflag
150 150 svfs.mustaudit = False
151 151
152 152 try:
153 153 for name, size in entries:
154 154 if debugflag:
155 155 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
156 156 # partially encode name over the wire for backwards compat
157 157 yield '%s\0%d\n' % (store.encodedir(name), size)
158 158 if size <= 65536:
159 159 fp = svfs(name)
160 160 try:
161 161 data = fp.read(size)
162 162 finally:
163 163 fp.close()
164 164 yield data
165 165 else:
166 166 for chunk in util.filechunkiter(svfs(name), limit=size):
167 167 yield chunk
168 168 finally:
169 169 svfs.mustaudit = oldaudit
170 170
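# An illustrative (hypothetical) v1 stream as emitted by generatev1()
# above and parsed by consumev1() below:
#
#   2 157\n                <- 2 file entries, 157 bytes of file data
#   data/foo.i\x00120\n    <- encoded file name, NUL byte, file size
#   <120 raw bytes>        <- raw store data for data/foo.i
#   data/bar.i\x0037\n
#   <37 raw bytes>
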
171 171 def consumev1(repo, fp):
172 172 """Apply the contents from version 1 of a streaming clone file handle.
173 173
174 174 This takes the output from "streamout" and applies it to the specified
175 175 repository.
176 176
177 177 Like "streamout," the status line added by the wire protocol is not handled
178 178 by this function.
179 179 """
180 180 lock = repo.lock()
181 181 try:
182 182 repo.ui.status(_('streaming all changes\n'))
183 183 l = fp.readline()
184 184 try:
185 185 total_files, total_bytes = map(int, l.split(' ', 1))
186 186 except (ValueError, TypeError):
187 187 raise error.ResponseError(
188 188 _('unexpected response from remote server:'), l)
189 189 repo.ui.status(_('%d files to transfer, %s of data\n') %
190 190 (total_files, util.bytecount(total_bytes)))
191 191 handled_bytes = 0
192 192 repo.ui.progress(_('clone'), 0, total=total_bytes)
193 193 start = time.time()
194 194
195 195 tr = repo.transaction(_('clone'))
196 196 try:
197 197 for i in xrange(total_files):
198 198 # XXX doesn't support '\n' or '\r' in filenames
199 199 l = fp.readline()
200 200 try:
201 201 name, size = l.split('\0', 1)
202 202 size = int(size)
203 203 except (ValueError, TypeError):
204 204 raise error.ResponseError(
205 205 _('unexpected response from remote server:'), l)
206 206 if repo.ui.debugflag:
207 207 repo.ui.debug('adding %s (%s)\n' %
208 208 (name, util.bytecount(size)))
209 209 # for backwards compat, name was partially encoded
210 210 ofp = repo.svfs(store.decodedir(name), 'w')
211 211 for chunk in util.filechunkiter(fp, limit=size):
212 212 handled_bytes += len(chunk)
213 213 repo.ui.progress(_('clone'), handled_bytes,
214 214 total=total_bytes)
215 215 ofp.write(chunk)
216 216 ofp.close()
217 217 tr.close()
218 218 finally:
219 219 tr.release()
220 220
221 221 # Writing straight to files circumvented the in-memory caches
222 222 repo.invalidate()
223 223
224 224 elapsed = time.time() - start
225 225 if elapsed <= 0:
226 226 elapsed = 0.001
227 227 repo.ui.progress(_('clone'), None)
228 228 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
229 229 (util.bytecount(total_bytes), elapsed,
230 230 util.bytecount(total_bytes / elapsed)))
231 231 finally:
232 232 lock.release()
233 233
234 234 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
235 235 """Apply stream clone data to a repository.
236 236
237 237 "remotereqs" is a set of requirements to handle the incoming data.
238 238 "remotebranchmap" is the result of a branchmap lookup on the remote. It
239 239 can be None.
240 240 "fp" is a file object containing the raw stream data, suitable for
241 241 feeding into consumev1().
242 242 """
243 243 lock = repo.lock()
244 244 try:
245 245 consumev1(repo, fp)
246 246
247 247 # new requirements = old non-format requirements +
248 248 # new format-related remote requirements
249 249 # requirements from the streamed-in repository
250 250 repo.requirements = remotereqs | (
251 251 repo.requirements - repo.supportedformats)
252 252 repo._applyopenerreqs()
253 253 repo._writerequirements()
254 254
255 255 if remotebranchmap:
256 rbheads = []
257 closed = []
258 for bheads in remotebranchmap.itervalues():
259 rbheads.extend(bheads)
260 for h in bheads:
261 r = repo.changelog.rev(h)
262 b, c = repo.changelog.branchinfo(r)
263 if c:
264 closed.append(h)
256 branchmap.replacecache(repo, remotebranchmap)
265 257
266 if rbheads:
267 rtiprev = max((int(repo.changelog.rev(node))
268 for node in rbheads))
269 cache = branchmap.branchcache(remotebranchmap,
270 repo[rtiprev].node(),
271 rtiprev,
272 closednodes=closed)
273 # Try to stick it as low as possible
274 # filter above served are unlikely to be fetch from a clone
275 for candidate in ('base', 'immutable', 'served'):
276 rview = repo.filtered(candidate)
277 if cache.validfor(rview):
278 repo._branchcaches[candidate] = cache
279 cache.write(rview)
280 break
281 258 repo.invalidate()
282 259 finally:
283 260 lock.release()