##// END OF EJS Templates
streamclone: move wire protocol status code from wireproto command...
Gregory Szorc -
r35507:ded3a63f default
parent child Browse files
Show More
@@ -1,414 +1,430 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import struct
10 import struct
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 branchmap,
14 branchmap,
15 error,
15 error,
16 phases,
16 phases,
17 store,
17 store,
18 util,
18 util,
19 )
19 )
20
20
21 def canperformstreamclone(pullop, bailifbundle2supported=False):
21 def canperformstreamclone(pullop, bailifbundle2supported=False):
22 """Whether it is possible to perform a streaming clone as part of pull.
22 """Whether it is possible to perform a streaming clone as part of pull.
23
23
24 ``bailifbundle2supported`` will cause the function to return False if
24 ``bailifbundle2supported`` will cause the function to return False if
25 bundle2 stream clones are supported. It should only be called by the
25 bundle2 stream clones are supported. It should only be called by the
26 legacy stream clone code path.
26 legacy stream clone code path.
27
27
28 Returns a tuple of (supported, requirements). ``supported`` is True if
28 Returns a tuple of (supported, requirements). ``supported`` is True if
29 streaming clone is supported and False otherwise. ``requirements`` is
29 streaming clone is supported and False otherwise. ``requirements`` is
30 a set of repo requirements from the remote, or ``None`` if stream clone
30 a set of repo requirements from the remote, or ``None`` if stream clone
31 isn't supported.
31 isn't supported.
32 """
32 """
33 repo = pullop.repo
33 repo = pullop.repo
34 remote = pullop.remote
34 remote = pullop.remote
35
35
36 bundle2supported = False
36 bundle2supported = False
37 if pullop.canusebundle2:
37 if pullop.canusebundle2:
38 if 'v1' in pullop.remotebundle2caps.get('stream', []):
38 if 'v1' in pullop.remotebundle2caps.get('stream', []):
39 bundle2supported = True
39 bundle2supported = True
40 # else
40 # else
41 # Server doesn't support bundle2 stream clone or doesn't support
41 # Server doesn't support bundle2 stream clone or doesn't support
42 # the versions we support. Fall back and possibly allow legacy.
42 # the versions we support. Fall back and possibly allow legacy.
43
43
44 # Ensures legacy code path uses available bundle2.
44 # Ensures legacy code path uses available bundle2.
45 if bailifbundle2supported and bundle2supported:
45 if bailifbundle2supported and bundle2supported:
46 return False, None
46 return False, None
47 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
47 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
48 #elif not bailifbundle2supported and not bundle2supported:
48 #elif not bailifbundle2supported and not bundle2supported:
49 # return False, None
49 # return False, None
50
50
51 # Streaming clone only works on empty repositories.
51 # Streaming clone only works on empty repositories.
52 if len(repo):
52 if len(repo):
53 return False, None
53 return False, None
54
54
55 # Streaming clone only works if all data is being requested.
55 # Streaming clone only works if all data is being requested.
56 if pullop.heads:
56 if pullop.heads:
57 return False, None
57 return False, None
58
58
59 streamrequested = pullop.streamclonerequested
59 streamrequested = pullop.streamclonerequested
60
60
61 # If we don't have a preference, let the server decide for us. This
61 # If we don't have a preference, let the server decide for us. This
62 # likely only comes into play in LANs.
62 # likely only comes into play in LANs.
63 if streamrequested is None:
63 if streamrequested is None:
64 # The server can advertise whether to prefer streaming clone.
64 # The server can advertise whether to prefer streaming clone.
65 streamrequested = remote.capable('stream-preferred')
65 streamrequested = remote.capable('stream-preferred')
66
66
67 if not streamrequested:
67 if not streamrequested:
68 return False, None
68 return False, None
69
69
70 # In order for stream clone to work, the client has to support all the
70 # In order for stream clone to work, the client has to support all the
71 # requirements advertised by the server.
71 # requirements advertised by the server.
72 #
72 #
73 # The server advertises its requirements via the "stream" and "streamreqs"
73 # The server advertises its requirements via the "stream" and "streamreqs"
74 # capability. "stream" (a value-less capability) is advertised if and only
74 # capability. "stream" (a value-less capability) is advertised if and only
75 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
75 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
76 # is advertised and contains a comma-delimited list of requirements.
76 # is advertised and contains a comma-delimited list of requirements.
77 requirements = set()
77 requirements = set()
78 if remote.capable('stream'):
78 if remote.capable('stream'):
79 requirements.add('revlogv1')
79 requirements.add('revlogv1')
80 else:
80 else:
81 streamreqs = remote.capable('streamreqs')
81 streamreqs = remote.capable('streamreqs')
82 # This is weird and shouldn't happen with modern servers.
82 # This is weird and shouldn't happen with modern servers.
83 if not streamreqs:
83 if not streamreqs:
84 pullop.repo.ui.warn(_(
84 pullop.repo.ui.warn(_(
85 'warning: stream clone requested but server has them '
85 'warning: stream clone requested but server has them '
86 'disabled\n'))
86 'disabled\n'))
87 return False, None
87 return False, None
88
88
89 streamreqs = set(streamreqs.split(','))
89 streamreqs = set(streamreqs.split(','))
90 # Server requires something we don't support. Bail.
90 # Server requires something we don't support. Bail.
91 missingreqs = streamreqs - repo.supportedformats
91 missingreqs = streamreqs - repo.supportedformats
92 if missingreqs:
92 if missingreqs:
93 pullop.repo.ui.warn(_(
93 pullop.repo.ui.warn(_(
94 'warning: stream clone requested but client is missing '
94 'warning: stream clone requested but client is missing '
95 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
95 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
96 pullop.repo.ui.warn(
96 pullop.repo.ui.warn(
97 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
97 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
98 'for more information)\n'))
98 'for more information)\n'))
99 return False, None
99 return False, None
100 requirements = streamreqs
100 requirements = streamreqs
101
101
102 return True, requirements
102 return True, requirements
103
103
104 def maybeperformlegacystreamclone(pullop):
104 def maybeperformlegacystreamclone(pullop):
105 """Possibly perform a legacy stream clone operation.
105 """Possibly perform a legacy stream clone operation.
106
106
107 Legacy stream clones are performed as part of pull but before all other
107 Legacy stream clones are performed as part of pull but before all other
108 operations.
108 operations.
109
109
110 A legacy stream clone will not be performed if a bundle2 stream clone is
110 A legacy stream clone will not be performed if a bundle2 stream clone is
111 supported.
111 supported.
112 """
112 """
113 supported, requirements = canperformstreamclone(pullop)
113 supported, requirements = canperformstreamclone(pullop)
114
114
115 if not supported:
115 if not supported:
116 return
116 return
117
117
118 repo = pullop.repo
118 repo = pullop.repo
119 remote = pullop.remote
119 remote = pullop.remote
120
120
121 # Save remote branchmap. We will use it later to speed up branchcache
121 # Save remote branchmap. We will use it later to speed up branchcache
122 # creation.
122 # creation.
123 rbranchmap = None
123 rbranchmap = None
124 if remote.capable('branchmap'):
124 if remote.capable('branchmap'):
125 rbranchmap = remote.branchmap()
125 rbranchmap = remote.branchmap()
126
126
127 repo.ui.status(_('streaming all changes\n'))
127 repo.ui.status(_('streaming all changes\n'))
128
128
129 fp = remote.stream_out()
129 fp = remote.stream_out()
130 l = fp.readline()
130 l = fp.readline()
131 try:
131 try:
132 resp = int(l)
132 resp = int(l)
133 except ValueError:
133 except ValueError:
134 raise error.ResponseError(
134 raise error.ResponseError(
135 _('unexpected response from remote server:'), l)
135 _('unexpected response from remote server:'), l)
136 if resp == 1:
136 if resp == 1:
137 raise error.Abort(_('operation forbidden by server'))
137 raise error.Abort(_('operation forbidden by server'))
138 elif resp == 2:
138 elif resp == 2:
139 raise error.Abort(_('locking the remote repository failed'))
139 raise error.Abort(_('locking the remote repository failed'))
140 elif resp != 0:
140 elif resp != 0:
141 raise error.Abort(_('the server sent an unknown error code'))
141 raise error.Abort(_('the server sent an unknown error code'))
142
142
143 l = fp.readline()
143 l = fp.readline()
144 try:
144 try:
145 filecount, bytecount = map(int, l.split(' ', 1))
145 filecount, bytecount = map(int, l.split(' ', 1))
146 except (ValueError, TypeError):
146 except (ValueError, TypeError):
147 raise error.ResponseError(
147 raise error.ResponseError(
148 _('unexpected response from remote server:'), l)
148 _('unexpected response from remote server:'), l)
149
149
150 with repo.lock():
150 with repo.lock():
151 consumev1(repo, fp, filecount, bytecount)
151 consumev1(repo, fp, filecount, bytecount)
152
152
153 # new requirements = old non-format requirements +
153 # new requirements = old non-format requirements +
154 # new format-related remote requirements
154 # new format-related remote requirements
155 # requirements from the streamed-in repository
155 # requirements from the streamed-in repository
156 repo.requirements = requirements | (
156 repo.requirements = requirements | (
157 repo.requirements - repo.supportedformats)
157 repo.requirements - repo.supportedformats)
158 repo._applyopenerreqs()
158 repo._applyopenerreqs()
159 repo._writerequirements()
159 repo._writerequirements()
160
160
161 if rbranchmap:
161 if rbranchmap:
162 branchmap.replacecache(repo, rbranchmap)
162 branchmap.replacecache(repo, rbranchmap)
163
163
164 repo.invalidate()
164 repo.invalidate()
165
165
166 def allowservergeneration(repo):
166 def allowservergeneration(repo):
167 """Whether streaming clones are allowed from the server."""
167 """Whether streaming clones are allowed from the server."""
168 if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
168 if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
169 return False
169 return False
170
170
171 # The way stream clone works makes it impossible to hide secret changesets.
171 # The way stream clone works makes it impossible to hide secret changesets.
172 # So don't allow this by default.
172 # So don't allow this by default.
173 secret = phases.hassecret(repo)
173 secret = phases.hassecret(repo)
174 if secret:
174 if secret:
175 return repo.ui.configbool('server', 'uncompressedallowsecret')
175 return repo.ui.configbool('server', 'uncompressedallowsecret')
176
176
177 return True
177 return True
178
178
179 # This is it's own function so extensions can override it.
179 # This is it's own function so extensions can override it.
180 def _walkstreamfiles(repo):
180 def _walkstreamfiles(repo):
181 return repo.store.walk()
181 return repo.store.walk()
182
182
183 def generatev1(repo):
183 def generatev1(repo):
184 """Emit content for version 1 of a streaming clone.
184 """Emit content for version 1 of a streaming clone.
185
185
186 This returns a 3-tuple of (file count, byte size, data iterator).
186 This returns a 3-tuple of (file count, byte size, data iterator).
187
187
188 The data iterator consists of N entries for each file being transferred.
188 The data iterator consists of N entries for each file being transferred.
189 Each file entry starts as a line with the file name and integer size
189 Each file entry starts as a line with the file name and integer size
190 delimited by a null byte.
190 delimited by a null byte.
191
191
192 The raw file data follows. Following the raw file data is the next file
192 The raw file data follows. Following the raw file data is the next file
193 entry, or EOF.
193 entry, or EOF.
194
194
195 When used on the wire protocol, an additional line indicating protocol
195 When used on the wire protocol, an additional line indicating protocol
196 success will be prepended to the stream. This function is not responsible
196 success will be prepended to the stream. This function is not responsible
197 for adding it.
197 for adding it.
198
198
199 This function will obtain a repository lock to ensure a consistent view of
199 This function will obtain a repository lock to ensure a consistent view of
200 the store is captured. It therefore may raise LockError.
200 the store is captured. It therefore may raise LockError.
201 """
201 """
202 entries = []
202 entries = []
203 total_bytes = 0
203 total_bytes = 0
204 # Get consistent snapshot of repo, lock during scan.
204 # Get consistent snapshot of repo, lock during scan.
205 with repo.lock():
205 with repo.lock():
206 repo.ui.debug('scanning\n')
206 repo.ui.debug('scanning\n')
207 for name, ename, size in _walkstreamfiles(repo):
207 for name, ename, size in _walkstreamfiles(repo):
208 if size:
208 if size:
209 entries.append((name, size))
209 entries.append((name, size))
210 total_bytes += size
210 total_bytes += size
211
211
212 repo.ui.debug('%d files, %d bytes to transfer\n' %
212 repo.ui.debug('%d files, %d bytes to transfer\n' %
213 (len(entries), total_bytes))
213 (len(entries), total_bytes))
214
214
215 svfs = repo.svfs
215 svfs = repo.svfs
216 debugflag = repo.ui.debugflag
216 debugflag = repo.ui.debugflag
217
217
218 def emitrevlogdata():
218 def emitrevlogdata():
219 for name, size in entries:
219 for name, size in entries:
220 if debugflag:
220 if debugflag:
221 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
221 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
222 # partially encode name over the wire for backwards compat
222 # partially encode name over the wire for backwards compat
223 yield '%s\0%d\n' % (store.encodedir(name), size)
223 yield '%s\0%d\n' % (store.encodedir(name), size)
224 # auditing at this stage is both pointless (paths are already
224 # auditing at this stage is both pointless (paths are already
225 # trusted by the local repo) and expensive
225 # trusted by the local repo) and expensive
226 with svfs(name, 'rb', auditpath=False) as fp:
226 with svfs(name, 'rb', auditpath=False) as fp:
227 if size <= 65536:
227 if size <= 65536:
228 yield fp.read(size)
228 yield fp.read(size)
229 else:
229 else:
230 for chunk in util.filechunkiter(fp, limit=size):
230 for chunk in util.filechunkiter(fp, limit=size):
231 yield chunk
231 yield chunk
232
232
233 return len(entries), total_bytes, emitrevlogdata()
233 return len(entries), total_bytes, emitrevlogdata()
234
234
235 def generatev1wireproto(repo):
235 def generatev1wireproto(repo):
236 """Emit content for version 1 of streaming clone suitable for the wire.
236 """Emit content for version 1 of streaming clone suitable for the wire.
237
237
238 This is the data output from ``generatev1()`` with a header line
238 This is the data output from ``generatev1()`` with 2 header lines. The
239 indicating file count and byte size.
239 first line indicates overall success. The 2nd contains the file count and
240 byte size of payload.
241
242 The success line contains "0" for success, "1" for stream generation not
243 allowed, and "2" for error locking the repository (possibly indicating
244 a permissions error for the server process).
240 """
245 """
246 if not allowservergeneration(repo):
247 yield '1\n'
248 return
249
250 try:
241 filecount, bytecount, it = generatev1(repo)
251 filecount, bytecount, it = generatev1(repo)
252 except error.LockError:
253 yield '2\n'
254 return
255
256 # Indicates successful response.
257 yield '0\n'
242 yield '%d %d\n' % (filecount, bytecount)
258 yield '%d %d\n' % (filecount, bytecount)
243 for chunk in it:
259 for chunk in it:
244 yield chunk
260 yield chunk
245
261
246 def generatebundlev1(repo, compression='UN'):
262 def generatebundlev1(repo, compression='UN'):
247 """Emit content for version 1 of a stream clone bundle.
263 """Emit content for version 1 of a stream clone bundle.
248
264
249 The first 4 bytes of the output ("HGS1") denote this as stream clone
265 The first 4 bytes of the output ("HGS1") denote this as stream clone
250 bundle version 1.
266 bundle version 1.
251
267
252 The next 2 bytes indicate the compression type. Only "UN" is currently
268 The next 2 bytes indicate the compression type. Only "UN" is currently
253 supported.
269 supported.
254
270
255 The next 16 bytes are two 64-bit big endian unsigned integers indicating
271 The next 16 bytes are two 64-bit big endian unsigned integers indicating
256 file count and byte count, respectively.
272 file count and byte count, respectively.
257
273
258 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
274 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
259 of the requirements string, including a trailing \0. The following N bytes
275 of the requirements string, including a trailing \0. The following N bytes
260 are the requirements string, which is ASCII containing a comma-delimited
276 are the requirements string, which is ASCII containing a comma-delimited
261 list of repo requirements that are needed to support the data.
277 list of repo requirements that are needed to support the data.
262
278
263 The remaining content is the output of ``generatev1()`` (which may be
279 The remaining content is the output of ``generatev1()`` (which may be
264 compressed in the future).
280 compressed in the future).
265
281
266 Returns a tuple of (requirements, data generator).
282 Returns a tuple of (requirements, data generator).
267 """
283 """
268 if compression != 'UN':
284 if compression != 'UN':
269 raise ValueError('we do not support the compression argument yet')
285 raise ValueError('we do not support the compression argument yet')
270
286
271 requirements = repo.requirements & repo.supportedformats
287 requirements = repo.requirements & repo.supportedformats
272 requires = ','.join(sorted(requirements))
288 requires = ','.join(sorted(requirements))
273
289
274 def gen():
290 def gen():
275 yield 'HGS1'
291 yield 'HGS1'
276 yield compression
292 yield compression
277
293
278 filecount, bytecount, it = generatev1(repo)
294 filecount, bytecount, it = generatev1(repo)
279 repo.ui.status(_('writing %d bytes for %d files\n') %
295 repo.ui.status(_('writing %d bytes for %d files\n') %
280 (bytecount, filecount))
296 (bytecount, filecount))
281
297
282 yield struct.pack('>QQ', filecount, bytecount)
298 yield struct.pack('>QQ', filecount, bytecount)
283 yield struct.pack('>H', len(requires) + 1)
299 yield struct.pack('>H', len(requires) + 1)
284 yield requires + '\0'
300 yield requires + '\0'
285
301
286 # This is where we'll add compression in the future.
302 # This is where we'll add compression in the future.
287 assert compression == 'UN'
303 assert compression == 'UN'
288
304
289 seen = 0
305 seen = 0
290 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
306 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
291
307
292 for chunk in it:
308 for chunk in it:
293 seen += len(chunk)
309 seen += len(chunk)
294 repo.ui.progress(_('bundle'), seen, total=bytecount,
310 repo.ui.progress(_('bundle'), seen, total=bytecount,
295 unit=_('bytes'))
311 unit=_('bytes'))
296 yield chunk
312 yield chunk
297
313
298 repo.ui.progress(_('bundle'), None)
314 repo.ui.progress(_('bundle'), None)
299
315
300 return requirements, gen()
316 return requirements, gen()
301
317
302 def consumev1(repo, fp, filecount, bytecount):
318 def consumev1(repo, fp, filecount, bytecount):
303 """Apply the contents from version 1 of a streaming clone file handle.
319 """Apply the contents from version 1 of a streaming clone file handle.
304
320
305 This takes the output from "stream_out" and applies it to the specified
321 This takes the output from "stream_out" and applies it to the specified
306 repository.
322 repository.
307
323
308 Like "stream_out," the status line added by the wire protocol is not
324 Like "stream_out," the status line added by the wire protocol is not
309 handled by this function.
325 handled by this function.
310 """
326 """
311 with repo.lock():
327 with repo.lock():
312 repo.ui.status(_('%d files to transfer, %s of data\n') %
328 repo.ui.status(_('%d files to transfer, %s of data\n') %
313 (filecount, util.bytecount(bytecount)))
329 (filecount, util.bytecount(bytecount)))
314 handled_bytes = 0
330 handled_bytes = 0
315 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
331 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
316 start = util.timer()
332 start = util.timer()
317
333
318 # TODO: get rid of (potential) inconsistency
334 # TODO: get rid of (potential) inconsistency
319 #
335 #
320 # If transaction is started and any @filecache property is
336 # If transaction is started and any @filecache property is
321 # changed at this point, it causes inconsistency between
337 # changed at this point, it causes inconsistency between
322 # in-memory cached property and streamclone-ed file on the
338 # in-memory cached property and streamclone-ed file on the
323 # disk. Nested transaction prevents transaction scope "clone"
339 # disk. Nested transaction prevents transaction scope "clone"
324 # below from writing in-memory changes out at the end of it,
340 # below from writing in-memory changes out at the end of it,
325 # even though in-memory changes are discarded at the end of it
341 # even though in-memory changes are discarded at the end of it
326 # regardless of transaction nesting.
342 # regardless of transaction nesting.
327 #
343 #
328 # But transaction nesting can't be simply prohibited, because
344 # But transaction nesting can't be simply prohibited, because
329 # nesting occurs also in ordinary case (e.g. enabling
345 # nesting occurs also in ordinary case (e.g. enabling
330 # clonebundles).
346 # clonebundles).
331
347
332 with repo.transaction('clone'):
348 with repo.transaction('clone'):
333 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
349 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
334 for i in xrange(filecount):
350 for i in xrange(filecount):
335 # XXX doesn't support '\n' or '\r' in filenames
351 # XXX doesn't support '\n' or '\r' in filenames
336 l = fp.readline()
352 l = fp.readline()
337 try:
353 try:
338 name, size = l.split('\0', 1)
354 name, size = l.split('\0', 1)
339 size = int(size)
355 size = int(size)
340 except (ValueError, TypeError):
356 except (ValueError, TypeError):
341 raise error.ResponseError(
357 raise error.ResponseError(
342 _('unexpected response from remote server:'), l)
358 _('unexpected response from remote server:'), l)
343 if repo.ui.debugflag:
359 if repo.ui.debugflag:
344 repo.ui.debug('adding %s (%s)\n' %
360 repo.ui.debug('adding %s (%s)\n' %
345 (name, util.bytecount(size)))
361 (name, util.bytecount(size)))
346 # for backwards compat, name was partially encoded
362 # for backwards compat, name was partially encoded
347 path = store.decodedir(name)
363 path = store.decodedir(name)
348 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
364 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
349 for chunk in util.filechunkiter(fp, limit=size):
365 for chunk in util.filechunkiter(fp, limit=size):
350 handled_bytes += len(chunk)
366 handled_bytes += len(chunk)
351 repo.ui.progress(_('clone'), handled_bytes,
367 repo.ui.progress(_('clone'), handled_bytes,
352 total=bytecount, unit=_('bytes'))
368 total=bytecount, unit=_('bytes'))
353 ofp.write(chunk)
369 ofp.write(chunk)
354
370
355 # force @filecache properties to be reloaded from
371 # force @filecache properties to be reloaded from
356 # streamclone-ed file at next access
372 # streamclone-ed file at next access
357 repo.invalidate(clearfilecache=True)
373 repo.invalidate(clearfilecache=True)
358
374
359 elapsed = util.timer() - start
375 elapsed = util.timer() - start
360 if elapsed <= 0:
376 if elapsed <= 0:
361 elapsed = 0.001
377 elapsed = 0.001
362 repo.ui.progress(_('clone'), None)
378 repo.ui.progress(_('clone'), None)
363 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
379 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
364 (util.bytecount(bytecount), elapsed,
380 (util.bytecount(bytecount), elapsed,
365 util.bytecount(bytecount / elapsed)))
381 util.bytecount(bytecount / elapsed)))
366
382
367 def readbundle1header(fp):
383 def readbundle1header(fp):
368 compression = fp.read(2)
384 compression = fp.read(2)
369 if compression != 'UN':
385 if compression != 'UN':
370 raise error.Abort(_('only uncompressed stream clone bundles are '
386 raise error.Abort(_('only uncompressed stream clone bundles are '
371 'supported; got %s') % compression)
387 'supported; got %s') % compression)
372
388
373 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
389 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
374 requireslen = struct.unpack('>H', fp.read(2))[0]
390 requireslen = struct.unpack('>H', fp.read(2))[0]
375 requires = fp.read(requireslen)
391 requires = fp.read(requireslen)
376
392
377 if not requires.endswith('\0'):
393 if not requires.endswith('\0'):
378 raise error.Abort(_('malformed stream clone bundle: '
394 raise error.Abort(_('malformed stream clone bundle: '
379 'requirements not properly encoded'))
395 'requirements not properly encoded'))
380
396
381 requirements = set(requires.rstrip('\0').split(','))
397 requirements = set(requires.rstrip('\0').split(','))
382
398
383 return filecount, bytecount, requirements
399 return filecount, bytecount, requirements
384
400
385 def applybundlev1(repo, fp):
401 def applybundlev1(repo, fp):
386 """Apply the content from a stream clone bundle version 1.
402 """Apply the content from a stream clone bundle version 1.
387
403
388 We assume the 4 byte header has been read and validated and the file handle
404 We assume the 4 byte header has been read and validated and the file handle
389 is at the 2 byte compression identifier.
405 is at the 2 byte compression identifier.
390 """
406 """
391 if len(repo):
407 if len(repo):
392 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
408 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
393 'repo'))
409 'repo'))
394
410
395 filecount, bytecount, requirements = readbundle1header(fp)
411 filecount, bytecount, requirements = readbundle1header(fp)
396 missingreqs = requirements - repo.supportedformats
412 missingreqs = requirements - repo.supportedformats
397 if missingreqs:
413 if missingreqs:
398 raise error.Abort(_('unable to apply stream clone: '
414 raise error.Abort(_('unable to apply stream clone: '
399 'unsupported format: %s') %
415 'unsupported format: %s') %
400 ', '.join(sorted(missingreqs)))
416 ', '.join(sorted(missingreqs)))
401
417
402 consumev1(repo, fp, filecount, bytecount)
418 consumev1(repo, fp, filecount, bytecount)
403
419
404 class streamcloneapplier(object):
420 class streamcloneapplier(object):
405 """Class to manage applying streaming clone bundles.
421 """Class to manage applying streaming clone bundles.
406
422
407 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
423 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
408 readers to perform bundle type-specific functionality.
424 readers to perform bundle type-specific functionality.
409 """
425 """
410 def __init__(self, fh):
426 def __init__(self, fh):
411 self._fh = fh
427 self._fh = fh
412
428
413 def apply(self, repo):
429 def apply(self, repo):
414 return applybundlev1(repo, self._fh)
430 return applybundlev1(repo, self._fh)
@@ -1,1070 +1,1056 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class abstractserverproto(object):
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
45 """abstract class that summarizes the protocol API
46
46
47 Used as reference and documentation.
47 Used as reference and documentation.
48 """
48 """
49
49
50 def getargs(self, args):
50 def getargs(self, args):
51 """return the value for arguments in <args>
51 """return the value for arguments in <args>
52
52
53 returns a list of values (same order as <args>)"""
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def getfile(self, fp):
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
57 """write the whole content of a file into a file like object
58
58
59 The file is in the form::
59 The file is in the form::
60
60
61 (<chunk-size>\n<chunk>)+0\n
61 (<chunk-size>\n<chunk>)+0\n
62
62
63 chunk size is the ascii version of the int.
63 chunk size is the ascii version of the int.
64 """
64 """
65 raise NotImplementedError()
65 raise NotImplementedError()
66
66
67 def redirect(self):
67 def redirect(self):
68 """may setup interception for stdout and stderr
68 """may setup interception for stdout and stderr
69
69
70 See also the `restore` method."""
70 See also the `restore` method."""
71 raise NotImplementedError()
71 raise NotImplementedError()
72
72
73 # If the `redirect` function does install interception, the `restore`
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
75 # MUST NOT be defined.
76 #
76 #
77 # left commented here on purpose
77 # left commented here on purpose
78 #
78 #
79 #def restore(self):
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
81 # """
82 # raise NotImplementedError()
82 # raise NotImplementedError()
83
83
84 class remoteiterbatcher(peer.iterbatcher):
84 class remoteiterbatcher(peer.iterbatcher):
85 def __init__(self, remote):
85 def __init__(self, remote):
86 super(remoteiterbatcher, self).__init__()
86 super(remoteiterbatcher, self).__init__()
87 self._remote = remote
87 self._remote = remote
88
88
89 def __getattr__(self, name):
89 def __getattr__(self, name):
90 # Validate this method is batchable, since submit() only supports
90 # Validate this method is batchable, since submit() only supports
91 # batchable methods.
91 # batchable methods.
92 fn = getattr(self._remote, name)
92 fn = getattr(self._remote, name)
93 if not getattr(fn, 'batchable', None):
93 if not getattr(fn, 'batchable', None):
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 'call to %r' % name)
95 'call to %r' % name)
96
96
97 return super(remoteiterbatcher, self).__getattr__(name)
97 return super(remoteiterbatcher, self).__getattr__(name)
98
98
99 def submit(self):
99 def submit(self):
100 """Break the batch request into many patch calls and pipeline them.
100 """Break the batch request into many patch calls and pipeline them.
101
101
102 This is mostly valuable over http where request sizes can be
102 This is mostly valuable over http where request sizes can be
103 limited, but can be used in other places as well.
103 limited, but can be used in other places as well.
104 """
104 """
105 # 2-tuple of (command, arguments) that represents what will be
105 # 2-tuple of (command, arguments) that represents what will be
106 # sent over the wire.
106 # sent over the wire.
107 requests = []
107 requests = []
108
108
109 # 4-tuple of (command, final future, @batchable generator, remote
109 # 4-tuple of (command, final future, @batchable generator, remote
110 # future).
110 # future).
111 results = []
111 results = []
112
112
113 for command, args, opts, finalfuture in self.calls:
113 for command, args, opts, finalfuture in self.calls:
114 mtd = getattr(self._remote, command)
114 mtd = getattr(self._remote, command)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116
116
117 commandargs, fremote = next(batchable)
117 commandargs, fremote = next(batchable)
118 assert fremote
118 assert fremote
119 requests.append((command, commandargs))
119 requests.append((command, commandargs))
120 results.append((command, finalfuture, batchable, fremote))
120 results.append((command, finalfuture, batchable, fremote))
121
121
122 if requests:
122 if requests:
123 self._resultiter = self._remote._submitbatch(requests)
123 self._resultiter = self._remote._submitbatch(requests)
124
124
125 self._results = results
125 self._results = results
126
126
127 def results(self):
127 def results(self):
128 for command, finalfuture, batchable, remotefuture in self._results:
128 for command, finalfuture, batchable, remotefuture in self._results:
129 # Get the raw result, set it in the remote future, feed it
129 # Get the raw result, set it in the remote future, feed it
130 # back into the @batchable generator so it can be decoded, and
130 # back into the @batchable generator so it can be decoded, and
131 # set the result on the final future to this value.
131 # set the result on the final future to this value.
132 remoteresult = next(self._resultiter)
132 remoteresult = next(self._resultiter)
133 remotefuture.set(remoteresult)
133 remotefuture.set(remoteresult)
134 finalfuture.set(next(batchable))
134 finalfuture.set(next(batchable))
135
135
136 # Verify our @batchable generators only emit 2 values.
136 # Verify our @batchable generators only emit 2 values.
137 try:
137 try:
138 next(batchable)
138 next(batchable)
139 except StopIteration:
139 except StopIteration:
140 pass
140 pass
141 else:
141 else:
142 raise error.ProgrammingError('%s @batchable generator emitted '
142 raise error.ProgrammingError('%s @batchable generator emitted '
143 'unexpected value count' % command)
143 'unexpected value count' % command)
144
144
145 yield finalfuture.value
145 yield finalfuture.value
146
146
147 # Forward a couple of names from peer to make wireproto interactions
147 # Forward a couple of names from peer to make wireproto interactions
148 # slightly more sensible.
148 # slightly more sensible.
149 batchable = peer.batchable
149 batchable = peer.batchable
150 future = peer.future
150 future = peer.future
151
151
152 # list of nodes encoding / decoding
152 # list of nodes encoding / decoding
153
153
154 def decodelist(l, sep=' '):
154 def decodelist(l, sep=' '):
155 if l:
155 if l:
156 return [bin(v) for v in l.split(sep)]
156 return [bin(v) for v in l.split(sep)]
157 return []
157 return []
158
158
159 def encodelist(l, sep=' '):
159 def encodelist(l, sep=' '):
160 try:
160 try:
161 return sep.join(map(hex, l))
161 return sep.join(map(hex, l))
162 except TypeError:
162 except TypeError:
163 raise
163 raise
164
164
165 # batched call argument encoding
165 # batched call argument encoding
166
166
167 def escapearg(plain):
167 def escapearg(plain):
168 return (plain
168 return (plain
169 .replace(':', ':c')
169 .replace(':', ':c')
170 .replace(',', ':o')
170 .replace(',', ':o')
171 .replace(';', ':s')
171 .replace(';', ':s')
172 .replace('=', ':e'))
172 .replace('=', ':e'))
173
173
174 def unescapearg(escaped):
174 def unescapearg(escaped):
175 return (escaped
175 return (escaped
176 .replace(':e', '=')
176 .replace(':e', '=')
177 .replace(':s', ';')
177 .replace(':s', ';')
178 .replace(':o', ',')
178 .replace(':o', ',')
179 .replace(':c', ':'))
179 .replace(':c', ':'))
180
180
181 def encodebatchcmds(req):
181 def encodebatchcmds(req):
182 """Return a ``cmds`` argument value for the ``batch`` command."""
182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 cmds = []
183 cmds = []
184 for op, argsdict in req:
184 for op, argsdict in req:
185 # Old servers didn't properly unescape argument names. So prevent
185 # Old servers didn't properly unescape argument names. So prevent
186 # the sending of argument names that may not be decoded properly by
186 # the sending of argument names that may not be decoded properly by
187 # servers.
187 # servers.
188 assert all(escapearg(k) == k for k in argsdict)
188 assert all(escapearg(k) == k for k in argsdict)
189
189
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 for k, v in argsdict.iteritems())
191 for k, v in argsdict.iteritems())
192 cmds.append('%s %s' % (op, args))
192 cmds.append('%s %s' % (op, args))
193
193
194 return ';'.join(cmds)
194 return ';'.join(cmds)
195
195
196 # mapping of options accepted by getbundle and their types
196 # mapping of options accepted by getbundle and their types
197 #
197 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
199 # such options are properly processed in exchange.getbundle.
200 #
200 #
201 # supported types are:
201 # supported types are:
202 #
202 #
203 # :nodes: list of binary nodes
203 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
204 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
205 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'bookmarks': 'boolean',
208 'bookmarks': 'boolean',
209 'common': 'nodes',
209 'common': 'nodes',
210 'obsmarkers': 'boolean',
210 'obsmarkers': 'boolean',
211 'phases': 'boolean',
211 'phases': 'boolean',
212 'bundlecaps': 'scsv',
212 'bundlecaps': 'scsv',
213 'listkeys': 'csv',
213 'listkeys': 'csv',
214 'cg': 'boolean',
214 'cg': 'boolean',
215 'cbattempted': 'boolean'}
215 'cbattempted': 'boolean'}
216
216
217 # client side
217 # client side
218
218
219 class wirepeer(repository.legacypeer):
219 class wirepeer(repository.legacypeer):
220 """Client-side interface for communicating with a peer repository.
220 """Client-side interface for communicating with a peer repository.
221
221
222 Methods commonly call wire protocol commands of the same name.
222 Methods commonly call wire protocol commands of the same name.
223
223
224 See also httppeer.py and sshpeer.py for protocol-specific
224 See also httppeer.py and sshpeer.py for protocol-specific
225 implementations of this interface.
225 implementations of this interface.
226 """
226 """
227 # Begin of basewirepeer interface.
227 # Begin of basewirepeer interface.
228
228
229 def iterbatch(self):
229 def iterbatch(self):
230 return remoteiterbatcher(self)
230 return remoteiterbatcher(self)
231
231
232 @batchable
232 @batchable
233 def lookup(self, key):
233 def lookup(self, key):
234 self.requirecap('lookup', _('look up remote revision'))
234 self.requirecap('lookup', _('look up remote revision'))
235 f = future()
235 f = future()
236 yield {'key': encoding.fromlocal(key)}, f
236 yield {'key': encoding.fromlocal(key)}, f
237 d = f.value
237 d = f.value
238 success, data = d[:-1].split(" ", 1)
238 success, data = d[:-1].split(" ", 1)
239 if int(success):
239 if int(success):
240 yield bin(data)
240 yield bin(data)
241 else:
241 else:
242 self._abort(error.RepoError(data))
242 self._abort(error.RepoError(data))
243
243
244 @batchable
244 @batchable
245 def heads(self):
245 def heads(self):
246 f = future()
246 f = future()
247 yield {}, f
247 yield {}, f
248 d = f.value
248 d = f.value
249 try:
249 try:
250 yield decodelist(d[:-1])
250 yield decodelist(d[:-1])
251 except ValueError:
251 except ValueError:
252 self._abort(error.ResponseError(_("unexpected response:"), d))
252 self._abort(error.ResponseError(_("unexpected response:"), d))
253
253
254 @batchable
254 @batchable
255 def known(self, nodes):
255 def known(self, nodes):
256 f = future()
256 f = future()
257 yield {'nodes': encodelist(nodes)}, f
257 yield {'nodes': encodelist(nodes)}, f
258 d = f.value
258 d = f.value
259 try:
259 try:
260 yield [bool(int(b)) for b in d]
260 yield [bool(int(b)) for b in d]
261 except ValueError:
261 except ValueError:
262 self._abort(error.ResponseError(_("unexpected response:"), d))
262 self._abort(error.ResponseError(_("unexpected response:"), d))
263
263
264 @batchable
264 @batchable
265 def branchmap(self):
265 def branchmap(self):
266 f = future()
266 f = future()
267 yield {}, f
267 yield {}, f
268 d = f.value
268 d = f.value
269 try:
269 try:
270 branchmap = {}
270 branchmap = {}
271 for branchpart in d.splitlines():
271 for branchpart in d.splitlines():
272 branchname, branchheads = branchpart.split(' ', 1)
272 branchname, branchheads = branchpart.split(' ', 1)
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 branchheads = decodelist(branchheads)
274 branchheads = decodelist(branchheads)
275 branchmap[branchname] = branchheads
275 branchmap[branchname] = branchheads
276 yield branchmap
276 yield branchmap
277 except TypeError:
277 except TypeError:
278 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
279
279
280 @batchable
280 @batchable
281 def listkeys(self, namespace):
281 def listkeys(self, namespace):
282 if not self.capable('pushkey'):
282 if not self.capable('pushkey'):
283 yield {}, None
283 yield {}, None
284 f = future()
284 f = future()
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 yield {'namespace': encoding.fromlocal(namespace)}, f
286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 d = f.value
287 d = f.value
288 self.ui.debug('received listkey for "%s": %i bytes\n'
288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 % (namespace, len(d)))
289 % (namespace, len(d)))
290 yield pushkeymod.decodekeys(d)
290 yield pushkeymod.decodekeys(d)
291
291
292 @batchable
292 @batchable
293 def pushkey(self, namespace, key, old, new):
293 def pushkey(self, namespace, key, old, new):
294 if not self.capable('pushkey'):
294 if not self.capable('pushkey'):
295 yield False, None
295 yield False, None
296 f = future()
296 f = future()
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 yield {'namespace': encoding.fromlocal(namespace),
298 yield {'namespace': encoding.fromlocal(namespace),
299 'key': encoding.fromlocal(key),
299 'key': encoding.fromlocal(key),
300 'old': encoding.fromlocal(old),
300 'old': encoding.fromlocal(old),
301 'new': encoding.fromlocal(new)}, f
301 'new': encoding.fromlocal(new)}, f
302 d = f.value
302 d = f.value
303 d, output = d.split('\n', 1)
303 d, output = d.split('\n', 1)
304 try:
304 try:
305 d = bool(int(d))
305 d = bool(int(d))
306 except ValueError:
306 except ValueError:
307 raise error.ResponseError(
307 raise error.ResponseError(
308 _('push failed (unexpected response):'), d)
308 _('push failed (unexpected response):'), d)
309 for l in output.splitlines(True):
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
310 self.ui.status(_('remote: '), l)
311 yield d
311 yield d
312
312
313 def stream_out(self):
313 def stream_out(self):
314 return self._callstream('stream_out')
314 return self._callstream('stream_out')
315
315
316 def getbundle(self, source, **kwargs):
316 def getbundle(self, source, **kwargs):
317 kwargs = pycompat.byteskwargs(kwargs)
317 kwargs = pycompat.byteskwargs(kwargs)
318 self.requirecap('getbundle', _('look up remote changes'))
318 self.requirecap('getbundle', _('look up remote changes'))
319 opts = {}
319 opts = {}
320 bundlecaps = kwargs.get('bundlecaps')
320 bundlecaps = kwargs.get('bundlecaps')
321 if bundlecaps is not None:
321 if bundlecaps is not None:
322 kwargs['bundlecaps'] = sorted(bundlecaps)
322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 else:
323 else:
324 bundlecaps = () # kwargs could have it to None
324 bundlecaps = () # kwargs could have it to None
325 for key, value in kwargs.iteritems():
325 for key, value in kwargs.iteritems():
326 if value is None:
326 if value is None:
327 continue
327 continue
328 keytype = gboptsmap.get(key)
328 keytype = gboptsmap.get(key)
329 if keytype is None:
329 if keytype is None:
330 raise error.ProgrammingError(
330 raise error.ProgrammingError(
331 'Unexpectedly None keytype for key %s' % key)
331 'Unexpectedly None keytype for key %s' % key)
332 elif keytype == 'nodes':
332 elif keytype == 'nodes':
333 value = encodelist(value)
333 value = encodelist(value)
334 elif keytype in ('csv', 'scsv'):
334 elif keytype in ('csv', 'scsv'):
335 value = ','.join(value)
335 value = ','.join(value)
336 elif keytype == 'boolean':
336 elif keytype == 'boolean':
337 value = '%i' % bool(value)
337 value = '%i' % bool(value)
338 elif keytype != 'plain':
338 elif keytype != 'plain':
339 raise KeyError('unknown getbundle option type %s'
339 raise KeyError('unknown getbundle option type %s'
340 % keytype)
340 % keytype)
341 opts[key] = value
341 opts[key] = value
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 return bundle2.getunbundler(self.ui, f)
344 return bundle2.getunbundler(self.ui, f)
345 else:
345 else:
346 return changegroupmod.cg1unpacker(f, 'UN')
346 return changegroupmod.cg1unpacker(f, 'UN')
347
347
348 def unbundle(self, cg, heads, url):
348 def unbundle(self, cg, heads, url):
349 '''Send cg (a readable file-like object representing the
349 '''Send cg (a readable file-like object representing the
350 changegroup to push, typically a chunkbuffer object) to the
350 changegroup to push, typically a chunkbuffer object) to the
351 remote server as a bundle.
351 remote server as a bundle.
352
352
353 When pushing a bundle10 stream, return an integer indicating the
353 When pushing a bundle10 stream, return an integer indicating the
354 result of the push (see changegroup.apply()).
354 result of the push (see changegroup.apply()).
355
355
356 When pushing a bundle20 stream, return a bundle20 stream.
356 When pushing a bundle20 stream, return a bundle20 stream.
357
357
358 `url` is the url the client thinks it's pushing to, which is
358 `url` is the url the client thinks it's pushing to, which is
359 visible to hooks.
359 visible to hooks.
360 '''
360 '''
361
361
362 if heads != ['force'] and self.capable('unbundlehash'):
362 if heads != ['force'] and self.capable('unbundlehash'):
363 heads = encodelist(['hashed',
363 heads = encodelist(['hashed',
364 hashlib.sha1(''.join(sorted(heads))).digest()])
364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 else:
365 else:
366 heads = encodelist(heads)
366 heads = encodelist(heads)
367
367
368 if util.safehasattr(cg, 'deltaheader'):
368 if util.safehasattr(cg, 'deltaheader'):
369 # this a bundle10, do the old style call sequence
369 # this a bundle10, do the old style call sequence
370 ret, output = self._callpush("unbundle", cg, heads=heads)
370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 if ret == "":
371 if ret == "":
372 raise error.ResponseError(
372 raise error.ResponseError(
373 _('push failed:'), output)
373 _('push failed:'), output)
374 try:
374 try:
375 ret = int(ret)
375 ret = int(ret)
376 except ValueError:
376 except ValueError:
377 raise error.ResponseError(
377 raise error.ResponseError(
378 _('push failed (unexpected response):'), ret)
378 _('push failed (unexpected response):'), ret)
379
379
380 for l in output.splitlines(True):
380 for l in output.splitlines(True):
381 self.ui.status(_('remote: '), l)
381 self.ui.status(_('remote: '), l)
382 else:
382 else:
383 # bundle2 push. Send a stream, fetch a stream.
383 # bundle2 push. Send a stream, fetch a stream.
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 ret = bundle2.getunbundler(self.ui, stream)
385 ret = bundle2.getunbundler(self.ui, stream)
386 return ret
386 return ret
387
387
388 # End of basewirepeer interface.
388 # End of basewirepeer interface.
389
389
390 # Begin of baselegacywirepeer interface.
390 # Begin of baselegacywirepeer interface.
391
391
392 def branches(self, nodes):
392 def branches(self, nodes):
393 n = encodelist(nodes)
393 n = encodelist(nodes)
394 d = self._call("branches", nodes=n)
394 d = self._call("branches", nodes=n)
395 try:
395 try:
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 return br
397 return br
398 except ValueError:
398 except ValueError:
399 self._abort(error.ResponseError(_("unexpected response:"), d))
399 self._abort(error.ResponseError(_("unexpected response:"), d))
400
400
401 def between(self, pairs):
401 def between(self, pairs):
402 batch = 8 # avoid giant requests
402 batch = 8 # avoid giant requests
403 r = []
403 r = []
404 for i in xrange(0, len(pairs), batch):
404 for i in xrange(0, len(pairs), batch):
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 d = self._call("between", pairs=n)
406 d = self._call("between", pairs=n)
407 try:
407 try:
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 except ValueError:
409 except ValueError:
410 self._abort(error.ResponseError(_("unexpected response:"), d))
410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 return r
411 return r
412
412
413 def changegroup(self, nodes, kind):
413 def changegroup(self, nodes, kind):
414 n = encodelist(nodes)
414 n = encodelist(nodes)
415 f = self._callcompressable("changegroup", roots=n)
415 f = self._callcompressable("changegroup", roots=n)
416 return changegroupmod.cg1unpacker(f, 'UN')
416 return changegroupmod.cg1unpacker(f, 'UN')
417
417
418 def changegroupsubset(self, bases, heads, kind):
418 def changegroupsubset(self, bases, heads, kind):
419 self.requirecap('changegroupsubset', _('look up remote changes'))
419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 bases = encodelist(bases)
420 bases = encodelist(bases)
421 heads = encodelist(heads)
421 heads = encodelist(heads)
422 f = self._callcompressable("changegroupsubset",
422 f = self._callcompressable("changegroupsubset",
423 bases=bases, heads=heads)
423 bases=bases, heads=heads)
424 return changegroupmod.cg1unpacker(f, 'UN')
424 return changegroupmod.cg1unpacker(f, 'UN')
425
425
426 # End of baselegacywirepeer interface.
426 # End of baselegacywirepeer interface.
427
427
428 def _submitbatch(self, req):
428 def _submitbatch(self, req):
429 """run batch request <req> on the server
429 """run batch request <req> on the server
430
430
431 Returns an iterator of the raw responses from the server.
431 Returns an iterator of the raw responses from the server.
432 """
432 """
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 chunk = rsp.read(1024)
434 chunk = rsp.read(1024)
435 work = [chunk]
435 work = [chunk]
436 while chunk:
436 while chunk:
437 while ';' not in chunk and chunk:
437 while ';' not in chunk and chunk:
438 chunk = rsp.read(1024)
438 chunk = rsp.read(1024)
439 work.append(chunk)
439 work.append(chunk)
440 merged = ''.join(work)
440 merged = ''.join(work)
441 while ';' in merged:
441 while ';' in merged:
442 one, merged = merged.split(';', 1)
442 one, merged = merged.split(';', 1)
443 yield unescapearg(one)
443 yield unescapearg(one)
444 chunk = rsp.read(1024)
444 chunk = rsp.read(1024)
445 work = [merged, chunk]
445 work = [merged, chunk]
446 yield unescapearg(''.join(work))
446 yield unescapearg(''.join(work))
447
447
448 def _submitone(self, op, args):
448 def _submitone(self, op, args):
449 return self._call(op, **pycompat.strkwargs(args))
449 return self._call(op, **pycompat.strkwargs(args))
450
450
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 # don't pass optional arguments left at their default value
452 # don't pass optional arguments left at their default value
453 opts = {}
453 opts = {}
454 if three is not None:
454 if three is not None:
455 opts[r'three'] = three
455 opts[r'three'] = three
456 if four is not None:
456 if four is not None:
457 opts[r'four'] = four
457 opts[r'four'] = four
458 return self._call('debugwireargs', one=one, two=two, **opts)
458 return self._call('debugwireargs', one=one, two=two, **opts)
459
459
460 def _call(self, cmd, **args):
460 def _call(self, cmd, **args):
461 """execute <cmd> on the server
461 """execute <cmd> on the server
462
462
463 The command is expected to return a simple string.
463 The command is expected to return a simple string.
464
464
465 returns the server reply as a string."""
465 returns the server reply as a string."""
466 raise NotImplementedError()
466 raise NotImplementedError()
467
467
468 def _callstream(self, cmd, **args):
468 def _callstream(self, cmd, **args):
469 """execute <cmd> on the server
469 """execute <cmd> on the server
470
470
471 The command is expected to return a stream. Note that if the
471 The command is expected to return a stream. Note that if the
472 command doesn't return a stream, _callstream behaves
472 command doesn't return a stream, _callstream behaves
473 differently for ssh and http peers.
473 differently for ssh and http peers.
474
474
475 returns the server reply as a file like object.
475 returns the server reply as a file like object.
476 """
476 """
477 raise NotImplementedError()
477 raise NotImplementedError()
478
478
479 def _callcompressable(self, cmd, **args):
479 def _callcompressable(self, cmd, **args):
480 """execute <cmd> on the server
480 """execute <cmd> on the server
481
481
482 The command is expected to return a stream.
482 The command is expected to return a stream.
483
483
484 The stream may have been compressed in some implementations. This
484 The stream may have been compressed in some implementations. This
485 function takes care of the decompression. This is the only difference
485 function takes care of the decompression. This is the only difference
486 with _callstream.
486 with _callstream.
487
487
488 returns the server reply as a file like object.
488 returns the server reply as a file like object.
489 """
489 """
490 raise NotImplementedError()
490 raise NotImplementedError()
491
491
492 def _callpush(self, cmd, fp, **args):
492 def _callpush(self, cmd, fp, **args):
493 """execute a <cmd> on server
493 """execute a <cmd> on server
494
494
495 The command is expected to be related to a push. Push has a special
495 The command is expected to be related to a push. Push has a special
496 return method.
496 return method.
497
497
498 returns the server reply as a (ret, output) tuple. ret is either
498 returns the server reply as a (ret, output) tuple. ret is either
499 empty (error) or a stringified int.
499 empty (error) or a stringified int.
500 """
500 """
501 raise NotImplementedError()
501 raise NotImplementedError()
502
502
503 def _calltwowaystream(self, cmd, fp, **args):
503 def _calltwowaystream(self, cmd, fp, **args):
504 """execute <cmd> on server
504 """execute <cmd> on server
505
505
506 The command will send a stream to the server and get a stream in reply.
506 The command will send a stream to the server and get a stream in reply.
507 """
507 """
508 raise NotImplementedError()
508 raise NotImplementedError()
509
509
510 def _abort(self, exception):
510 def _abort(self, exception):
511 """clearly abort the wire protocol connection and raise the exception
511 """clearly abort the wire protocol connection and raise the exception
512 """
512 """
513 raise NotImplementedError()
513 raise NotImplementedError()
514
514
515 # server side
515 # server side
516
516
517 # wire protocol command can either return a string or one of these classes.
517 # wire protocol command can either return a string or one of these classes.
518 class streamres(object):
518 class streamres(object):
519 """wireproto reply: binary stream
519 """wireproto reply: binary stream
520
520
521 The call was successful and the result is a stream.
521 The call was successful and the result is a stream.
522
522
523 Accepts either a generator or an object with a ``read(size)`` method.
523 Accepts either a generator or an object with a ``read(size)`` method.
524
524
525 ``v1compressible`` indicates whether this data can be compressed to
525 ``v1compressible`` indicates whether this data can be compressed to
526 "version 1" clients (technically: HTTP peers using
526 "version 1" clients (technically: HTTP peers using
527 application/mercurial-0.1 media type). This flag should NOT be used on
527 application/mercurial-0.1 media type). This flag should NOT be used on
528 new commands because new clients should support a more modern compression
528 new commands because new clients should support a more modern compression
529 mechanism.
529 mechanism.
530 """
530 """
531 def __init__(self, gen=None, reader=None, v1compressible=False):
531 def __init__(self, gen=None, reader=None, v1compressible=False):
532 self.gen = gen
532 self.gen = gen
533 self.reader = reader
533 self.reader = reader
534 self.v1compressible = v1compressible
534 self.v1compressible = v1compressible
535
535
536 class pushres(object):
536 class pushres(object):
537 """wireproto reply: success with simple integer return
537 """wireproto reply: success with simple integer return
538
538
539 The call was successful and returned an integer contained in `self.res`.
539 The call was successful and returned an integer contained in `self.res`.
540 """
540 """
541 def __init__(self, res):
541 def __init__(self, res):
542 self.res = res
542 self.res = res
543
543
544 class pusherr(object):
544 class pusherr(object):
545 """wireproto reply: failure
545 """wireproto reply: failure
546
546
547 The call failed. The `self.res` attribute contains the error message.
547 The call failed. The `self.res` attribute contains the error message.
548 """
548 """
549 def __init__(self, res):
549 def __init__(self, res):
550 self.res = res
550 self.res = res
551
551
552 class ooberror(object):
552 class ooberror(object):
553 """wireproto reply: failure of a batch of operation
553 """wireproto reply: failure of a batch of operation
554
554
555 Something failed during a batch call. The error message is stored in
555 Something failed during a batch call. The error message is stored in
556 `self.message`.
556 `self.message`.
557 """
557 """
558 def __init__(self, message):
558 def __init__(self, message):
559 self.message = message
559 self.message = message
560
560
561 def getdispatchrepo(repo, proto, command):
561 def getdispatchrepo(repo, proto, command):
562 """Obtain the repo used for processing wire protocol commands.
562 """Obtain the repo used for processing wire protocol commands.
563
563
564 The intent of this function is to serve as a monkeypatch point for
564 The intent of this function is to serve as a monkeypatch point for
565 extensions that need commands to operate on different repo views under
565 extensions that need commands to operate on different repo views under
566 specialized circumstances.
566 specialized circumstances.
567 """
567 """
568 return repo.filtered('served')
568 return repo.filtered('served')
569
569
570 def dispatch(repo, proto, command):
570 def dispatch(repo, proto, command):
571 repo = getdispatchrepo(repo, proto, command)
571 repo = getdispatchrepo(repo, proto, command)
572 func, spec = commands[command]
572 func, spec = commands[command]
573 args = proto.getargs(spec)
573 args = proto.getargs(spec)
574 return func(repo, proto, *args)
574 return func(repo, proto, *args)
575
575
576 def options(cmd, keys, others):
576 def options(cmd, keys, others):
577 opts = {}
577 opts = {}
578 for k in keys:
578 for k in keys:
579 if k in others:
579 if k in others:
580 opts[k] = others[k]
580 opts[k] = others[k]
581 del others[k]
581 del others[k]
582 if others:
582 if others:
583 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
583 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
584 % (cmd, ",".join(others)))
584 % (cmd, ",".join(others)))
585 return opts
585 return opts
586
586
587 def bundle1allowed(repo, action):
587 def bundle1allowed(repo, action):
588 """Whether a bundle1 operation is allowed from the server.
588 """Whether a bundle1 operation is allowed from the server.
589
589
590 Priority is:
590 Priority is:
591
591
592 1. server.bundle1gd.<action> (if generaldelta active)
592 1. server.bundle1gd.<action> (if generaldelta active)
593 2. server.bundle1.<action>
593 2. server.bundle1.<action>
594 3. server.bundle1gd (if generaldelta active)
594 3. server.bundle1gd (if generaldelta active)
595 4. server.bundle1
595 4. server.bundle1
596 """
596 """
597 ui = repo.ui
597 ui = repo.ui
598 gd = 'generaldelta' in repo.requirements
598 gd = 'generaldelta' in repo.requirements
599
599
600 if gd:
600 if gd:
601 v = ui.configbool('server', 'bundle1gd.%s' % action)
601 v = ui.configbool('server', 'bundle1gd.%s' % action)
602 if v is not None:
602 if v is not None:
603 return v
603 return v
604
604
605 v = ui.configbool('server', 'bundle1.%s' % action)
605 v = ui.configbool('server', 'bundle1.%s' % action)
606 if v is not None:
606 if v is not None:
607 return v
607 return v
608
608
609 if gd:
609 if gd:
610 v = ui.configbool('server', 'bundle1gd')
610 v = ui.configbool('server', 'bundle1gd')
611 if v is not None:
611 if v is not None:
612 return v
612 return v
613
613
614 return ui.configbool('server', 'bundle1')
614 return ui.configbool('server', 'bundle1')
615
615
616 def supportedcompengines(ui, proto, role):
616 def supportedcompengines(ui, proto, role):
617 """Obtain the list of supported compression engines for a request."""
617 """Obtain the list of supported compression engines for a request."""
618 assert role in (util.CLIENTROLE, util.SERVERROLE)
618 assert role in (util.CLIENTROLE, util.SERVERROLE)
619
619
620 compengines = util.compengines.supportedwireengines(role)
620 compengines = util.compengines.supportedwireengines(role)
621
621
622 # Allow config to override default list and ordering.
622 # Allow config to override default list and ordering.
623 if role == util.SERVERROLE:
623 if role == util.SERVERROLE:
624 configengines = ui.configlist('server', 'compressionengines')
624 configengines = ui.configlist('server', 'compressionengines')
625 config = 'server.compressionengines'
625 config = 'server.compressionengines'
626 else:
626 else:
627 # This is currently implemented mainly to facilitate testing. In most
627 # This is currently implemented mainly to facilitate testing. In most
628 # cases, the server should be in charge of choosing a compression engine
628 # cases, the server should be in charge of choosing a compression engine
629 # because a server has the most to lose from a sub-optimal choice. (e.g.
629 # because a server has the most to lose from a sub-optimal choice. (e.g.
630 # CPU DoS due to an expensive engine or a network DoS due to poor
630 # CPU DoS due to an expensive engine or a network DoS due to poor
631 # compression ratio).
631 # compression ratio).
632 configengines = ui.configlist('experimental',
632 configengines = ui.configlist('experimental',
633 'clientcompressionengines')
633 'clientcompressionengines')
634 config = 'experimental.clientcompressionengines'
634 config = 'experimental.clientcompressionengines'
635
635
636 # No explicit config. Filter out the ones that aren't supposed to be
636 # No explicit config. Filter out the ones that aren't supposed to be
637 # advertised and return default ordering.
637 # advertised and return default ordering.
638 if not configengines:
638 if not configengines:
639 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
639 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
640 return [e for e in compengines
640 return [e for e in compengines
641 if getattr(e.wireprotosupport(), attr) > 0]
641 if getattr(e.wireprotosupport(), attr) > 0]
642
642
643 # If compression engines are listed in the config, assume there is a good
643 # If compression engines are listed in the config, assume there is a good
644 # reason for it (like server operators wanting to achieve specific
644 # reason for it (like server operators wanting to achieve specific
645 # performance characteristics). So fail fast if the config references
645 # performance characteristics). So fail fast if the config references
646 # unusable compression engines.
646 # unusable compression engines.
647 validnames = set(e.name() for e in compengines)
647 validnames = set(e.name() for e in compengines)
648 invalidnames = set(e for e in configengines if e not in validnames)
648 invalidnames = set(e for e in configengines if e not in validnames)
649 if invalidnames:
649 if invalidnames:
650 raise error.Abort(_('invalid compression engine defined in %s: %s') %
650 raise error.Abort(_('invalid compression engine defined in %s: %s') %
651 (config, ', '.join(sorted(invalidnames))))
651 (config, ', '.join(sorted(invalidnames))))
652
652
653 compengines = [e for e in compengines if e.name() in configengines]
653 compengines = [e for e in compengines if e.name() in configengines]
654 compengines = sorted(compengines,
654 compengines = sorted(compengines,
655 key=lambda e: configengines.index(e.name()))
655 key=lambda e: configengines.index(e.name()))
656
656
657 if not compengines:
657 if not compengines:
658 raise error.Abort(_('%s config option does not specify any known '
658 raise error.Abort(_('%s config option does not specify any known '
659 'compression engines') % config,
659 'compression engines') % config,
660 hint=_('usable compression engines: %s') %
660 hint=_('usable compression engines: %s') %
661 ', '.sorted(validnames))
661 ', '.sorted(validnames))
662
662
663 return compengines
663 return compengines
664
664
665 # list of commands
665 # list of commands
666 commands = {}
666 commands = {}
667
667
668 def wireprotocommand(name, args=''):
668 def wireprotocommand(name, args=''):
669 """decorator for wire protocol command"""
669 """decorator for wire protocol command"""
670 def register(func):
670 def register(func):
671 commands[name] = (func, args)
671 commands[name] = (func, args)
672 return func
672 return func
673 return register
673 return register
674
674
675 @wireprotocommand('batch', 'cmds *')
675 @wireprotocommand('batch', 'cmds *')
676 def batch(repo, proto, cmds, others):
676 def batch(repo, proto, cmds, others):
677 repo = repo.filtered("served")
677 repo = repo.filtered("served")
678 res = []
678 res = []
679 for pair in cmds.split(';'):
679 for pair in cmds.split(';'):
680 op, args = pair.split(' ', 1)
680 op, args = pair.split(' ', 1)
681 vals = {}
681 vals = {}
682 for a in args.split(','):
682 for a in args.split(','):
683 if a:
683 if a:
684 n, v = a.split('=')
684 n, v = a.split('=')
685 vals[unescapearg(n)] = unescapearg(v)
685 vals[unescapearg(n)] = unescapearg(v)
686 func, spec = commands[op]
686 func, spec = commands[op]
687 if spec:
687 if spec:
688 keys = spec.split()
688 keys = spec.split()
689 data = {}
689 data = {}
690 for k in keys:
690 for k in keys:
691 if k == '*':
691 if k == '*':
692 star = {}
692 star = {}
693 for key in vals.keys():
693 for key in vals.keys():
694 if key not in keys:
694 if key not in keys:
695 star[key] = vals[key]
695 star[key] = vals[key]
696 data['*'] = star
696 data['*'] = star
697 else:
697 else:
698 data[k] = vals[k]
698 data[k] = vals[k]
699 result = func(repo, proto, *[data[k] for k in keys])
699 result = func(repo, proto, *[data[k] for k in keys])
700 else:
700 else:
701 result = func(repo, proto)
701 result = func(repo, proto)
702 if isinstance(result, ooberror):
702 if isinstance(result, ooberror):
703 return result
703 return result
704 res.append(escapearg(result))
704 res.append(escapearg(result))
705 return ';'.join(res)
705 return ';'.join(res)
706
706
707 @wireprotocommand('between', 'pairs')
707 @wireprotocommand('between', 'pairs')
708 def between(repo, proto, pairs):
708 def between(repo, proto, pairs):
709 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
709 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
710 r = []
710 r = []
711 for b in repo.between(pairs):
711 for b in repo.between(pairs):
712 r.append(encodelist(b) + "\n")
712 r.append(encodelist(b) + "\n")
713 return "".join(r)
713 return "".join(r)
714
714
715 @wireprotocommand('branchmap')
715 @wireprotocommand('branchmap')
716 def branchmap(repo, proto):
716 def branchmap(repo, proto):
717 branchmap = repo.branchmap()
717 branchmap = repo.branchmap()
718 heads = []
718 heads = []
719 for branch, nodes in branchmap.iteritems():
719 for branch, nodes in branchmap.iteritems():
720 branchname = urlreq.quote(encoding.fromlocal(branch))
720 branchname = urlreq.quote(encoding.fromlocal(branch))
721 branchnodes = encodelist(nodes)
721 branchnodes = encodelist(nodes)
722 heads.append('%s %s' % (branchname, branchnodes))
722 heads.append('%s %s' % (branchname, branchnodes))
723 return '\n'.join(heads)
723 return '\n'.join(heads)
724
724
725 @wireprotocommand('branches', 'nodes')
725 @wireprotocommand('branches', 'nodes')
726 def branches(repo, proto, nodes):
726 def branches(repo, proto, nodes):
727 nodes = decodelist(nodes)
727 nodes = decodelist(nodes)
728 r = []
728 r = []
729 for b in repo.branches(nodes):
729 for b in repo.branches(nodes):
730 r.append(encodelist(b) + "\n")
730 r.append(encodelist(b) + "\n")
731 return "".join(r)
731 return "".join(r)
732
732
733 @wireprotocommand('clonebundles', '')
733 @wireprotocommand('clonebundles', '')
734 def clonebundles(repo, proto):
734 def clonebundles(repo, proto):
735 """Server command for returning info for available bundles to seed clones.
735 """Server command for returning info for available bundles to seed clones.
736
736
737 Clients will parse this response and determine what bundle to fetch.
737 Clients will parse this response and determine what bundle to fetch.
738
738
739 Extensions may wrap this command to filter or dynamically emit data
739 Extensions may wrap this command to filter or dynamically emit data
740 depending on the request. e.g. you could advertise URLs for the closest
740 depending on the request. e.g. you could advertise URLs for the closest
741 data center given the client's IP address.
741 data center given the client's IP address.
742 """
742 """
743 return repo.vfs.tryread('clonebundles.manifest')
743 return repo.vfs.tryread('clonebundles.manifest')
744
744
745 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
745 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
746 'known', 'getbundle', 'unbundlehash', 'batch']
746 'known', 'getbundle', 'unbundlehash', 'batch']
747
747
748 def _capabilities(repo, proto):
748 def _capabilities(repo, proto):
749 """return a list of capabilities for a repo
749 """return a list of capabilities for a repo
750
750
751 This function exists to allow extensions to easily wrap capabilities
751 This function exists to allow extensions to easily wrap capabilities
752 computation
752 computation
753
753
754 - returns a lists: easy to alter
754 - returns a lists: easy to alter
755 - change done here will be propagated to both `capabilities` and `hello`
755 - change done here will be propagated to both `capabilities` and `hello`
756 command without any other action needed.
756 command without any other action needed.
757 """
757 """
758 # copy to prevent modification of the global list
758 # copy to prevent modification of the global list
759 caps = list(wireprotocaps)
759 caps = list(wireprotocaps)
760 if streamclone.allowservergeneration(repo):
760 if streamclone.allowservergeneration(repo):
761 if repo.ui.configbool('server', 'preferuncompressed'):
761 if repo.ui.configbool('server', 'preferuncompressed'):
762 caps.append('stream-preferred')
762 caps.append('stream-preferred')
763 requiredformats = repo.requirements & repo.supportedformats
763 requiredformats = repo.requirements & repo.supportedformats
764 # if our local revlogs are just revlogv1, add 'stream' cap
764 # if our local revlogs are just revlogv1, add 'stream' cap
765 if not requiredformats - {'revlogv1'}:
765 if not requiredformats - {'revlogv1'}:
766 caps.append('stream')
766 caps.append('stream')
767 # otherwise, add 'streamreqs' detailing our local revlog format
767 # otherwise, add 'streamreqs' detailing our local revlog format
768 else:
768 else:
769 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
769 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
770 if repo.ui.configbool('experimental', 'bundle2-advertise'):
770 if repo.ui.configbool('experimental', 'bundle2-advertise'):
771 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
771 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
772 caps.append('bundle2=' + urlreq.quote(capsblob))
772 caps.append('bundle2=' + urlreq.quote(capsblob))
773 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
773 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
774
774
775 if proto.name == 'http':
775 if proto.name == 'http':
776 caps.append('httpheader=%d' %
776 caps.append('httpheader=%d' %
777 repo.ui.configint('server', 'maxhttpheaderlen'))
777 repo.ui.configint('server', 'maxhttpheaderlen'))
778 if repo.ui.configbool('experimental', 'httppostargs'):
778 if repo.ui.configbool('experimental', 'httppostargs'):
779 caps.append('httppostargs')
779 caps.append('httppostargs')
780
780
781 # FUTURE advertise 0.2rx once support is implemented
781 # FUTURE advertise 0.2rx once support is implemented
782 # FUTURE advertise minrx and mintx after consulting config option
782 # FUTURE advertise minrx and mintx after consulting config option
783 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
783 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
784
784
785 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
785 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
786 if compengines:
786 if compengines:
787 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
787 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
788 for e in compengines)
788 for e in compengines)
789 caps.append('compression=%s' % comptypes)
789 caps.append('compression=%s' % comptypes)
790
790
791 return caps
791 return caps
792
792
793 # If you are writing an extension and consider wrapping this function. Wrap
793 # If you are writing an extension and consider wrapping this function. Wrap
794 # `_capabilities` instead.
794 # `_capabilities` instead.
795 @wireprotocommand('capabilities')
795 @wireprotocommand('capabilities')
796 def capabilities(repo, proto):
796 def capabilities(repo, proto):
797 return ' '.join(_capabilities(repo, proto))
797 return ' '.join(_capabilities(repo, proto))
798
798
799 @wireprotocommand('changegroup', 'roots')
799 @wireprotocommand('changegroup', 'roots')
800 def changegroup(repo, proto, roots):
800 def changegroup(repo, proto, roots):
801 nodes = decodelist(roots)
801 nodes = decodelist(roots)
802 outgoing = discovery.outgoing(repo, missingroots=nodes,
802 outgoing = discovery.outgoing(repo, missingroots=nodes,
803 missingheads=repo.heads())
803 missingheads=repo.heads())
804 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
804 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
805 return streamres(reader=cg, v1compressible=True)
805 return streamres(reader=cg, v1compressible=True)
806
806
807 @wireprotocommand('changegroupsubset', 'bases heads')
807 @wireprotocommand('changegroupsubset', 'bases heads')
808 def changegroupsubset(repo, proto, bases, heads):
808 def changegroupsubset(repo, proto, bases, heads):
809 bases = decodelist(bases)
809 bases = decodelist(bases)
810 heads = decodelist(heads)
810 heads = decodelist(heads)
811 outgoing = discovery.outgoing(repo, missingroots=bases,
811 outgoing = discovery.outgoing(repo, missingroots=bases,
812 missingheads=heads)
812 missingheads=heads)
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 return streamres(reader=cg, v1compressible=True)
814 return streamres(reader=cg, v1compressible=True)
815
815
816 @wireprotocommand('debugwireargs', 'one two *')
816 @wireprotocommand('debugwireargs', 'one two *')
817 def debugwireargs(repo, proto, one, two, others):
817 def debugwireargs(repo, proto, one, two, others):
818 # only accept optional args from the known set
818 # only accept optional args from the known set
819 opts = options('debugwireargs', ['three', 'four'], others)
819 opts = options('debugwireargs', ['three', 'four'], others)
820 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
820 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
821
821
822 @wireprotocommand('getbundle', '*')
822 @wireprotocommand('getbundle', '*')
823 def getbundle(repo, proto, others):
823 def getbundle(repo, proto, others):
824 opts = options('getbundle', gboptsmap.keys(), others)
824 opts = options('getbundle', gboptsmap.keys(), others)
825 for k, v in opts.iteritems():
825 for k, v in opts.iteritems():
826 keytype = gboptsmap[k]
826 keytype = gboptsmap[k]
827 if keytype == 'nodes':
827 if keytype == 'nodes':
828 opts[k] = decodelist(v)
828 opts[k] = decodelist(v)
829 elif keytype == 'csv':
829 elif keytype == 'csv':
830 opts[k] = list(v.split(','))
830 opts[k] = list(v.split(','))
831 elif keytype == 'scsv':
831 elif keytype == 'scsv':
832 opts[k] = set(v.split(','))
832 opts[k] = set(v.split(','))
833 elif keytype == 'boolean':
833 elif keytype == 'boolean':
834 # Client should serialize False as '0', which is a non-empty string
834 # Client should serialize False as '0', which is a non-empty string
835 # so it evaluates as a True bool.
835 # so it evaluates as a True bool.
836 if v == '0':
836 if v == '0':
837 opts[k] = False
837 opts[k] = False
838 else:
838 else:
839 opts[k] = bool(v)
839 opts[k] = bool(v)
840 elif keytype != 'plain':
840 elif keytype != 'plain':
841 raise KeyError('unknown getbundle option type %s'
841 raise KeyError('unknown getbundle option type %s'
842 % keytype)
842 % keytype)
843
843
844 if not bundle1allowed(repo, 'pull'):
844 if not bundle1allowed(repo, 'pull'):
845 if not exchange.bundle2requested(opts.get('bundlecaps')):
845 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 if proto.name == 'http':
846 if proto.name == 'http':
847 return ooberror(bundle2required)
847 return ooberror(bundle2required)
848 raise error.Abort(bundle2requiredmain,
848 raise error.Abort(bundle2requiredmain,
849 hint=bundle2requiredhint)
849 hint=bundle2requiredhint)
850
850
851 try:
851 try:
852 if repo.ui.configbool('server', 'disablefullbundle'):
852 if repo.ui.configbool('server', 'disablefullbundle'):
853 # Check to see if this is a full clone.
853 # Check to see if this is a full clone.
854 clheads = set(repo.changelog.heads())
854 clheads = set(repo.changelog.heads())
855 heads = set(opts.get('heads', set()))
855 heads = set(opts.get('heads', set()))
856 common = set(opts.get('common', set()))
856 common = set(opts.get('common', set()))
857 common.discard(nullid)
857 common.discard(nullid)
858 if not common and clheads == heads:
858 if not common and clheads == heads:
859 raise error.Abort(
859 raise error.Abort(
860 _('server has pull-based clones disabled'),
860 _('server has pull-based clones disabled'),
861 hint=_('remove --pull if specified or upgrade Mercurial'))
861 hint=_('remove --pull if specified or upgrade Mercurial'))
862
862
863 chunks = exchange.getbundlechunks(repo, 'serve',
863 chunks = exchange.getbundlechunks(repo, 'serve',
864 **pycompat.strkwargs(opts))
864 **pycompat.strkwargs(opts))
865 except error.Abort as exc:
865 except error.Abort as exc:
866 # cleanly forward Abort error to the client
866 # cleanly forward Abort error to the client
867 if not exchange.bundle2requested(opts.get('bundlecaps')):
867 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 if proto.name == 'http':
868 if proto.name == 'http':
869 return ooberror(str(exc) + '\n')
869 return ooberror(str(exc) + '\n')
870 raise # cannot do better for bundle1 + ssh
870 raise # cannot do better for bundle1 + ssh
871 # bundle2 request expect a bundle2 reply
871 # bundle2 request expect a bundle2 reply
872 bundler = bundle2.bundle20(repo.ui)
872 bundler = bundle2.bundle20(repo.ui)
873 manargs = [('message', str(exc))]
873 manargs = [('message', str(exc))]
874 advargs = []
874 advargs = []
875 if exc.hint is not None:
875 if exc.hint is not None:
876 advargs.append(('hint', exc.hint))
876 advargs.append(('hint', exc.hint))
877 bundler.addpart(bundle2.bundlepart('error:abort',
877 bundler.addpart(bundle2.bundlepart('error:abort',
878 manargs, advargs))
878 manargs, advargs))
879 return streamres(gen=bundler.getchunks(), v1compressible=True)
879 return streamres(gen=bundler.getchunks(), v1compressible=True)
880 return streamres(gen=chunks, v1compressible=True)
880 return streamres(gen=chunks, v1compressible=True)
881
881
882 @wireprotocommand('heads')
882 @wireprotocommand('heads')
883 def heads(repo, proto):
883 def heads(repo, proto):
884 h = repo.heads()
884 h = repo.heads()
885 return encodelist(h) + "\n"
885 return encodelist(h) + "\n"
886
886
887 @wireprotocommand('hello')
887 @wireprotocommand('hello')
888 def hello(repo, proto):
888 def hello(repo, proto):
889 '''the hello command returns a set of lines describing various
889 '''the hello command returns a set of lines describing various
890 interesting things about the server, in an RFC822-like format.
890 interesting things about the server, in an RFC822-like format.
891 Currently the only one defined is "capabilities", which
891 Currently the only one defined is "capabilities", which
892 consists of a line in the form:
892 consists of a line in the form:
893
893
894 capabilities: space separated list of tokens
894 capabilities: space separated list of tokens
895 '''
895 '''
896 return "capabilities: %s\n" % (capabilities(repo, proto))
896 return "capabilities: %s\n" % (capabilities(repo, proto))
897
897
898 @wireprotocommand('listkeys', 'namespace')
898 @wireprotocommand('listkeys', 'namespace')
899 def listkeys(repo, proto, namespace):
899 def listkeys(repo, proto, namespace):
900 d = repo.listkeys(encoding.tolocal(namespace)).items()
900 d = repo.listkeys(encoding.tolocal(namespace)).items()
901 return pushkeymod.encodekeys(d)
901 return pushkeymod.encodekeys(d)
902
902
903 @wireprotocommand('lookup', 'key')
903 @wireprotocommand('lookup', 'key')
904 def lookup(repo, proto, key):
904 def lookup(repo, proto, key):
905 try:
905 try:
906 k = encoding.tolocal(key)
906 k = encoding.tolocal(key)
907 c = repo[k]
907 c = repo[k]
908 r = c.hex()
908 r = c.hex()
909 success = 1
909 success = 1
910 except Exception as inst:
910 except Exception as inst:
911 r = str(inst)
911 r = str(inst)
912 success = 0
912 success = 0
913 return "%d %s\n" % (success, r)
913 return "%d %s\n" % (success, r)
914
914
915 @wireprotocommand('known', 'nodes *')
915 @wireprotocommand('known', 'nodes *')
916 def known(repo, proto, nodes, others):
916 def known(repo, proto, nodes, others):
917 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
917 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
918
918
919 @wireprotocommand('pushkey', 'namespace key old new')
919 @wireprotocommand('pushkey', 'namespace key old new')
920 def pushkey(repo, proto, namespace, key, old, new):
920 def pushkey(repo, proto, namespace, key, old, new):
921 # compatibility with pre-1.8 clients which were accidentally
921 # compatibility with pre-1.8 clients which were accidentally
922 # sending raw binary nodes rather than utf-8-encoded hex
922 # sending raw binary nodes rather than utf-8-encoded hex
923 if len(new) == 20 and util.escapestr(new) != new:
923 if len(new) == 20 and util.escapestr(new) != new:
924 # looks like it could be a binary node
924 # looks like it could be a binary node
925 try:
925 try:
926 new.decode('utf-8')
926 new.decode('utf-8')
927 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
927 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
928 except UnicodeDecodeError:
928 except UnicodeDecodeError:
929 pass # binary, leave unmodified
929 pass # binary, leave unmodified
930 else:
930 else:
931 new = encoding.tolocal(new) # normal path
931 new = encoding.tolocal(new) # normal path
932
932
933 if util.safehasattr(proto, 'restore'):
933 if util.safehasattr(proto, 'restore'):
934
934
935 proto.redirect()
935 proto.redirect()
936
936
937 try:
937 try:
938 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
938 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 encoding.tolocal(old), new) or False
939 encoding.tolocal(old), new) or False
940 except error.Abort:
940 except error.Abort:
941 r = False
941 r = False
942
942
943 output = proto.restore()
943 output = proto.restore()
944
944
945 return '%s\n%s' % (int(r), output)
945 return '%s\n%s' % (int(r), output)
946
946
947 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
947 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
948 encoding.tolocal(old), new)
948 encoding.tolocal(old), new)
949 return '%s\n' % int(r)
949 return '%s\n' % int(r)
950
950
951 @wireprotocommand('stream_out')
951 @wireprotocommand('stream_out')
952 def stream(repo, proto):
952 def stream(repo, proto):
953 '''If the server supports streaming clone, it advertises the "stream"
953 '''If the server supports streaming clone, it advertises the "stream"
954 capability with a value representing the version and flags of the repo
954 capability with a value representing the version and flags of the repo
955 it is serving. Client checks to see if it understands the format.
955 it is serving. Client checks to see if it understands the format.
956 '''
956 '''
957 if not streamclone.allowservergeneration(repo):
957 return streamres(streamclone.generatev1wireproto(repo))
958 return '1\n'
959
960 def getstream(it):
961 yield '0\n'
962 for chunk in it:
963 yield chunk
964
965 try:
966 # LockError may be raised before the first result is yielded. Don't
967 # emit output until we're sure we got the lock successfully.
968 it = streamclone.generatev1wireproto(repo)
969 return streamres(gen=getstream(it))
970 except error.LockError:
971 return '2\n'
972
958
973 @wireprotocommand('unbundle', 'heads')
959 @wireprotocommand('unbundle', 'heads')
974 def unbundle(repo, proto, heads):
960 def unbundle(repo, proto, heads):
975 their_heads = decodelist(heads)
961 their_heads = decodelist(heads)
976
962
977 try:
963 try:
978 proto.redirect()
964 proto.redirect()
979
965
980 exchange.check_heads(repo, their_heads, 'preparing changes')
966 exchange.check_heads(repo, their_heads, 'preparing changes')
981
967
982 # write bundle data to temporary file because it can be big
968 # write bundle data to temporary file because it can be big
983 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
969 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
984 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
970 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
985 r = 0
971 r = 0
986 try:
972 try:
987 proto.getfile(fp)
973 proto.getfile(fp)
988 fp.seek(0)
974 fp.seek(0)
989 gen = exchange.readbundle(repo.ui, fp, None)
975 gen = exchange.readbundle(repo.ui, fp, None)
990 if (isinstance(gen, changegroupmod.cg1unpacker)
976 if (isinstance(gen, changegroupmod.cg1unpacker)
991 and not bundle1allowed(repo, 'push')):
977 and not bundle1allowed(repo, 'push')):
992 if proto.name == 'http':
978 if proto.name == 'http':
993 # need to special case http because stderr do not get to
979 # need to special case http because stderr do not get to
994 # the http client on failed push so we need to abuse some
980 # the http client on failed push so we need to abuse some
995 # other error type to make sure the message get to the
981 # other error type to make sure the message get to the
996 # user.
982 # user.
997 return ooberror(bundle2required)
983 return ooberror(bundle2required)
998 raise error.Abort(bundle2requiredmain,
984 raise error.Abort(bundle2requiredmain,
999 hint=bundle2requiredhint)
985 hint=bundle2requiredhint)
1000
986
1001 r = exchange.unbundle(repo, gen, their_heads, 'serve',
987 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1002 proto._client())
988 proto._client())
1003 if util.safehasattr(r, 'addpart'):
989 if util.safehasattr(r, 'addpart'):
1004 # The return looks streamable, we are in the bundle2 case and
990 # The return looks streamable, we are in the bundle2 case and
1005 # should return a stream.
991 # should return a stream.
1006 return streamres(gen=r.getchunks())
992 return streamres(gen=r.getchunks())
1007 return pushres(r)
993 return pushres(r)
1008
994
1009 finally:
995 finally:
1010 fp.close()
996 fp.close()
1011 os.unlink(tempname)
997 os.unlink(tempname)
1012
998
1013 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
999 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1014 # handle non-bundle2 case first
1000 # handle non-bundle2 case first
1015 if not getattr(exc, 'duringunbundle2', False):
1001 if not getattr(exc, 'duringunbundle2', False):
1016 try:
1002 try:
1017 raise
1003 raise
1018 except error.Abort:
1004 except error.Abort:
1019 # The old code we moved used util.stderr directly.
1005 # The old code we moved used util.stderr directly.
1020 # We did not change it to minimise code change.
1006 # We did not change it to minimise code change.
1021 # This need to be moved to something proper.
1007 # This need to be moved to something proper.
1022 # Feel free to do it.
1008 # Feel free to do it.
1023 util.stderr.write("abort: %s\n" % exc)
1009 util.stderr.write("abort: %s\n" % exc)
1024 if exc.hint is not None:
1010 if exc.hint is not None:
1025 util.stderr.write("(%s)\n" % exc.hint)
1011 util.stderr.write("(%s)\n" % exc.hint)
1026 return pushres(0)
1012 return pushres(0)
1027 except error.PushRaced:
1013 except error.PushRaced:
1028 return pusherr(str(exc))
1014 return pusherr(str(exc))
1029
1015
1030 bundler = bundle2.bundle20(repo.ui)
1016 bundler = bundle2.bundle20(repo.ui)
1031 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1017 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1032 bundler.addpart(out)
1018 bundler.addpart(out)
1033 try:
1019 try:
1034 try:
1020 try:
1035 raise
1021 raise
1036 except error.PushkeyFailed as exc:
1022 except error.PushkeyFailed as exc:
1037 # check client caps
1023 # check client caps
1038 remotecaps = getattr(exc, '_replycaps', None)
1024 remotecaps = getattr(exc, '_replycaps', None)
1039 if (remotecaps is not None
1025 if (remotecaps is not None
1040 and 'pushkey' not in remotecaps.get('error', ())):
1026 and 'pushkey' not in remotecaps.get('error', ())):
1041 # no support remote side, fallback to Abort handler.
1027 # no support remote side, fallback to Abort handler.
1042 raise
1028 raise
1043 part = bundler.newpart('error:pushkey')
1029 part = bundler.newpart('error:pushkey')
1044 part.addparam('in-reply-to', exc.partid)
1030 part.addparam('in-reply-to', exc.partid)
1045 if exc.namespace is not None:
1031 if exc.namespace is not None:
1046 part.addparam('namespace', exc.namespace, mandatory=False)
1032 part.addparam('namespace', exc.namespace, mandatory=False)
1047 if exc.key is not None:
1033 if exc.key is not None:
1048 part.addparam('key', exc.key, mandatory=False)
1034 part.addparam('key', exc.key, mandatory=False)
1049 if exc.new is not None:
1035 if exc.new is not None:
1050 part.addparam('new', exc.new, mandatory=False)
1036 part.addparam('new', exc.new, mandatory=False)
1051 if exc.old is not None:
1037 if exc.old is not None:
1052 part.addparam('old', exc.old, mandatory=False)
1038 part.addparam('old', exc.old, mandatory=False)
1053 if exc.ret is not None:
1039 if exc.ret is not None:
1054 part.addparam('ret', exc.ret, mandatory=False)
1040 part.addparam('ret', exc.ret, mandatory=False)
1055 except error.BundleValueError as exc:
1041 except error.BundleValueError as exc:
1056 errpart = bundler.newpart('error:unsupportedcontent')
1042 errpart = bundler.newpart('error:unsupportedcontent')
1057 if exc.parttype is not None:
1043 if exc.parttype is not None:
1058 errpart.addparam('parttype', exc.parttype)
1044 errpart.addparam('parttype', exc.parttype)
1059 if exc.params:
1045 if exc.params:
1060 errpart.addparam('params', '\0'.join(exc.params))
1046 errpart.addparam('params', '\0'.join(exc.params))
1061 except error.Abort as exc:
1047 except error.Abort as exc:
1062 manargs = [('message', str(exc))]
1048 manargs = [('message', str(exc))]
1063 advargs = []
1049 advargs = []
1064 if exc.hint is not None:
1050 if exc.hint is not None:
1065 advargs.append(('hint', exc.hint))
1051 advargs.append(('hint', exc.hint))
1066 bundler.addpart(bundle2.bundlepart('error:abort',
1052 bundler.addpart(bundle2.bundlepart('error:abort',
1067 manargs, advargs))
1053 manargs, advargs))
1068 except error.PushRaced as exc:
1054 except error.PushRaced as exc:
1069 bundler.newpart('error:pushraced', [('message', str(exc))])
1055 bundler.newpart('error:pushraced', [('message', str(exc))])
1070 return streamres(gen=bundler.getchunks())
1056 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now