##// END OF EJS Templates
streamclone: add explicit check for empty local repo...
Gregory Szorc -
r26447:591088f7 default
parent child Browse files
Show More
@@ -1,278 +1,282 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import time
10 import time
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 branchmap,
14 branchmap,
15 error,
15 error,
16 store,
16 store,
17 util,
17 util,
18 )
18 )
19
19
20 def canperformstreamclone(repo, remote, heads, streamrequested=None):
20 def canperformstreamclone(repo, remote, heads, streamrequested=None):
21 """Whether it is possible to perform a streaming clone as part of pull.
21 """Whether it is possible to perform a streaming clone as part of pull.
22
22
23 Returns a tuple of (supported, requirements). ``supported`` is True if
23 Returns a tuple of (supported, requirements). ``supported`` is True if
24 streaming clone is supported and False otherwise. ``requirements`` is
24 streaming clone is supported and False otherwise. ``requirements`` is
25 a set of repo requirements from the remote, or ``None`` if stream clone
25 a set of repo requirements from the remote, or ``None`` if stream clone
26 isn't supported.
26 isn't supported.
27 """
27 """
28 # Streaming clone only works on empty repositories.
29 if len(repo):
30 return False, None
31
28 # Streaming clone only works if all data is being requested.
32 # Streaming clone only works if all data is being requested.
29 if heads:
33 if heads:
30 return False, None
34 return False, None
31
35
32 # If we don't have a preference, let the server decide for us. This
36 # If we don't have a preference, let the server decide for us. This
33 # likely only comes into play in LANs.
37 # likely only comes into play in LANs.
34 if streamrequested is None:
38 if streamrequested is None:
35 # The server can advertise whether to prefer streaming clone.
39 # The server can advertise whether to prefer streaming clone.
36 streamrequested = remote.capable('stream-preferred')
40 streamrequested = remote.capable('stream-preferred')
37
41
38 if not streamrequested:
42 if not streamrequested:
39 return False, None
43 return False, None
40
44
41 # In order for stream clone to work, the client has to support all the
45 # In order for stream clone to work, the client has to support all the
42 # requirements advertised by the server.
46 # requirements advertised by the server.
43 #
47 #
44 # The server advertises its requirements via the "stream" and "streamreqs"
48 # The server advertises its requirements via the "stream" and "streamreqs"
45 # capability. "stream" (a value-less capability) is advertised if and only
49 # capability. "stream" (a value-less capability) is advertised if and only
46 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
50 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
47 # is advertised and contains a comma-delimited list of requirements.
51 # is advertised and contains a comma-delimited list of requirements.
48 requirements = set()
52 requirements = set()
49 if remote.capable('stream'):
53 if remote.capable('stream'):
50 requirements.add('revlogv1')
54 requirements.add('revlogv1')
51 else:
55 else:
52 streamreqs = remote.capable('streamreqs')
56 streamreqs = remote.capable('streamreqs')
53 # This is weird and shouldn't happen with modern servers.
57 # This is weird and shouldn't happen with modern servers.
54 if not streamreqs:
58 if not streamreqs:
55 return False, None
59 return False, None
56
60
57 streamreqs = set(streamreqs.split(','))
61 streamreqs = set(streamreqs.split(','))
58 # Server requires something we don't support. Bail.
62 # Server requires something we don't support. Bail.
59 if streamreqs - repo.supportedformats:
63 if streamreqs - repo.supportedformats:
60 return False, None
64 return False, None
61 requirements = streamreqs
65 requirements = streamreqs
62
66
63 return True, requirements
67 return True, requirements
64
68
65 def maybeperformstreamclone(repo, remote, heads, stream):
69 def maybeperformstreamclone(repo, remote, heads, stream):
66 supported, requirements = canperformstreamclone(repo, remote, heads,
70 supported, requirements = canperformstreamclone(repo, remote, heads,
67 streamrequested=stream)
71 streamrequested=stream)
68 if not supported:
72 if not supported:
69 return
73 return
70
74
71 streamin(repo, remote, requirements)
75 streamin(repo, remote, requirements)
72
76
73 def allowservergeneration(ui):
77 def allowservergeneration(ui):
74 """Whether streaming clones are allowed from the server."""
78 """Whether streaming clones are allowed from the server."""
75 return ui.configbool('server', 'uncompressed', True, untrusted=True)
79 return ui.configbool('server', 'uncompressed', True, untrusted=True)
76
80
77 # This is its own function so extensions can override it.
81 # This is its own function so extensions can override it.
78 def _walkstreamfiles(repo):
82 def _walkstreamfiles(repo):
79 return repo.store.walk()
83 return repo.store.walk()
80
84
81 def generatev1(repo):
85 def generatev1(repo):
82 """Emit content for version 1 of a streaming clone.
86 """Emit content for version 1 of a streaming clone.
83
87
84 This is a generator of raw chunks that constitute a streaming clone.
88 This is a generator of raw chunks that constitute a streaming clone.
85
89
86 The stream begins with a line of 2 space-delimited integers containing the
90 The stream begins with a line of 2 space-delimited integers containing the
87 number of entries and total bytes size.
91 number of entries and total bytes size.
88
92
89 Next are N entries, one for each file being transferred. Each file entry starts
93 Next are N entries, one for each file being transferred. Each file entry starts
90 as a line with the file name and integer size delimited by a null byte.
94 as a line with the file name and integer size delimited by a null byte.
91 The raw file data follows. Following the raw file data is the next file
95 The raw file data follows. Following the raw file data is the next file
92 entry, or EOF.
96 entry, or EOF.
93
97
94 When used on the wire protocol, an additional line indicating protocol
98 When used on the wire protocol, an additional line indicating protocol
95 success will be prepended to the stream. This function is not responsible
99 success will be prepended to the stream. This function is not responsible
96 for adding it.
100 for adding it.
97
101
98 This function will obtain a repository lock to ensure a consistent view of
102 This function will obtain a repository lock to ensure a consistent view of
99 the store is captured. It therefore may raise LockError.
103 the store is captured. It therefore may raise LockError.
100 """
104 """
101 entries = []
105 entries = []
102 total_bytes = 0
106 total_bytes = 0
103 # Get consistent snapshot of repo, lock during scan.
107 # Get consistent snapshot of repo, lock during scan.
104 lock = repo.lock()
108 lock = repo.lock()
105 try:
109 try:
106 repo.ui.debug('scanning\n')
110 repo.ui.debug('scanning\n')
107 for name, ename, size in _walkstreamfiles(repo):
111 for name, ename, size in _walkstreamfiles(repo):
108 if size:
112 if size:
109 entries.append((name, size))
113 entries.append((name, size))
110 total_bytes += size
114 total_bytes += size
111 finally:
115 finally:
112 lock.release()
116 lock.release()
113
117
114 repo.ui.debug('%d files, %d bytes to transfer\n' %
118 repo.ui.debug('%d files, %d bytes to transfer\n' %
115 (len(entries), total_bytes))
119 (len(entries), total_bytes))
116 yield '%d %d\n' % (len(entries), total_bytes)
120 yield '%d %d\n' % (len(entries), total_bytes)
117
121
118 svfs = repo.svfs
122 svfs = repo.svfs
119 oldaudit = svfs.mustaudit
123 oldaudit = svfs.mustaudit
120 debugflag = repo.ui.debugflag
124 debugflag = repo.ui.debugflag
121 svfs.mustaudit = False
125 svfs.mustaudit = False
122
126
123 try:
127 try:
124 for name, size in entries:
128 for name, size in entries:
125 if debugflag:
129 if debugflag:
126 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
130 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
127 # partially encode name over the wire for backwards compat
131 # partially encode name over the wire for backwards compat
128 yield '%s\0%d\n' % (store.encodedir(name), size)
132 yield '%s\0%d\n' % (store.encodedir(name), size)
129 if size <= 65536:
133 if size <= 65536:
130 fp = svfs(name)
134 fp = svfs(name)
131 try:
135 try:
132 data = fp.read(size)
136 data = fp.read(size)
133 finally:
137 finally:
134 fp.close()
138 fp.close()
135 yield data
139 yield data
136 else:
140 else:
137 for chunk in util.filechunkiter(svfs(name), limit=size):
141 for chunk in util.filechunkiter(svfs(name), limit=size):
138 yield chunk
142 yield chunk
139 finally:
143 finally:
140 svfs.mustaudit = oldaudit
144 svfs.mustaudit = oldaudit
141
145
142 def consumev1(repo, fp):
146 def consumev1(repo, fp):
143 """Apply the contents from version 1 of a streaming clone file handle.
147 """Apply the contents from version 1 of a streaming clone file handle.
144
148
145 This takes the output from "generatev1" and applies it to the specified
149 This takes the output from "generatev1" and applies it to the specified
146 repository.
150 repository.
147
151
148 Like "generatev1," the status line added by the wire protocol is not handled
152 Like "generatev1," the status line added by the wire protocol is not handled
149 by this function.
153 by this function.
150 """
154 """
151 lock = repo.lock()
155 lock = repo.lock()
152 try:
156 try:
153 repo.ui.status(_('streaming all changes\n'))
157 repo.ui.status(_('streaming all changes\n'))
154 l = fp.readline()
158 l = fp.readline()
155 try:
159 try:
156 total_files, total_bytes = map(int, l.split(' ', 1))
160 total_files, total_bytes = map(int, l.split(' ', 1))
157 except (ValueError, TypeError):
161 except (ValueError, TypeError):
158 raise error.ResponseError(
162 raise error.ResponseError(
159 _('unexpected response from remote server:'), l)
163 _('unexpected response from remote server:'), l)
160 repo.ui.status(_('%d files to transfer, %s of data\n') %
164 repo.ui.status(_('%d files to transfer, %s of data\n') %
161 (total_files, util.bytecount(total_bytes)))
165 (total_files, util.bytecount(total_bytes)))
162 handled_bytes = 0
166 handled_bytes = 0
163 repo.ui.progress(_('clone'), 0, total=total_bytes)
167 repo.ui.progress(_('clone'), 0, total=total_bytes)
164 start = time.time()
168 start = time.time()
165
169
166 tr = repo.transaction(_('clone'))
170 tr = repo.transaction(_('clone'))
167 try:
171 try:
168 for i in xrange(total_files):
172 for i in xrange(total_files):
169 # XXX doesn't support '\n' or '\r' in filenames
173 # XXX doesn't support '\n' or '\r' in filenames
170 l = fp.readline()
174 l = fp.readline()
171 try:
175 try:
172 name, size = l.split('\0', 1)
176 name, size = l.split('\0', 1)
173 size = int(size)
177 size = int(size)
174 except (ValueError, TypeError):
178 except (ValueError, TypeError):
175 raise error.ResponseError(
179 raise error.ResponseError(
176 _('unexpected response from remote server:'), l)
180 _('unexpected response from remote server:'), l)
177 if repo.ui.debugflag:
181 if repo.ui.debugflag:
178 repo.ui.debug('adding %s (%s)\n' %
182 repo.ui.debug('adding %s (%s)\n' %
179 (name, util.bytecount(size)))
183 (name, util.bytecount(size)))
180 # for backwards compat, name was partially encoded
184 # for backwards compat, name was partially encoded
181 ofp = repo.svfs(store.decodedir(name), 'w')
185 ofp = repo.svfs(store.decodedir(name), 'w')
182 for chunk in util.filechunkiter(fp, limit=size):
186 for chunk in util.filechunkiter(fp, limit=size):
183 handled_bytes += len(chunk)
187 handled_bytes += len(chunk)
184 repo.ui.progress(_('clone'), handled_bytes,
188 repo.ui.progress(_('clone'), handled_bytes,
185 total=total_bytes)
189 total=total_bytes)
186 ofp.write(chunk)
190 ofp.write(chunk)
187 ofp.close()
191 ofp.close()
188 tr.close()
192 tr.close()
189 finally:
193 finally:
190 tr.release()
194 tr.release()
191
195
192 # Writing straight to files circumvented the inmemory caches
196 # Writing straight to files circumvented the inmemory caches
193 repo.invalidate()
197 repo.invalidate()
194
198
195 elapsed = time.time() - start
199 elapsed = time.time() - start
196 if elapsed <= 0:
200 if elapsed <= 0:
197 elapsed = 0.001
201 elapsed = 0.001
198 repo.ui.progress(_('clone'), None)
202 repo.ui.progress(_('clone'), None)
199 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
203 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
200 (util.bytecount(total_bytes), elapsed,
204 (util.bytecount(total_bytes), elapsed,
201 util.bytecount(total_bytes / elapsed)))
205 util.bytecount(total_bytes / elapsed)))
202 finally:
206 finally:
203 lock.release()
207 lock.release()
204
208
205 def streamin(repo, remote, remotereqs):
209 def streamin(repo, remote, remotereqs):
206 # Save remote branchmap. We will use it later
210 # Save remote branchmap. We will use it later
207 # to speed up branchcache creation
211 # to speed up branchcache creation
208 rbranchmap = None
212 rbranchmap = None
209 if remote.capable("branchmap"):
213 if remote.capable("branchmap"):
210 rbranchmap = remote.branchmap()
214 rbranchmap = remote.branchmap()
211
215
212 fp = remote.stream_out()
216 fp = remote.stream_out()
213 l = fp.readline()
217 l = fp.readline()
214 try:
218 try:
215 resp = int(l)
219 resp = int(l)
216 except ValueError:
220 except ValueError:
217 raise error.ResponseError(
221 raise error.ResponseError(
218 _('unexpected response from remote server:'), l)
222 _('unexpected response from remote server:'), l)
219 if resp == 1:
223 if resp == 1:
220 raise util.Abort(_('operation forbidden by server'))
224 raise util.Abort(_('operation forbidden by server'))
221 elif resp == 2:
225 elif resp == 2:
222 raise util.Abort(_('locking the remote repository failed'))
226 raise util.Abort(_('locking the remote repository failed'))
223 elif resp != 0:
227 elif resp != 0:
224 raise util.Abort(_('the server sent an unknown error code'))
228 raise util.Abort(_('the server sent an unknown error code'))
225
229
226 applyremotedata(repo, remotereqs, rbranchmap, fp)
230 applyremotedata(repo, remotereqs, rbranchmap, fp)
227 return len(repo.heads()) + 1
231 return len(repo.heads()) + 1
228
232
229 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
233 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
230 """Apply stream clone data to a repository.
234 """Apply stream clone data to a repository.
231
235
232 "remotereqs" is a set of requirements to handle the incoming data.
236 "remotereqs" is a set of requirements to handle the incoming data.
233 "remotebranchmap" is the result of a branchmap lookup on the remote. It
237 "remotebranchmap" is the result of a branchmap lookup on the remote. It
234 can be None.
238 can be None.
235 "fp" is a file object containing the raw stream data, suitable for
239 "fp" is a file object containing the raw stream data, suitable for
236 feeding into consumev1().
240 feeding into consumev1().
237 """
241 """
238 lock = repo.lock()
242 lock = repo.lock()
239 try:
243 try:
240 consumev1(repo, fp)
244 consumev1(repo, fp)
241
245
242 # new requirements = old non-format requirements +
246 # new requirements = old non-format requirements +
243 # new format-related remote requirements
247 # new format-related remote requirements
244 # requirements from the streamed-in repository
248 # requirements from the streamed-in repository
245 repo.requirements = remotereqs | (
249 repo.requirements = remotereqs | (
246 repo.requirements - repo.supportedformats)
250 repo.requirements - repo.supportedformats)
247 repo._applyopenerreqs()
251 repo._applyopenerreqs()
248 repo._writerequirements()
252 repo._writerequirements()
249
253
250 if remotebranchmap:
254 if remotebranchmap:
251 rbheads = []
255 rbheads = []
252 closed = []
256 closed = []
253 for bheads in remotebranchmap.itervalues():
257 for bheads in remotebranchmap.itervalues():
254 rbheads.extend(bheads)
258 rbheads.extend(bheads)
255 for h in bheads:
259 for h in bheads:
256 r = repo.changelog.rev(h)
260 r = repo.changelog.rev(h)
257 b, c = repo.changelog.branchinfo(r)
261 b, c = repo.changelog.branchinfo(r)
258 if c:
262 if c:
259 closed.append(h)
263 closed.append(h)
260
264
261 if rbheads:
265 if rbheads:
262 rtiprev = max((int(repo.changelog.rev(node))
266 rtiprev = max((int(repo.changelog.rev(node))
263 for node in rbheads))
267 for node in rbheads))
264 cache = branchmap.branchcache(remotebranchmap,
268 cache = branchmap.branchcache(remotebranchmap,
265 repo[rtiprev].node(),
269 repo[rtiprev].node(),
266 rtiprev,
270 rtiprev,
267 closednodes=closed)
271 closednodes=closed)
268 # Try to stick it as low as possible
272 # Try to stick it as low as possible
269 # filters above "served" are unlikely to be fetched from a clone
273 # filters above "served" are unlikely to be fetched from a clone
270 for candidate in ('base', 'immutable', 'served'):
274 for candidate in ('base', 'immutable', 'served'):
271 rview = repo.filtered(candidate)
275 rview = repo.filtered(candidate)
272 if cache.validfor(rview):
276 if cache.validfor(rview):
273 repo._branchcaches[candidate] = cache
277 repo._branchcaches[candidate] = cache
274 cache.write(rview)
278 cache.write(rview)
275 break
279 break
276 repo.invalidate()
280 repo.invalidate()
277 finally:
281 finally:
278 lock.release()
282 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now