##// END OF EJS Templates
stream-clone: factor out the computation of the requirements of a stream clone...
marmoute -
r49443:8475a136 default
parent child Browse files
Show More
@@ -1,932 +1,941 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import contextlib
10 import contextlib
11 import errno
11 import errno
12 import os
12 import os
13 import struct
13 import struct
14
14
15 from .i18n import _
15 from .i18n import _
16 from .pycompat import open
16 from .pycompat import open
17 from .interfaces import repository
17 from .interfaces import repository
18 from . import (
18 from . import (
19 bookmarks,
19 bookmarks,
20 cacheutil,
20 cacheutil,
21 error,
21 error,
22 narrowspec,
22 narrowspec,
23 phases,
23 phases,
24 pycompat,
24 pycompat,
25 requirements as requirementsmod,
25 requirements as requirementsmod,
26 scmutil,
26 scmutil,
27 store,
27 store,
28 util,
28 util,
29 )
29 )
30 from .utils import (
30 from .utils import (
31 stringutil,
31 stringutil,
32 )
32 )
33
33
34
34
def new_stream_clone_requirements(
    supported_formats, default_requirements, streamed_requirements
):
    """Determine the final set of requirements for a new stream clone.

    Combines the "default" requirements that a newly created repository
    would use with the constraints imposed by the stream clone content,
    keeping local configuration choices whenever possible.
    """
    # Start from the local defaults, drop every format-level requirement
    # (those are dictated by the streamed data), then add back exactly the
    # formats the stream actually uses.
    base = set(default_requirements) - supported_formats
    return base | set(streamed_requirements)
48
48
49
49
def streamed_requirements(repo):
    """Return the set of requirements a new stream clone will have to support.

    Used both for advertising the stream options and for generating the
    actual stream content.
    """
    # Only the requirements that describe the on-disk format are carried
    # over the stream; purely local requirements are not transferred.
    return repo.requirements & repo.supportedformats
