streamclone: move streamin() into maybeperformstreamclone()...
Gregory Szorc
r26459:3b28ffde default
@@ -1,287 +1,283 @@
 # streamclone.py - producing and consuming streaming repository data
 #
 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import time
 
 from .i18n import _
 from . import (
     branchmap,
     error,
     store,
     util,
 )
 
 def canperformstreamclone(repo, remote, heads, streamrequested=None):
     """Whether it is possible to perform a streaming clone as part of pull.
 
     Returns a tuple of (supported, requirements). ``supported`` is True if
     streaming clone is supported and False otherwise. ``requirements`` is
     a set of repo requirements from the remote, or ``None`` if stream clone
     isn't supported.
     """
     # Streaming clone only works on empty repositories.
     if len(repo):
         return False, None
 
     # Streaming clone only works if all data is being requested.
     if heads:
         return False, None
 
     # If we don't have a preference, let the server decide for us. This
     # likely only comes into play in LANs.
     if streamrequested is None:
         # The server can advertise whether to prefer streaming clone.
         streamrequested = remote.capable('stream-preferred')
 
     if not streamrequested:
         return False, None
 
     # In order for stream clone to work, the client has to support all the
     # requirements advertised by the server.
     #
     # The server advertises its requirements via the "stream" and "streamreqs"
     # capability. "stream" (a value-less capability) is advertised if and only
     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
     # is advertised and contains a comma-delimited list of requirements.
     requirements = set()
     if remote.capable('stream'):
         requirements.add('revlogv1')
     else:
         streamreqs = remote.capable('streamreqs')
         # This is weird and shouldn't happen with modern servers.
         if not streamreqs:
             return False, None
 
         streamreqs = set(streamreqs.split(','))
         # Server requires something we don't support. Bail.
         if streamreqs - repo.supportedformats:
             return False, None
         requirements = streamreqs
 
     return True, requirements
 
 def maybeperformstreamclone(pullop):
     repo = pullop.repo
     remote = pullop.remote
 
     r = canperformstreamclone(repo, remote, pullop.heads,
                               streamrequested=pullop.streamclonerequested)
     supported, requirements = r
 
     if not supported:
         return
 
-    streamin(repo, remote, requirements)
+    # Save remote branchmap. We will use it later to speed up branchcache
+    # creation.
+    rbranchmap = None
+    if remote.capable('branchmap'):
+        rbranchmap = remote.branchmap()
+
+    fp = remote.stream_out()
+    l = fp.readline()
+    try:
+        resp = int(l)
+    except ValueError:
+        raise error.ResponseError(
+            _('unexpected response from remote server:'), l)
+    if resp == 1:
+        raise util.Abort(_('operation forbidden by server'))
+    elif resp == 2:
+        raise util.Abort(_('locking the remote repository failed'))
+    elif resp != 0:
+        raise util.Abort(_('the server sent an unknown error code'))
+
+    applyremotedata(repo, requirements, rbranchmap, fp)
 
 def allowservergeneration(ui):
     """Whether streaming clones are allowed from the server."""
     return ui.configbool('server', 'uncompressed', True, untrusted=True)
 
 # This is it's own function so extensions can override it.
 def _walkstreamfiles(repo):
     return repo.store.walk()
 
 def generatev1(repo):
     """Emit content for version 1 of a streaming clone.
 
     This is a generator of raw chunks that constitute a streaming clone.
 
     The stream begins with a line of 2 space-delimited integers containing the
     number of entries and total bytes size.
 
     Next, are N entries for each file being transferred. Each file entry starts
     as a line with the file name and integer size delimited by a null byte.
     The raw file data follows. Following the raw file data is the next file
     entry, or EOF.
 
     When used on the wire protocol, an additional line indicating protocol
     success will be prepended to the stream. This function is not responsible
     for adding it.
 
     This function will obtain a repository lock to ensure a consistent view of
     the store is captured. It therefore may raise LockError.
     """
     entries = []
     total_bytes = 0
     # Get consistent snapshot of repo, lock during scan.
     lock = repo.lock()
     try:
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
                 entries.append((name, size))
                 total_bytes += size
     finally:
         lock.release()
 
     repo.ui.debug('%d files, %d bytes to transfer\n' %
                   (len(entries), total_bytes))
     yield '%d %d\n' % (len(entries), total_bytes)
 
     svfs = repo.svfs
     oldaudit = svfs.mustaudit
     debugflag = repo.ui.debugflag
     svfs.mustaudit = False
 
     try:
         for name, size in entries:
             if debugflag:
                 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
             # partially encode name over the wire for backwards compat
             yield '%s\0%d\n' % (store.encodedir(name), size)
             if size <= 65536:
                 fp = svfs(name)
                 try:
                     data = fp.read(size)
                 finally:
                     fp.close()
                 yield data
             else:
                 for chunk in util.filechunkiter(svfs(name), limit=size):
                     yield chunk
     finally:
         svfs.mustaudit = oldaudit
 
 def consumev1(repo, fp):
     """Apply the contents from version 1 of a streaming clone file handle.
 
     This takes the output from "streamout" and applies it to the specified
     repository.
 
     Like "streamout," the status line added by the wire protocol is not handled
     by this function.
     """
     lock = repo.lock()
     try:
         repo.ui.status(_('streaming all changes\n'))
         l = fp.readline()
         try:
             total_files, total_bytes = map(int, l.split(' ', 1))
         except (ValueError, TypeError):
             raise error.ResponseError(
                 _('unexpected response from remote server:'), l)
         repo.ui.status(_('%d files to transfer, %s of data\n') %
                        (total_files, util.bytecount(total_bytes)))
         handled_bytes = 0
         repo.ui.progress(_('clone'), 0, total=total_bytes)
         start = time.time()
 
         tr = repo.transaction(_('clone'))
         try:
             for i in xrange(total_files):
                 # XXX doesn't support '\n' or '\r' in filenames
                 l = fp.readline()
                 try:
                     name, size = l.split('\0', 1)
                     size = int(size)
                 except (ValueError, TypeError):
                     raise error.ResponseError(
                         _('unexpected response from remote server:'), l)
                 if repo.ui.debugflag:
                     repo.ui.debug('adding %s (%s)\n' %
                                   (name, util.bytecount(size)))
                 # for backwards compat, name was partially encoded
                 ofp = repo.svfs(store.decodedir(name), 'w')
                 for chunk in util.filechunkiter(fp, limit=size):
                     handled_bytes += len(chunk)
                     repo.ui.progress(_('clone'), handled_bytes,
                                      total=total_bytes)
                     ofp.write(chunk)
                 ofp.close()
             tr.close()
         finally:
             tr.release()
 
         # Writing straight to files circumvented the inmemory caches
         repo.invalidate()
 
         elapsed = time.time() - start
         if elapsed <= 0:
             elapsed = 0.001
         repo.ui.progress(_('clone'), None)
         repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                        (util.bytecount(total_bytes), elapsed,
                         util.bytecount(total_bytes / elapsed)))
     finally:
         lock.release()
 
-def streamin(repo, remote, remotereqs):
-    # Save remote branchmap. We will use it later
-    # to speed up branchcache creation
-    rbranchmap = None
-    if remote.capable("branchmap"):
-        rbranchmap = remote.branchmap()
-
-    fp = remote.stream_out()
-    l = fp.readline()
-    try:
-        resp = int(l)
-    except ValueError:
-        raise error.ResponseError(
-            _('unexpected response from remote server:'), l)
-    if resp == 1:
-        raise util.Abort(_('operation forbidden by server'))
-    elif resp == 2:
-        raise util.Abort(_('locking the remote repository failed'))
-    elif resp != 0:
-        raise util.Abort(_('the server sent an unknown error code'))
-
-    applyremotedata(repo, remotereqs, rbranchmap, fp)
-    return len(repo.heads()) + 1
-
 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
     """Apply stream clone data to a repository.
 
     "remotereqs" is a set of requirements to handle the incoming data.
     "remotebranchmap" is the result of a branchmap lookup on the remote. It
     can be None.
     "fp" is a file object containing the raw stream data, suitable for
     feeding into consumev1().
     """
     lock = repo.lock()
     try:
         consumev1(repo, fp)
 
         # new requirements = old non-format requirements +
         #                    new format-related remote requirements
         # requirements from the streamed-in repository
         repo.requirements = remotereqs | (
                 repo.requirements - repo.supportedformats)
         repo._applyopenerreqs()
         repo._writerequirements()
 
         if remotebranchmap:
             rbheads = []
             closed = []
             for bheads in remotebranchmap.itervalues():
                 rbheads.extend(bheads)
                 for h in bheads:
                     r = repo.changelog.rev(h)
                     b, c = repo.changelog.branchinfo(r)
                     if c:
                         closed.append(h)
 
             if rbheads:
                 rtiprev = max((int(repo.changelog.rev(node))
                                for node in rbheads))
                 cache = branchmap.branchcache(remotebranchmap,
                                               repo[rtiprev].node(),
                                               rtiprev,
                                               closednodes=closed)
                 # Try to stick it as low as possible
                 # filter above served are unlikely to be fetch from a clone
                 for candidate in ('base', 'immutable', 'served'):
                     rview = repo.filtered(candidate)
                     if cache.validfor(rview):
                         repo._branchcaches[candidate] = cache
                         cache.write(rview)
                         break
         repo.invalidate()
     finally:
         lock.release()
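For reference, the wire exchange performed by the relocated code is small: the server prepends a status line (0 on success, 1 if the operation is forbidden, 2 if locking the remote repository failed), followed by the version 1 payload documented in generatev1(): a "<entries> <total bytes>" header line, then per-file "<name>\0<size>" lines each followed by the raw file data. Below is a minimal, self-contained sketch of a consumer for that format; the helper name readstream and the sample payload are invented for illustration only, and the real consumer is consumev1() above, which additionally reports progress and writes files through repo.svfs inside a transaction.

# Hypothetical sketch, not part of this change: parse the status line
# checked in maybeperformstreamclone(), then the v1 payload described in
# generatev1()'s docstring.
import io

def readstream(fp):
    # Wire protocol status line: 0 = OK, 1 = operation forbidden by the
    # server, 2 = locking the remote repository failed.
    resp = int(fp.readline())
    if resp != 0:
        raise RuntimeError('stream clone refused (code %d)' % resp)
    # Payload header: "<number of entries> <total bytes>\n"
    total_files, total_bytes = map(int, fp.readline().split(b' ', 1))
    for _ in range(total_files):
        # Each entry: "<name>\0<size>\n" followed by <size> bytes of raw data.
        name, size = fp.readline().split(b'\0', 1)
        yield name, fp.read(int(size))

# Example: a successful response carrying a single 5-byte file.
payload = io.BytesIO(b'0\n' + b'1 5\n' + b'data/foo.i\x005\n' + b'hello')
for name, data in readstream(payload):
    print(name.decode(), len(data))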