##// END OF EJS Templates
streamclone: move applyremotedata() into maybeperformstreamclone()...
Gregory Szorc -
r26461:09cc3c2e default
parent child Browse files
Show More
@@ -1,260 +1,249 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import time
10 import time
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 branchmap,
14 branchmap,
15 error,
15 error,
16 store,
16 store,
17 util,
17 util,
18 )
18 )
19
19
20 def canperformstreamclone(repo, remote, heads, streamrequested=None):
20 def canperformstreamclone(repo, remote, heads, streamrequested=None):
21 """Whether it is possible to perform a streaming clone as part of pull.
21 """Whether it is possible to perform a streaming clone as part of pull.
22
22
23 Returns a tuple of (supported, requirements). ``supported`` is True if
23 Returns a tuple of (supported, requirements). ``supported`` is True if
24 streaming clone is supported and False otherwise. ``requirements`` is
24 streaming clone is supported and False otherwise. ``requirements`` is
25 a set of repo requirements from the remote, or ``None`` if stream clone
25 a set of repo requirements from the remote, or ``None`` if stream clone
26 isn't supported.
26 isn't supported.
27 """
27 """
28 # Streaming clone only works on empty repositories.
28 # Streaming clone only works on empty repositories.
29 if len(repo):
29 if len(repo):
30 return False, None
30 return False, None
31
31
32 # Streaming clone only works if all data is being requested.
32 # Streaming clone only works if all data is being requested.
33 if heads:
33 if heads:
34 return False, None
34 return False, None
35
35
36 # If we don't have a preference, let the server decide for us. This
36 # If we don't have a preference, let the server decide for us. This
37 # likely only comes into play in LANs.
37 # likely only comes into play in LANs.
38 if streamrequested is None:
38 if streamrequested is None:
39 # The server can advertise whether to prefer streaming clone.
39 # The server can advertise whether to prefer streaming clone.
40 streamrequested = remote.capable('stream-preferred')
40 streamrequested = remote.capable('stream-preferred')
41
41
42 if not streamrequested:
42 if not streamrequested:
43 return False, None
43 return False, None
44
44
45 # In order for stream clone to work, the client has to support all the
45 # In order for stream clone to work, the client has to support all the
46 # requirements advertised by the server.
46 # requirements advertised by the server.
47 #
47 #
48 # The server advertises its requirements via the "stream" and "streamreqs"
48 # The server advertises its requirements via the "stream" and "streamreqs"
49 # capability. "stream" (a value-less capability) is advertised if and only
49 # capability. "stream" (a value-less capability) is advertised if and only
50 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
50 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
51 # is advertised and contains a comma-delimited list of requirements.
51 # is advertised and contains a comma-delimited list of requirements.
52 requirements = set()
52 requirements = set()
53 if remote.capable('stream'):
53 if remote.capable('stream'):
54 requirements.add('revlogv1')
54 requirements.add('revlogv1')
55 else:
55 else:
56 streamreqs = remote.capable('streamreqs')
56 streamreqs = remote.capable('streamreqs')
57 # This is weird and shouldn't happen with modern servers.
57 # This is weird and shouldn't happen with modern servers.
58 if not streamreqs:
58 if not streamreqs:
59 return False, None
59 return False, None
60
60
61 streamreqs = set(streamreqs.split(','))
61 streamreqs = set(streamreqs.split(','))
62 # Server requires something we don't support. Bail.
62 # Server requires something we don't support. Bail.
63 if streamreqs - repo.supportedformats:
63 if streamreqs - repo.supportedformats:
64 return False, None
64 return False, None
65 requirements = streamreqs
65 requirements = streamreqs
66
66
67 return True, requirements
67 return True, requirements
68
68
69 def maybeperformstreamclone(pullop):
69 def maybeperformstreamclone(pullop):
70 repo = pullop.repo
70 repo = pullop.repo
71 remote = pullop.remote
71 remote = pullop.remote
72
72
73 r = canperformstreamclone(repo, remote, pullop.heads,
73 r = canperformstreamclone(repo, remote, pullop.heads,
74 streamrequested=pullop.streamclonerequested)
74 streamrequested=pullop.streamclonerequested)
75 supported, requirements = r
75 supported, requirements = r
76
76
77 if not supported:
77 if not supported:
78 return
78 return
79
79
80 # Save remote branchmap. We will use it later to speed up branchcache
80 # Save remote branchmap. We will use it later to speed up branchcache
81 # creation.
81 # creation.
82 rbranchmap = None
82 rbranchmap = None
83 if remote.capable('branchmap'):
83 if remote.capable('branchmap'):
84 rbranchmap = remote.branchmap()
84 rbranchmap = remote.branchmap()
85
85
86 fp = remote.stream_out()
86 fp = remote.stream_out()
87 l = fp.readline()
87 l = fp.readline()
88 try:
88 try:
89 resp = int(l)
89 resp = int(l)
90 except ValueError:
90 except ValueError:
91 raise error.ResponseError(
91 raise error.ResponseError(
92 _('unexpected response from remote server:'), l)
92 _('unexpected response from remote server:'), l)
93 if resp == 1:
93 if resp == 1:
94 raise util.Abort(_('operation forbidden by server'))
94 raise util.Abort(_('operation forbidden by server'))
95 elif resp == 2:
95 elif resp == 2:
96 raise util.Abort(_('locking the remote repository failed'))
96 raise util.Abort(_('locking the remote repository failed'))
97 elif resp != 0:
97 elif resp != 0:
98 raise util.Abort(_('the server sent an unknown error code'))
98 raise util.Abort(_('the server sent an unknown error code'))
99
99
100 applyremotedata(repo, requirements, rbranchmap, fp)
100 lock = repo.lock()
101 try:
102 consumev1(repo, fp)
103
104 # new requirements = old non-format requirements +
105 # new format-related remote requirements
106 # requirements from the streamed-in repository
107 repo.requirements = requirements | (
108 repo.requirements - repo.supportedformats)
109 repo._applyopenerreqs()
110 repo._writerequirements()
111
112 if rbranchmap:
113 branchmap.replacecache(repo, rbranchmap)
114
115 repo.invalidate()
116 finally:
117 lock.release()
101
118
102 def allowservergeneration(ui):
119 def allowservergeneration(ui):
103 """Whether streaming clones are allowed from the server."""
120 """Whether streaming clones are allowed from the server."""
104 return ui.configbool('server', 'uncompressed', True, untrusted=True)
121 return ui.configbool('server', 'uncompressed', True, untrusted=True)
105
122
106 # This is it's own function so extensions can override it.
123 # This is it's own function so extensions can override it.
107 def _walkstreamfiles(repo):
124 def _walkstreamfiles(repo):
108 return repo.store.walk()
125 return repo.store.walk()
109
126
110 def generatev1(repo):
127 def generatev1(repo):
111 """Emit content for version 1 of a streaming clone.
128 """Emit content for version 1 of a streaming clone.
112
129
113 This is a generator of raw chunks that constitute a streaming clone.
130 This is a generator of raw chunks that constitute a streaming clone.
114
131
115 The stream begins with a line of 2 space-delimited integers containing the
132 The stream begins with a line of 2 space-delimited integers containing the
116 number of entries and total bytes size.
133 number of entries and total bytes size.
117
134
118 Next, are N entries for each file being transferred. Each file entry starts
135 Next, are N entries for each file being transferred. Each file entry starts
119 as a line with the file name and integer size delimited by a null byte.
136 as a line with the file name and integer size delimited by a null byte.
120 The raw file data follows. Following the raw file data is the next file
137 The raw file data follows. Following the raw file data is the next file
121 entry, or EOF.
138 entry, or EOF.
122
139
123 When used on the wire protocol, an additional line indicating protocol
140 When used on the wire protocol, an additional line indicating protocol
124 success will be prepended to the stream. This function is not responsible
141 success will be prepended to the stream. This function is not responsible
125 for adding it.
142 for adding it.
126
143
127 This function will obtain a repository lock to ensure a consistent view of
144 This function will obtain a repository lock to ensure a consistent view of
128 the store is captured. It therefore may raise LockError.
145 the store is captured. It therefore may raise LockError.
129 """
146 """
130 entries = []
147 entries = []
131 total_bytes = 0
148 total_bytes = 0
132 # Get consistent snapshot of repo, lock during scan.
149 # Get consistent snapshot of repo, lock during scan.
133 lock = repo.lock()
150 lock = repo.lock()
134 try:
151 try:
135 repo.ui.debug('scanning\n')
152 repo.ui.debug('scanning\n')
136 for name, ename, size in _walkstreamfiles(repo):
153 for name, ename, size in _walkstreamfiles(repo):
137 if size:
154 if size:
138 entries.append((name, size))
155 entries.append((name, size))
139 total_bytes += size
156 total_bytes += size
140 finally:
157 finally:
141 lock.release()
158 lock.release()
142
159
143 repo.ui.debug('%d files, %d bytes to transfer\n' %
160 repo.ui.debug('%d files, %d bytes to transfer\n' %
144 (len(entries), total_bytes))
161 (len(entries), total_bytes))
145 yield '%d %d\n' % (len(entries), total_bytes)
162 yield '%d %d\n' % (len(entries), total_bytes)
146
163
147 svfs = repo.svfs
164 svfs = repo.svfs
148 oldaudit = svfs.mustaudit
165 oldaudit = svfs.mustaudit
149 debugflag = repo.ui.debugflag
166 debugflag = repo.ui.debugflag
150 svfs.mustaudit = False
167 svfs.mustaudit = False
151
168
152 try:
169 try:
153 for name, size in entries:
170 for name, size in entries:
154 if debugflag:
171 if debugflag:
155 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
172 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
156 # partially encode name over the wire for backwards compat
173 # partially encode name over the wire for backwards compat
157 yield '%s\0%d\n' % (store.encodedir(name), size)
174 yield '%s\0%d\n' % (store.encodedir(name), size)
158 if size <= 65536:
175 if size <= 65536:
159 fp = svfs(name)
176 fp = svfs(name)
160 try:
177 try:
161 data = fp.read(size)
178 data = fp.read(size)
162 finally:
179 finally:
163 fp.close()
180 fp.close()
164 yield data
181 yield data
165 else:
182 else:
166 for chunk in util.filechunkiter(svfs(name), limit=size):
183 for chunk in util.filechunkiter(svfs(name), limit=size):
167 yield chunk
184 yield chunk
168 finally:
185 finally:
169 svfs.mustaudit = oldaudit
186 svfs.mustaudit = oldaudit
170
187
171 def consumev1(repo, fp):
188 def consumev1(repo, fp):
172 """Apply the contents from version 1 of a streaming clone file handle.
189 """Apply the contents from version 1 of a streaming clone file handle.
173
190
174 This takes the output from "streamout" and applies it to the specified
191 This takes the output from "streamout" and applies it to the specified
175 repository.
192 repository.
176
193
177 Like "streamout," the status line added by the wire protocol is not handled
194 Like "streamout," the status line added by the wire protocol is not handled
178 by this function.
195 by this function.
179 """
196 """
180 lock = repo.lock()
197 lock = repo.lock()
181 try:
198 try:
182 repo.ui.status(_('streaming all changes\n'))
199 repo.ui.status(_('streaming all changes\n'))
183 l = fp.readline()
200 l = fp.readline()
184 try:
201 try:
185 total_files, total_bytes = map(int, l.split(' ', 1))
202 total_files, total_bytes = map(int, l.split(' ', 1))
186 except (ValueError, TypeError):
203 except (ValueError, TypeError):
187 raise error.ResponseError(
204 raise error.ResponseError(
188 _('unexpected response from remote server:'), l)
205 _('unexpected response from remote server:'), l)
189 repo.ui.status(_('%d files to transfer, %s of data\n') %
206 repo.ui.status(_('%d files to transfer, %s of data\n') %
190 (total_files, util.bytecount(total_bytes)))
207 (total_files, util.bytecount(total_bytes)))
191 handled_bytes = 0
208 handled_bytes = 0
192 repo.ui.progress(_('clone'), 0, total=total_bytes)
209 repo.ui.progress(_('clone'), 0, total=total_bytes)
193 start = time.time()
210 start = time.time()
194
211
195 tr = repo.transaction(_('clone'))
212 tr = repo.transaction(_('clone'))
196 try:
213 try:
197 for i in xrange(total_files):
214 for i in xrange(total_files):
198 # XXX doesn't support '\n' or '\r' in filenames
215 # XXX doesn't support '\n' or '\r' in filenames
199 l = fp.readline()
216 l = fp.readline()
200 try:
217 try:
201 name, size = l.split('\0', 1)
218 name, size = l.split('\0', 1)
202 size = int(size)
219 size = int(size)
203 except (ValueError, TypeError):
220 except (ValueError, TypeError):
204 raise error.ResponseError(
221 raise error.ResponseError(
205 _('unexpected response from remote server:'), l)
222 _('unexpected response from remote server:'), l)
206 if repo.ui.debugflag:
223 if repo.ui.debugflag:
207 repo.ui.debug('adding %s (%s)\n' %
224 repo.ui.debug('adding %s (%s)\n' %
208 (name, util.bytecount(size)))
225 (name, util.bytecount(size)))
209 # for backwards compat, name was partially encoded
226 # for backwards compat, name was partially encoded
210 ofp = repo.svfs(store.decodedir(name), 'w')
227 ofp = repo.svfs(store.decodedir(name), 'w')
211 for chunk in util.filechunkiter(fp, limit=size):
228 for chunk in util.filechunkiter(fp, limit=size):
212 handled_bytes += len(chunk)
229 handled_bytes += len(chunk)
213 repo.ui.progress(_('clone'), handled_bytes,
230 repo.ui.progress(_('clone'), handled_bytes,
214 total=total_bytes)
231 total=total_bytes)
215 ofp.write(chunk)
232 ofp.write(chunk)
216 ofp.close()
233 ofp.close()
217 tr.close()
234 tr.close()
218 finally:
235 finally:
219 tr.release()
236 tr.release()
220
237
221 # Writing straight to files circumvented the inmemory caches
238 # Writing straight to files circumvented the inmemory caches
222 repo.invalidate()
239 repo.invalidate()
223
240
224 elapsed = time.time() - start
241 elapsed = time.time() - start
225 if elapsed <= 0:
242 if elapsed <= 0:
226 elapsed = 0.001
243 elapsed = 0.001
227 repo.ui.progress(_('clone'), None)
244 repo.ui.progress(_('clone'), None)
228 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
245 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
229 (util.bytecount(total_bytes), elapsed,
246 (util.bytecount(total_bytes), elapsed,
230 util.bytecount(total_bytes / elapsed)))
247 util.bytecount(total_bytes / elapsed)))
231 finally:
248 finally:
232 lock.release()
249 lock.release()
233
234 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
235 """Apply stream clone data to a repository.
236
237 "remotereqs" is a set of requirements to handle the incoming data.
238 "remotebranchmap" is the result of a branchmap lookup on the remote. It
239 can be None.
240 "fp" is a file object containing the raw stream data, suitable for
241 feeding into consumev1().
242 """
243 lock = repo.lock()
244 try:
245 consumev1(repo, fp)
246
247 # new requirements = old non-format requirements +
248 # new format-related remote requirements
249 # requirements from the streamed-in repository
250 repo.requirements = remotereqs | (
251 repo.requirements - repo.supportedformats)
252 repo._applyopenerreqs()
253 repo._writerequirements()
254
255 if remotebranchmap:
256 branchmap.replacecache(repo, remotebranchmap)
257
258 repo.invalidate()
259 finally:
260 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now