57
58
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Determine whether the remote offers the bundle2 "stream" part in a
    # version (v2) that we know how to consume.
    bundle2supported = False
    if pullop.canusebundle2:
        if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
            bundle2supported = True
        # else
        # Server doesn't support bundle2 stream clone or doesn't support
        # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable(b'stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable(b'stream'):
        requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
    else:
        streamreqs = remote.capable(b'streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but server has them '
                    b'disabled\n'
                )
            )
            return False, None

        streamreqs = set(streamreqs.split(b','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(
                _(
                    b'warning: stream clone requested but client is missing '
                    b'requirements: %s\n'
                )
                % b', '.join(sorted(missingreqs))
            )
            pullop.repo.ui.warn(
                _(
                    b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                    b'for more information)\n'
                )
            )
            return False, None
        requirements = streamreqs

    return True, requirements
141
150
142
151
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    # Imported here rather than at module level, presumably to avoid an
    # import cycle with localrepo — TODO confirm.
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable(b'branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand(b'branchmap', {}).result()

    repo.ui.status(_(b'streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand(b'stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.

    # First line of the response is an integer status code.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )
    if resp == 1:
        raise error.Abort(_(b'operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_(b'locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_(b'the server sent an unknown error code'))

    # Second line is the "<filecount> <bytecount>" payload header.
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(b' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _(b'unexpected response from remote server:'), l
        )

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)
        # The streamed data dictated the store format, so recompute the
        # final requirement set and persist it before reloading state.
        repo.requirements = new_stream_clone_requirements(
            repo.supportedformats,
            repo.requirements,
            requirements,
        )
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features
        )
        scmutil.writereporequirements(repo)

        if rbranchmap:
            repo._branchcaches.replace(repo, rbranchmap)

        repo.invalidate()
216
225
217
226
def allowservergeneration(repo):
    """Return True if this server is allowed to emit streaming clones."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    ui = repo.ui
    if not ui.configbool(b'server', b'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret
    # changesets from the client, so refuse by default when any exist.
    if phases.hassecret(repo):
        return ui.configbool(b'server', b'uncompressedallowsecret')

    return True
233
242
234
243
235 # This is it's own function so extensions can override it.
244 # This is it's own function so extensions can override it.
236 def _walkstreamfiles(repo, matcher=None):
245 def _walkstreamfiles(repo, matcher=None):
237 return repo.store.walk(matcher)
246 return repo.store.walk(matcher)
238
247
239
248
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug(b'scanning\n')
        for file_type, name, size in _walkstreamfiles(repo):
            # Zero-length files are skipped entirely.
            if size:
                entries.append((name, size))
                total_bytes += size
        # NOTE(review): these look like test-only synchronization hooks
        # defined elsewhere in this module — confirm before relying on them.
        _test_sync_point_walk_1(repo)
    _test_sync_point_walk_2(repo)

    repo.ui.debug(
        b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
    )

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        # Lazily stream "<name>\0<size>\n" headers followed by raw file data.
        for name, size in entries:
            if debugflag:
                repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield b'%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, b'rb', auditpath=False) as fp:
                if size <= 65536:
                    # Small files are read in one shot.
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
294
303
295
304
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield b'1\n'
        return

    try:
        file_count, byte_count, data_iter = generatev1(repo)
    except error.LockError:
        yield b'2\n'
        return

    # Success marker, then the "<filecount> <bytecount>" payload header.
    yield b'0\n'
    yield b'%d %d\n' % (file_count, byte_count)
    # Pass the raw stream data straight through.
    for piece in data_iter:
        yield piece
322
331
323
332
def generatebundlev1(repo, compression=b'UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != b'UN':
        raise ValueError(b'we do not support the compression argument yet')

    # The advertised requirements are the format-level subset of the repo's
    # requirements (see streamed_requirements).
    requirements = streamed_requirements(repo)
    requires = b','.join(sorted(requirements))

    def gen():
        # Magic + compression identifier.
        yield b'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(
            _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
        )

        # Fixed-size header: counts, then a length-prefixed, NUL-terminated
        # requirements string.
        yield struct.pack(b'>QQ', filecount, bytecount)
        yield struct.pack(b'>H', len(requires) + 1)
        yield requires + b'\0'

        # This is where we'll add compression in the future.
        assert compression == b'UN'

        progress = repo.ui.makeprogress(
            _(b'bundle'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)

        for chunk in it:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()
380
389
381
390
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(bytecount))
        )
        progress = repo.ui.makeprogress(
            _(b'clone'), total=bytecount, unit=_(b'bytes')
        )
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction(b'clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    # Each entry: "<name>\0<size>\n" followed by raw data.
                    l = fp.readline()
                    try:
                        name, size = l.split(b'\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _(b'unexpected response from remote server:'), l
                        )
                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding %s (%s)\n' % (name, util.bytecount(size))
                        )
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, b'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        # Guard against a zero/negative duration so the rate below is finite.
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(bytecount),
                elapsed,
                util.bytecount(bytecount / elapsed),
            )
        )
455
464
456
465
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    ``fp`` must be positioned at the 2 byte compression identifier (the
    4 byte magic is assumed to have been consumed already). Returns a
    ``(filecount, bytecount, requirements)`` tuple.
    """
    compression = fp.read(2)
    if compression != b'UN':
        msg = _(
            b'only uncompressed stream clone bundles are '
            b'supported; got %s'
        )
        raise error.Abort(msg % compression)

    # Two big-endian 64-bit integers: file count, then payload byte count.
    filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
    # A 16-bit length-prefixed, NUL-terminated requirements string follows.
    (requireslen,) = struct.unpack(b'>H', fp.read(2))
    requires = fp.read(requireslen)

    if not requires.endswith(b'\0'):
        raise error.Abort(
            _(
                b'malformed stream clone bundle: '
                b'requirements not properly encoded'
            )
        )

    requirements = set(requires.rstrip(b'\0').split(b','))

    return filecount, bytecount, requirements
483
492
484
493
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file
    handle is positioned at the 2 byte compression identifier.
    """
    # Stream clone data blindly writes store files, so it is only safe on
    # a repository containing no revisions at all.
    if len(repo):
        raise error.Abort(
            _(b'cannot apply stream clone bundle on non-empty repo')
        )

    file_count, byte_count, bundle_reqs = readbundle1header(fp)
    unsupported = bundle_reqs - repo.supportedformats
    if unsupported:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(unsupported))
        )

    consumev1(repo, fp, file_count, byte_count)
505
514
506
515
class streamcloneapplier(object):
    """Manage the application of a streaming clone bundle.

    ``applybundlev1()`` is wrapped in a dedicated type so that bundle
    readers can perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        # File handle positioned at the compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Apply the wrapped bundle data to ``repo``."""
        return applybundlev1(repo, self._fh)
519
528
520
529
# type of file to stream (third item of the entry tuples built by _v2_walk)
_fileappend = 0  # append only file; streamed directly from the live store
_filefull = 1  # full snapshot file; copied aside (see _filterfull) before streaming

# Source of the file: single-byte key into the vfs map built by _makemap()
_srcstore = b's'  # store (svfs)
_srccache = b'c'  # cache (cachevfs)
528
537
529 # This is it's own function so extensions can override it.
538 # This is it's own function so extensions can override it.
530 def _walkstreamfullstorefiles(repo):
539 def _walkstreamfullstorefiles(repo):
531 """list snapshot file from the store"""
540 """list snapshot file from the store"""
532 fnames = []
541 fnames = []
533 if not repo.publishing():
542 if not repo.publishing():
534 fnames.append(b'phaseroots')
543 fnames.append(b'phaseroots')
535 return fnames
544 return fnames
536
545
537
546
def _filterfull(entry, copy, vfsmap):
    """Snapshot a ``_filefull`` entry through ``copy``; pass others through.

    For full-snapshot files the ``data`` slot of the entry is replaced with
    the path of a temporary copy taken while the repository lock is held.
    """
    src, name, ftype, data = entry
    if ftype == _filefull:
        snapshot = copy(vfsmap[src].join(name))
        return (src, name, ftype, snapshot)
    return entry
544
553
545
554
@contextlib.contextmanager
def maketempcopies():
    """Context manager yielding a callable that snapshots files to temp copies.

    Every temporary file produced through the yielded ``copy`` callable is
    unlinked when the context exits, whether or not an error occurred.
    """
    created = []
    try:

        def copy(src):
            fd, dst = pycompat.mkstemp()
            # only the path is needed; close the descriptor right away
            os.close(fd)
            created.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst

        yield copy
    finally:
        for path in created:
            util.tryunlink(path)
563
572
564
573
def _makemap(repo):
    """Build the (source key -> vfs) mapping used by the stream protocol."""
    mapping = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # repo.vfs is deliberately excluded: it reaches sensitive files such as
    # .hg/hgrc, which must never be exposed to streaming clone.
    assert repo.vfs not in mapping.values()

    return mapping
576
585
577
586
def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle

    This is a generator. It first yields ``None`` once every volatile file
    has been snapshotted; the caller uses that sentinel to release the
    repository lock before the actual data transfer starts. Subsequent
    yields are the raw protocol chunks (source key, varint lengths, name,
    file data).
    """
    vfsmap = _makemap(repo)
    # we keep repo.vfs out of the map on purpose, there are too many dangers
    # there (eg: .hg/hgrc),
    #
    # this assert is duplicated (from _makemap) as an author might think this
    # is fine, while this is really not fine.
    if repo.vfs in vfsmap.values():
        raise error.ProgrammingError(
            b'repo.vfs must not be added to vfsmap for security reasons'
        )

    progress = repo.ui.makeprogress(
        _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
    )
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copying of volatile files is delayed until we are inside the try;
        # _filterfull snapshots every _filefull entry while the lock is held
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None  # this releases the lock on the repository
        totalbytecount = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                # append-only file: stream it in place, `data` is its size
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                # snapshot file: `data` is the path of the temporary copy
                fp = open(data, b'rb')
                size = util.fstat(fp).st_size
            bytecount = 0
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    # small file: a single read is cheaper than chunking
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    bytecount += len(chunk)
                    totalbytecount += len(chunk)
                    progress.update(totalbytecount)
                    yield chunk
                if bytecount != size:
                    # Would most likely be caused by a race due to `hg strip` or
                    # a revlog split
                    raise error.Abort(
                        _(
                            b'clone could only read %d bytes from %s, but '
                            b'expected %d bytes'
                        )
                        % (bytecount, name, size)
                    )
            finally:
                fp.close()
636
645
637
646
def _test_sync_point_walk_1(repo):
    """a function for synchronisation during tests

    Intentionally a no-op; intended as a monkeypatch point so tests can
    pause execution at this point.
    """
640
649
641
650
def _test_sync_point_walk_2(repo):
    """a function for synchronisation during tests

    Intentionally a no-op; intended as a monkeypatch point so tests can
    pause execution at this point.
    """
644
653
645
654
def _v2_walk(repo, includes, excludes, includeobsmarkers):
    """emit a series of files information useful to clone a repo

    return (entries, totalfilesize)

    entries is a list of tuple (vfs-key, file-path, file-type, size)

    - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
    - `name`: file path of the file to copy (to be fed to the vfss)
    - `file-type`: does this file need to be copied with the source lock ?
    - `size`: the size of the file (or None)
    """
    assert repo._currentlock(repo._lockref) is not None
    entries = []
    total_size = 0

    matcher = None
    if includes or excludes:
        matcher = narrowspec.match(repo.root, includes, excludes)

    # revlog files: appended-to files stream in place, volatile ones are
    # snapshotted first
    for rl_type, name, size in _walkstreamfiles(repo, matcher):
        if not size:
            continue
        if rl_type & store.FILEFLAGS_VOLATILE:
            ftype = _filefull
        else:
            ftype = _fileappend
        entries.append((_srcstore, name, ftype, size))
        total_size += size

    # miscellaneous store files always shipped as full snapshots
    for name in _walkstreamfullstorefiles(repo):
        if repo.svfs.exists(name):
            total_size += repo.svfs.lstat(name).st_size
            entries.append((_srcstore, name, _filefull, None))

    if includeobsmarkers and repo.svfs.exists(b'obsstore'):
        total_size += repo.svfs.lstat(b'obsstore').st_size
        entries.append((_srcstore, b'obsstore', _filefull, None))

    for name in cacheutil.cachetocopy(repo):
        if repo.cachevfs.exists(name):
            total_size += repo.cachevfs.lstat(name).st_size
            entries.append((_srccache, name, _filefull, None))

    return entries, total_size
685
694
686
695
def generatev2(repo, includes, excludes, includeobsmarkers):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    with repo.lock():

        repo.ui.debug(b'scanning\n')

        entries, totalfilesize = _v2_walk(
            repo,
            includes=includes,
            excludes=excludes,
            includeobsmarkers=includeobsmarkers,
        )

        chunks = _emit2(repo, entries, totalfilesize)
        # prime the generator: _emit2 yields None once all volatile files
        # have been snapshotted, so running it to that first yield while the
        # lock is still held guarantees a consistent snapshot
        first = next(chunks)
        assert first is None
        _test_sync_point_walk_1(repo)
    # the lock is released here; the remaining data transfer happens unlocked
    _test_sync_point_walk_2(repo)

    return len(entries), totalfilesize, chunks
718
727
719
728
@contextlib.contextmanager
def nested(*ctxs):
    """Enter ``ctxs`` left to right, exit in reverse, like nested ``with``.

    Previously this raised IndexError when called with no context managers;
    the empty case is now handled and simply yields.
    """
    if not ctxs:
        yield
    else:
        with ctxs[0]:
            if len(ctxs) > 1:
                with nested(*ctxs[1:]):
                    yield
            else:
                yield
730
739
731
740
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method. Files are written inside a single ``clone`` transaction; on
    completion the repo file caches are invalidated and a transfer summary
    is printed.
    """
    with repo.lock():
        repo.ui.status(
            _(b'%d files to transfer, %s of data\n')
            % (filecount, util.bytecount(filesize))
        )

        start = util.timer()
        progress = repo.ui.makeprogress(
            _(b'clone'), total=filesize, unit=_(b'bytes')
        )
        progress.update(0)

        vfsmap = _makemap(repo)
        # we keep repo.vfs out of the map on purpose, there are too many
        # dangers there (eg: .hg/hgrc),
        #
        # this assert is duplicated (from _makemap) as an author might think
        # this is fine, while this is really not fine.
        if repo.vfs in vfsmap.values():
            raise error.ProgrammingError(
                b'repo.vfs must not be added to vfsmap for security reasons'
            )

        with repo.transaction(b'clone'):
            # close each target vfs in the background while we consume data
            ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    # each entry: 1-byte source key, varint name length,
                    # varint data length, then the name and data bytes
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug(
                            b'adding [%s] %s (%s)\n'
                            % (src, name, util.bytecount(datalen))
                        )

                    with vfs(name, b'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # avoid division by zero in the rate computation below
            elapsed = 0.001
        repo.ui.status(
            _(b'transferred %s in %.1f seconds (%s/sec)\n')
            % (
                util.bytecount(progress.pos),
                elapsed,
                util.bytecount(progress.pos / elapsed),
            )
        )
        progress.complete()
799
808
800
809
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply a version 2 stream clone bundle and update repo requirements.

    Once the raw data has been consumed, the repository requirements are
    recomputed to account for the streamed format and written to disk.
    """
    from . import localrepo

    unsupported = [r for r in requirements if r not in repo.supported]
    if unsupported:
        raise error.Abort(
            _(b'unable to apply stream clone: unsupported format: %s')
            % b', '.join(sorted(unsupported))
        )

    consumev2(repo, fp, filecount, filesize)

    repo.requirements = new_stream_clone_requirements(
        repo.supportedformats,
        repo.requirements,
        requirements,
    )
    # the store vfs options depend on the (possibly changed) requirements
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features
    )
    scmutil.writereporequirements(repo)
822
831
823
832
def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
    """Hardlink (or, on failure, copy) each entry from source to destination.

    Returns True when every file could be hardlinked, False once a plain
    copy had to be used instead.
    """
    # mutable cell so the fallback callback below can flip the flag
    use_hardlink = [True]

    def _fallback_to_copy():
        use_hardlink[0] = False
        progress.topic = _(b'copying')

    for key, path, size in entries:
        source_vfs = src_vfs_map[key]
        target_vfs = dst_vfs_map[key]
        source_path = source_vfs.join(path)
        target_path = target_vfs.join(path)
        # We cannot use dirname and makedirs of dst_vfs here because the store
        # encoding confuses them. See issue 6581 for details.
        parent = os.path.dirname(target_path)
        if not os.path.exists(parent):
            util.makedirs(parent)
        target_vfs.register_file(path)
        # XXX we could use the #nb_bytes argument.
        util.copyfile(
            source_path,
            target_path,
            hardlink=use_hardlink[0],
            no_hardlink_cb=_fallback_to_copy,
            check_fs_hardlink=False,
        )
        progress.increment()
    return use_hardlink[0]
852
861
853
862
def local_copy(src_repo, dest_repo):
    """copy all content from one local repository to another

    This is useful for local clone"""
    # both repos must agree on the store requirements (working-dir-only
    # requirements may differ) or a raw file copy would be invalid
    src_store_requirements = {
        r
        for r in src_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    dest_store_requirements = {
        r
        for r in dest_repo.requirements
        if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
    }
    assert src_store_requirements == dest_store_requirements

    with dest_repo.lock():
        with src_repo.lock():

            # bookmark is not integrated to the streaming as it might use the
            # `repo.vfs` and there is too much sensitive data accessible
            # through `repo.vfs` to expose it to streaming clone.
            src_book_vfs = bookmarks.bookmarksvfs(src_repo)
            srcbookmarks = src_book_vfs.join(b'bookmarks')
            bm_count = 0
            if os.path.exists(srcbookmarks):
                bm_count = 1

            entries, totalfilesize = _v2_walk(
                src_repo,
                includes=None,
                excludes=None,
                includeobsmarkers=True,
            )
            src_vfs_map = _makemap(src_repo)
            dest_vfs_map = _makemap(dest_repo)
            progress = src_repo.ui.makeprogress(
                topic=_(b'linking'),
                total=len(entries) + bm_count,
                unit=_(b'files'),
            )
            # copy files
            #
            # We could copy the full file while the source repository is locked
            # and the other one without the lock. However, in the linking case,
            # this would also requires checks that nobody is appending any data
            # to the files while we do the clone, so this is not done yet. We
            # could do this blindly when copying files.
            files = ((k, path, size) for k, path, ftype, size in entries)
            hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)

            # copy bookmarks over
            if bm_count:
                dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
                dstbookmarks = dst_book_vfs.join(b'bookmarks')
                util.copyfile(srcbookmarks, dstbookmarks)
        progress.complete()
        if hardlink:
            msg = b'linked %d files\n'
        else:
            msg = b'copied %d files\n'
        src_repo.ui.debug(msg % (len(entries) + bm_count))

        with dest_repo.transaction(b"localclone") as tr:
            dest_repo.store.write(tr)

        # clean up transaction file as they do not make sense
        undo_files = [(dest_repo.svfs, b'undo.backupfiles')]
        undo_files.extend(dest_repo.undofiles())
        for undovfs, undofile in undo_files:
            try:
                undovfs.unlink(undofile)
            except OSError as e:
                # a missing undo file is fine; warn on any other error
                if e.errno != errno.ENOENT:
                    msg = _(b'error removing %s: %s\n')
                    path = undovfs.join(undofile)
                    e_msg = stringutil.forcebytestr(e)
                    msg %= (path, e_msg)
                    dest_repo.ui.warn(msg)
@@ -1,739 +1,739 b''
1 # wireprotov1server.py - Wire protocol version 1 server functionality
1 # wireprotov1server.py - Wire protocol version 1 server functionality
2 #
2 #
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import binascii
10 import binascii
11 import os
11 import os
12
12
13 from .i18n import _
13 from .i18n import _
14 from .node import hex
14 from .node import hex
15 from .pycompat import getattr
15 from .pycompat import getattr
16
16
17 from . import (
17 from . import (
18 bundle2,
18 bundle2,
19 bundlecaches,
19 bundlecaches,
20 changegroup as changegroupmod,
20 changegroup as changegroupmod,
21 discovery,
21 discovery,
22 encoding,
22 encoding,
23 error,
23 error,
24 exchange,
24 exchange,
25 pushkey as pushkeymod,
25 pushkey as pushkeymod,
26 pycompat,
26 pycompat,
27 requirements as requirementsmod,
27 requirements as requirementsmod,
28 streamclone,
28 streamclone,
29 util,
29 util,
30 wireprototypes,
30 wireprototypes,
31 )
31 )
32
32
33 from .utils import (
33 from .utils import (
34 procutil,
34 procutil,
35 stringutil,
35 stringutil,
36 )
36 )
37
37
38 urlerr = util.urlerr
38 urlerr = util.urlerr
39 urlreq = util.urlreq
39 urlreq = util.urlreq
40
40
41 bundle2requiredmain = _(b'incompatible Mercurial client; bundle2 required')
41 bundle2requiredmain = _(b'incompatible Mercurial client; bundle2 required')
42 bundle2requiredhint = _(
42 bundle2requiredhint = _(
43 b'see https://www.mercurial-scm.org/wiki/IncompatibleClient'
43 b'see https://www.mercurial-scm.org/wiki/IncompatibleClient'
44 )
44 )
45 bundle2required = b'%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
45 bundle2required = b'%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
46
46
47
47
def clientcompressionsupport(proto):
    """Return the list of compression methods supported by the client.

    The client advertises them through a ``comp=`` protocol capability; when
    no such capability was announced, fall back to the historical default of
    zlib and uncompressed.
    """
    prefix = b'comp='
    for cap in proto.getprotocaps():
        if cap.startswith(prefix):
            return cap[len(prefix) :].split(b',')
    return [b'zlib', b'none']
59
59
60
60
61 # wire protocol command can either return a string or one of these classes.
61 # wire protocol command can either return a string or one of these classes.
62
62
63
63
def getdispatchrepo(repo, proto, command):
    """Obtain the repo used for processing wire protocol commands.

    The intent of this function is to serve as a monkeypatch point for
    extensions that need commands to operate on different repo views under
    specialized circumstances.
    """
    view = repo.ui.config(b'server', b'view')
    return repo.filtered(view)
73
73
74
74
def dispatch(repo, proto, command):
    """Run wire protocol ``command`` against the appropriate repo view."""
    dispatchrepo = getdispatchrepo(repo, proto, command)

    handler, argspec = commands[command]
    arguments = proto.getargs(argspec)

    return handler(dispatchrepo, proto, *arguments)
82
82
83
83
def options(cmd, keys, others):
    """Extract the entries named in ``keys`` from the ``others`` dict.

    ``others`` is mutated in place: extracted entries are removed. Any
    entries left over afterwards trigger a warning on stderr mentioning
    ``cmd``.
    """
    opts = {}
    for key in keys:
        if key in others:
            opts[key] = others.pop(key)
    if others:
        procutil.stderr.write(
            b"warning: %s ignored unexpected arguments %s\n"
            % (cmd, b",".join(others))
        )
    return opts
96
96
97
97
def bundle1allowed(repo, action):
    """Whether a bundle1 operation is allowed from the server.

    Priority is:

    1. server.bundle1gd.<action> (if generaldelta active)
    2. server.bundle1.<action>
    3. server.bundle1gd (if generaldelta active)
    4. server.bundle1
    """
    ui = repo.ui
    gd = requirementsmod.GENERALDELTA_REQUIREMENT in repo.requirements

    # most specific first; the gd variants only apply with generaldelta
    candidates = []
    if gd:
        candidates.append(b'bundle1gd.%s' % action)
    candidates.append(b'bundle1.%s' % action)
    if gd:
        candidates.append(b'bundle1gd')

    for option in candidates:
        value = ui.configbool(b'server', option)
        if value is not None:
            return value

    return ui.configbool(b'server', b'bundle1')
126
126
127
127
# registry of all version 1 wire protocol commands, populated through the
# ``wireprotocommand`` decorator below
commands = wireprototypes.commanddict()
129
129
130
130
def wireprotocommand(name, args=None, permission=b'push'):
    """Decorator to declare a wire protocol command.

    ``name`` is the name of the wire protocol command being provided.

    ``args`` defines the named arguments accepted by the command. It is
    a space-delimited list of argument names. ``*`` denotes a special value
    that says to accept all named arguments.

    ``permission`` defines the permission type needed to run this command.
    Can be ``push`` or ``pull``. These roughly map to read-write and
    read-only, respectively. Default is to assume command requires ``push``
    permissions because otherwise commands not declaring their permissions
    could modify a repository that is supposed to be read-only.
    """
    if permission not in (b'push', b'pull'):
        raise error.ProgrammingError(
            b'invalid wire protocol permission; '
            b'got %s; expected "push" or "pull"' % permission
        )

    if args is None:
        args = b''

    if not isinstance(args, bytes):
        raise error.ProgrammingError(
            b'arguments for version 1 commands must be declared as bytes'
        )

    # Commands declared here are exposed to every version 1 transport.
    transports = {
        k for k, v in wireprototypes.TRANSPORTS.items() if v[b'version'] == 1
    }

    def register(func):
        if name in commands:
            raise error.ProgrammingError(
                b'%s command already registered for version 1' % name
            )
        commands[name] = wireprototypes.commandentry(
            func, args=args, transports=transports, permission=permission
        )
        return func

    return register
176
176
177
177
# TODO define a more appropriate permissions type to use for this.
@wireprotocommand(b'batch', b'cmds *', permission=b'pull')
def batch(repo, proto, cmds, others):
    """Execute several wire protocol commands in a single round trip.

    ``cmds`` is a ``;``-separated list of ``<command> <args>`` entries whose
    argument names and values are batch-escaped.  Each reply is escaped and
    the replies are joined with ``;`` in request order.
    """
    unescape = wireprototypes.unescapebatcharg
    replies = []
    for entry in cmds.split(b';'):
        op, rawargs = entry.split(b' ', 1)
        argmap = {}
        for arg in rawargs.split(b','):
            if arg:
                argname, argval = arg.split(b'=')
                argmap[unescape(argname)] = unescape(argval)
        func, spec = commands[op]

        # Validate that client has permissions to perform this command.
        perm = commands[op].permission
        assert perm in (b'push', b'pull')
        proto.checkperm(perm)

        if spec:
            keys = spec.split()
            data = {}
            for k in keys:
                if k == b'*':
                    # Gather every argument not explicitly declared.
                    data[b'*'] = {
                        name: argmap[name]
                        for name in argmap
                        if name not in keys
                    }
                else:
                    data[k] = argmap[k]
            result = func(repo, proto, *[data[k] for k in keys])
        else:
            result = func(repo, proto)
        if isinstance(result, wireprototypes.ooberror):
            # An out-of-band error aborts the whole batch.
            return result

        # For now, all batchable commands must return bytesresponse or
        # raw bytes (for backwards compatibility).
        assert isinstance(result, (wireprototypes.bytesresponse, bytes))
        if isinstance(result, wireprototypes.bytesresponse):
            result = result.data
        replies.append(wireprototypes.escapebatcharg(result))

    return wireprototypes.bytesresponse(b';'.join(replies))
223
223
224
224
@wireprotocommand(b'between', b'pairs', permission=b'pull')
def between(repo, proto, pairs):
    """Return the nodes between each space-separated ``top-bottom`` pair."""
    decoded = [wireprototypes.decodelist(p, b'-') for p in pairs.split(b" ")]
    lines = [
        wireprototypes.encodelist(nodes) + b"\n"
        for nodes in repo.between(decoded)
    ]
    return wireprototypes.bytesresponse(b''.join(lines))
233
233
234
234
@wireprotocommand(b'branchmap', permission=b'pull')
def branchmap(repo, proto):
    """Emit one ``<urlquoted branch> <encoded head nodes>`` line per branch."""
    lines = []
    for branch, nodes in pycompat.iteritems(repo.branchmap()):
        quoted = urlreq.quote(encoding.fromlocal(branch))
        lines.append(b'%s %s' % (quoted, wireprototypes.encodelist(nodes)))
    return wireprototypes.bytesresponse(b'\n'.join(lines))
245
245
246
246
@wireprotocommand(b'branches', b'nodes', permission=b'pull')
def branches(repo, proto, nodes):
    """Return branch information for each requested node."""
    decoded = wireprototypes.decodelist(nodes)
    lines = [
        wireprototypes.encodelist(branch) + b"\n"
        for branch in repo.branches(decoded)
    ]
    return wireprototypes.bytesresponse(b''.join(lines))
255
255
256
256
@wireprotocommand(b'clonebundles', b'', permission=b'pull')
def clonebundles(repo, proto):
    """Server command for returning info for available bundles to seed clones.

    Clients will parse this response and determine what bundle to fetch.

    Extensions may wrap this command to filter or dynamically emit data
    depending on the request. e.g. you could advertise URLs for the closest
    data center given the client's IP address.
    """
    manifest = repo.vfs.tryread(bundlecaches.CB_MANIFEST_FILE)
    return wireprototypes.bytesresponse(manifest)
270
270
271
271
# Capabilities advertised by every version 1 server regardless of
# repository configuration; conditional ones are added in ``_capabilities``.
wireprotocaps = [
    b'lookup',
    b'branchmap',
    b'pushkey',
    b'known',
    b'getbundle',
    b'unbundlehash',
]
280
280
281
281
def _capabilities(repo, proto):
    """return a list of capabilities for a repo

    This function exists to allow extensions to easily wrap capabilities
    computation

    - returns a lists: easy to alter
    - change done here will be propagated to both `capabilities` and `hello`
      command without any other action needed.
    """
    # copy to prevent modification of the global list
    caps = list(wireprotocaps)

    # Command of same name as capability isn't exposed to version 1 of
    # transports. So conditionally add it.
    if commands.commandavailable(b'changegroupsubset', proto):
        caps.append(b'changegroupsubset')

    if streamclone.allowservergeneration(repo):
        if repo.ui.configbool(b'server', b'preferuncompressed'):
            caps.append(b'stream-preferred')
        requiredformats = streamclone.streamed_requirements(repo)
        # if our local revlogs are just revlogv1, add 'stream' cap
        if not requiredformats - {requirementsmod.REVLOGV1_REQUIREMENT}:
            caps.append(b'stream')
        # otherwise, add 'streamreqs' detailing our local revlog format
        else:
            caps.append(b'streamreqs=%s' % b','.join(sorted(requiredformats)))

    if repo.ui.configbool(b'experimental', b'bundle2-advertise'):
        capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role=b'server'))
        caps.append(b'bundle2=' + urlreq.quote(capsblob))
    caps.append(b'unbundle=%s' % b','.join(bundle2.bundlepriority))

    if repo.ui.configbool(b'experimental', b'narrow'):
        caps.append(wireprototypes.NARROWCAP)
        if repo.ui.configbool(b'experimental', b'narrowservebrokenellipses'):
            caps.append(wireprototypes.ELLIPSESCAP)

    return proto.addcapabilities(repo, caps)
321
321
322
322
# If you are writing an extension and consider wrapping this function. Wrap
# `_capabilities` instead.
@wireprotocommand(b'capabilities', permission=b'pull')
def capabilities(repo, proto):
    """Return the server's capabilities as a sorted, space-separated blob."""
    return wireprototypes.bytesresponse(
        b' '.join(sorted(_capabilities(repo, proto)))
    )
329
329
330
330
@wireprotocommand(b'changegroup', b'roots', permission=b'pull')
def changegroup(repo, proto, roots):
    """Stream a version 01 changegroup of everything missing from ``roots``."""
    nodes = wireprototypes.decodelist(roots)
    outgoing = discovery.outgoing(
        repo, missingroots=nodes, ancestorsof=repo.heads()
    )
    cg = changegroupmod.makechangegroup(repo, outgoing, b'01', b'serve')
    # Stream in 32k chunks until the changegroup is exhausted.
    chunks = iter(lambda: cg.read(32768), b'')
    return wireprototypes.streamres(gen=chunks)
340
340
341
341
@wireprotocommand(b'changegroupsubset', b'bases heads', permission=b'pull')
def changegroupsubset(repo, proto, bases, heads):
    """Stream a version 01 changegroup for the ``bases``..``heads`` subset."""
    baselist = wireprototypes.decodelist(bases)
    headlist = wireprototypes.decodelist(heads)
    outgoing = discovery.outgoing(
        repo, missingroots=baselist, ancestorsof=headlist
    )
    cg = changegroupmod.makechangegroup(repo, outgoing, b'01', b'serve')
    # Stream in 32k chunks until the changegroup is exhausted.
    chunks = iter(lambda: cg.read(32768), b'')
    return wireprototypes.streamres(gen=chunks)
350
350
351
351
@wireprotocommand(b'debugwireargs', b'one two *', permission=b'pull')
def debugwireargs(repo, proto, one, two, others):
    """Echo wire arguments back for debugging argument marshalling."""
    # only accept optional args from the known set
    extra = options(b'debugwireargs', [b'three', b'four'], others)
    return wireprototypes.bytesresponse(
        repo.debugwireargs(one, two, **pycompat.strkwargs(extra))
    )
359
359
360
360
def find_pullbundle(repo, proto, opts, clheads, heads, common):
    """Return a file object for the first matching pullbundle.

    Pullbundles are specified in .hg/pullbundles.manifest similar to
    clonebundles.
    For each entry, the bundle specification is checked for compatibility:
    - Client features vs the BUNDLESPEC.
    - Revisions shared with the clients vs base revisions of the bundle.
      A bundle can be applied only if all its base revisions are known by
      the client.
    - At least one leaf of the bundle's DAG is missing on the client.
    - Every leaf of the bundle's DAG is part of node set the client wants.
      E.g. do not send a bundle of all changes if the client wants only
      one specific branch of many.
    """

    def decodehexstring(s):
        return {binascii.unhexlify(h) for h in s.split(b';')}

    manifest = repo.vfs.tryread(b'pullbundles.manifest')
    if not manifest:
        return None
    entries = bundlecaches.parseclonebundlesmanifest(repo, manifest)
    entries = bundlecaches.filterclonebundleentries(repo, entries)
    if not entries:
        return None

    cl = repo.unfiltered().changelog
    heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
    common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
    compformats = clientcompressionsupport(proto)

    for entry in entries:
        comp = entry.get(b'COMPRESSION')
        altcomp = util.compengines._bundlenames.get(comp)
        if comp and comp not in compformats and altcomp not in compformats:
            continue
        # No test yet for VERSION, since V2 is supported by any client
        # that advertises partial pulls
        if b'heads' in entry:
            try:
                bundle_heads = decodehexstring(entry[b'heads'])
            except TypeError:
                # Bad heads entry
                continue
            if bundle_heads.issubset(common):
                continue  # Nothing new
            if all(cl.rev(rev) in common_anc for rev in bundle_heads):
                continue  # Still nothing new
            if any(
                cl.rev(rev) not in heads_anc and cl.rev(rev) not in common_anc
                for rev in bundle_heads
            ):
                # Bundle contains changes outside the requested node set.
                continue
        if b'bases' in entry:
            try:
                bundle_bases = decodehexstring(entry[b'bases'])
            except TypeError:
                # Bad bases entry
                continue
            if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
                # Client is missing a base; bundle cannot be applied.
                continue
        path = entry[b'URL']
        repo.ui.debug(b'sending pullbundle "%s"\n' % path)
        try:
            return repo.vfs.open(path)
        except IOError:
            repo.ui.debug(b'pullbundle "%s" not accessible\n' % path)
            continue
    return None
429
429
430
430
@wireprotocommand(b'getbundle', b'*', permission=b'pull')
def getbundle(repo, proto, others):
    """Serve a bundle for a pull request described by ``others`` options."""
    opts = options(
        b'getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(), others
    )
    # Decode each wire argument according to its declared type.
    for k, v in pycompat.iteritems(opts):
        keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
        if keytype == b'nodes':
            opts[k] = wireprototypes.decodelist(v)
        elif keytype == b'csv':
            opts[k] = list(v.split(b','))
        elif keytype == b'scsv':
            opts[k] = set(v.split(b','))
        elif keytype == b'boolean':
            # Client should serialize False as '0', which is a non-empty string
            # so it evaluates as a True bool.
            opts[k] = False if v == b'0' else bool(v)
        elif keytype != b'plain':
            raise KeyError(b'unknown getbundle option type %s' % keytype)

    if not bundle1allowed(repo, b'pull'):
        if not exchange.bundle2requested(opts.get(b'bundlecaps')):
            if proto.name == b'http-v1':
                return wireprototypes.ooberror(bundle2required)
            raise error.Abort(bundle2requiredmain, hint=bundle2requiredhint)

    try:
        clheads = set(repo.changelog.heads())
        heads = set(opts.get(b'heads', set()))
        common = set(opts.get(b'common', set()))
        common.discard(repo.nullid)
        if (
            repo.ui.configbool(b'server', b'pullbundle')
            and b'partial-pull' in proto.getprotocaps()
        ):
            # Check if a pre-built bundle covers this request.
            bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
            if bundle:
                return wireprototypes.streamres(
                    gen=util.filechunkiter(bundle), prefer_uncompressed=True
                )

        if repo.ui.configbool(b'server', b'disablefullbundle'):
            # Check to see if this is a full clone.
            wantcg = opts.get(b'cg', True)
            if wantcg and not common and clheads == heads:
                raise error.Abort(
                    _(b'server has pull-based clones disabled'),
                    hint=_(b'remove --pull if specified or upgrade Mercurial'),
                )

        info, chunks = exchange.getbundlechunks(
            repo, b'serve', **pycompat.strkwargs(opts)
        )
        prefercompressed = info.get(b'prefercompressed', True)
    except error.Abort as exc:
        # cleanly forward Abort error to the client
        if not exchange.bundle2requested(opts.get(b'bundlecaps')):
            if proto.name == b'http-v1':
                return wireprototypes.ooberror(exc.message + b'\n')
            raise  # cannot do better for bundle1 + ssh
        # bundle2 request expect a bundle2 reply
        bundler = bundle2.bundle20(repo.ui)
        manargs = [(b'message', exc.message)]
        advargs = []
        if exc.hint is not None:
            advargs.append((b'hint', exc.hint))
        bundler.addpart(bundle2.bundlepart(b'error:abort', manargs, advargs))
        chunks = bundler.getchunks()
        prefercompressed = False

    return wireprototypes.streamres(
        gen=chunks, prefer_uncompressed=not prefercompressed
    )
508
508
509
509
@wireprotocommand(b'heads', permission=b'pull')
def heads(repo, proto):
    """Return the repository heads as an encoded node list."""
    encoded = wireprototypes.encodelist(repo.heads())
    return wireprototypes.bytesresponse(encoded + b'\n')
514
514
515
515
@wireprotocommand(b'hello', permission=b'pull')
def hello(repo, proto):
    """Called as part of SSH handshake to obtain server info.

    Returns a list of lines describing interesting things about the
    server, in an RFC822-like format.

    Currently, the only one defined is ``capabilities``, which consists of a
    line of space separated tokens describing server abilities:

        capabilities: <token0> <token1> <token2>
    """
    return wireprototypes.bytesresponse(
        b'capabilities: %s\n' % capabilities(repo, proto).data
    )
530
530
531
531
@wireprotocommand(b'listkeys', b'namespace', permission=b'pull')
def listkeys(repo, proto, namespace):
    """Return the sorted pushkey entries of ``namespace``, wire-encoded."""
    entries = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
    return wireprototypes.bytesresponse(pushkeymod.encodekeys(entries))
536
536
537
537
@wireprotocommand(b'lookup', b'key', permission=b'pull')
def lookup(repo, proto, key):
    """Resolve ``key`` to a node; reply ``1 <hex>`` or ``0 <error>``."""
    try:
        r = hex(repo.lookup(encoding.tolocal(key)))
        success = 1
    except Exception as inst:
        # Report the failure reason to the client rather than erroring out.
        r = stringutil.forcebytestr(inst)
        success = 0
    return wireprototypes.bytesresponse(b'%d %s\n' % (success, r))
549
549
550
550
@wireprotocommand(b'known', b'nodes *', permission=b'pull')
def known(repo, proto, nodes, others):
    """Reply with one '1'/'0' character per queried node's presence."""
    flags = repo.known(wireprototypes.decodelist(nodes))
    return wireprototypes.bytesresponse(
        b''.join(b'1' if f else b'0' for f in flags)
    )
557
557
558
558
@wireprotocommand(b'protocaps', b'caps', permission=b'pull')
def protocaps(repo, proto, caps):
    """Record the client's protocol capabilities (SSH v1 transport only)."""
    if proto.name == wireprototypes.SSHV1:
        proto._protocaps = set(caps.split(b' '))
    return wireprototypes.bytesresponse(b'OK')
564
564
565
565
@wireprotocommand(b'pushkey', b'namespace key old new', permission=b'push')
def pushkey(repo, proto, namespace, key, old, new):
    """Set a pushkey; reply ``<int result>\\n<captured output>``."""
    # compatibility with pre-1.8 clients which were accidentally
    # sending raw binary nodes rather than utf-8-encoded hex
    if len(new) == 20 and stringutil.escapestr(new) != new:
        # looks like it could be a binary node
        try:
            new.decode('utf-8')
            new = encoding.tolocal(new)  # but cleanly decodes as UTF-8
        except UnicodeDecodeError:
            pass  # binary, leave unmodified
    else:
        new = encoding.tolocal(new)  # normal path

    with proto.mayberedirectstdio() as output:
        ok = (
            repo.pushkey(
                encoding.tolocal(namespace),
                encoding.tolocal(key),
                encoding.tolocal(old),
                new,
            )
            or False
        )

    captured = output.getvalue() if output else b''
    return wireprototypes.bytesresponse(b'%d\n%s' % (int(ok), captured))
593
593
594
594
@wireprotocommand(b'stream_out', permission=b'pull')
def stream(repo, proto):
    """If the server supports streaming clone, it advertises the "stream"
    capability with a value representing the version and flags of the repo
    it is serving. Client checks to see if it understands the format.
    """
    return wireprototypes.streamreslegacy(streamclone.generatev1wireproto(repo))
602
602
603
603
@wireprotocommand(b'unbundle', b'heads', permission=b'push')
def unbundle(repo, proto, heads):
    """Accept a pushed bundle (bundle1 or bundle2) and apply it.

    ``heads`` is the client's view of the server heads, used to detect
    pushes raced by another client.  Depending on configuration the payload
    is either streamed directly into the unbundle machinery or spooled to a
    temporary file first.  Returns a push result, an out-of-band error, or
    a streamed bundle2 reply.
    """
    their_heads = wireprototypes.decodelist(heads)

    with proto.mayberedirectstdio() as output:
        try:
            exchange.check_heads(repo, their_heads, b'preparing changes')
            cleanup = lambda: None
            try:
                payload = proto.getpayload()
                if repo.ui.configbool(b'server', b'streamunbundle'):

                    def cleanup():
                        # Ensure that the full payload is consumed, so
                        # that the connection doesn't contain trailing garbage.
                        for p in payload:
                            pass

                    fp = util.chunkbuffer(payload)
                else:
                    # write bundle data to temporary file as it can be big
                    fp, tempname = None, None

                    def cleanup():
                        if fp:
                            fp.close()
                        if tempname:
                            os.unlink(tempname)

                    fd, tempname = pycompat.mkstemp(prefix=b'hg-unbundle-')
                    repo.ui.debug(
                        b'redirecting incoming bundle to %s\n' % tempname
                    )
                    fp = os.fdopen(fd, pycompat.sysstr(b'wb+'))
                    for p in payload:
                        fp.write(p)
                    fp.seek(0)

                gen = exchange.readbundle(repo.ui, fp, None)
                if isinstance(
                    gen, changegroupmod.cg1unpacker
                ) and not bundle1allowed(repo, b'push'):
                    if proto.name == b'http-v1':
                        # need to special case http because stderr do not get to
                        # the http client on failed push so we need to abuse
                        # some other error type to make sure the message get to
                        # the user.
                        return wireprototypes.ooberror(bundle2required)
                    raise error.Abort(
                        bundle2requiredmain, hint=bundle2requiredhint
                    )

                r = exchange.unbundle(
                    repo, gen, their_heads, b'serve', proto.client()
                )
                if util.safehasattr(r, b'addpart'):
                    # The return looks streamable, we are in the bundle2 case
                    # and should return a stream.
                    return wireprototypes.streamreslegacy(gen=r.getchunks())
                return wireprototypes.pushres(
                    r, output.getvalue() if output else b''
                )

            finally:
                cleanup()

        except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
            # handle non-bundle2 case first
            if not getattr(exc, 'duringunbundle2', False):
                # re-raise to dispatch on the concrete exception type
                try:
                    raise
                except error.Abort as exc:
                    # The old code we moved used procutil.stderr directly.
                    # We did not change it to minimise code change.
                    # This need to be moved to something proper.
                    # Feel free to do it.
                    procutil.stderr.write(exc.format())
                    procutil.stderr.flush()
                    return wireprototypes.pushres(
                        0, output.getvalue() if output else b''
                    )
                except error.PushRaced:
                    return wireprototypes.pusherr(
                        pycompat.bytestr(exc),
                        output.getvalue() if output else b'',
                    )

            # bundle2 case: build an error-reply bundle for the client,
            # preserving any output parts salvaged from the failed run.
            bundler = bundle2.bundle20(repo.ui)
            for out in getattr(exc, '_bundle2salvagedoutput', ()):
                bundler.addpart(out)
            try:
                try:
                    raise
                except error.PushkeyFailed as exc:
                    # check client caps
                    remotecaps = getattr(exc, '_replycaps', None)
                    if (
                        remotecaps is not None
                        and b'pushkey' not in remotecaps.get(b'error', ())
                    ):
                        # no support remote side, fallback to Abort handler.
                        raise
                    part = bundler.newpart(b'error:pushkey')
                    part.addparam(b'in-reply-to', exc.partid)
                    if exc.namespace is not None:
                        part.addparam(
                            b'namespace', exc.namespace, mandatory=False
                        )
                    if exc.key is not None:
                        part.addparam(b'key', exc.key, mandatory=False)
                    if exc.new is not None:
                        part.addparam(b'new', exc.new, mandatory=False)
                    if exc.old is not None:
                        part.addparam(b'old', exc.old, mandatory=False)
                    if exc.ret is not None:
                        part.addparam(b'ret', exc.ret, mandatory=False)
            except error.BundleValueError as exc:
                errpart = bundler.newpart(b'error:unsupportedcontent')
                if exc.parttype is not None:
                    errpart.addparam(b'parttype', exc.parttype)
                if exc.params:
                    errpart.addparam(b'params', b'\0'.join(exc.params))
            except error.Abort as exc:
                manargs = [(b'message', exc.message)]
                advargs = []
                if exc.hint is not None:
                    advargs.append((b'hint', exc.hint))
                bundler.addpart(
                    bundle2.bundlepart(b'error:abort', manargs, advargs)
                )
            except error.PushRaced as exc:
                bundler.newpart(
                    b'error:pushraced',
                    [(b'message', stringutil.forcebytestr(exc))],
                )
            return wireprototypes.streamreslegacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now