##// END OF EJS Templates
streamclone: move _allowstream() from wireproto...
Gregory Szorc -
r26444:62374301 default
parent child Browse files
Show More
@@ -1,221 +1,225
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
20 def allowservergeneration(ui):
21 """Whether streaming clones are allowed from the server."""
22 return ui.configbool('server', 'uncompressed', True, untrusted=True)
23
20 24 # This is it's own function so extensions can override it.
21 25 def _walkstreamfiles(repo):
22 26 return repo.store.walk()
23 27
24 28 def generatev1(repo):
25 29 """Emit content for version 1 of a streaming clone.
26 30
27 31 This is a generator of raw chunks that constitute a streaming clone.
28 32
29 33 The stream begins with a line of 2 space-delimited integers containing the
30 34 number of entries and total bytes size.
31 35
32 36 Next, are N entries for each file being transferred. Each file entry starts
33 37 as a line with the file name and integer size delimited by a null byte.
34 38 The raw file data follows. Following the raw file data is the next file
35 39 entry, or EOF.
36 40
37 41 When used on the wire protocol, an additional line indicating protocol
38 42 success will be prepended to the stream. This function is not responsible
39 43 for adding it.
40 44
41 45 This function will obtain a repository lock to ensure a consistent view of
42 46 the store is captured. It therefore may raise LockError.
43 47 """
44 48 entries = []
45 49 total_bytes = 0
46 50 # Get consistent snapshot of repo, lock during scan.
47 51 lock = repo.lock()
48 52 try:
49 53 repo.ui.debug('scanning\n')
50 54 for name, ename, size in _walkstreamfiles(repo):
51 55 if size:
52 56 entries.append((name, size))
53 57 total_bytes += size
54 58 finally:
55 59 lock.release()
56 60
57 61 repo.ui.debug('%d files, %d bytes to transfer\n' %
58 62 (len(entries), total_bytes))
59 63 yield '%d %d\n' % (len(entries), total_bytes)
60 64
61 65 svfs = repo.svfs
62 66 oldaudit = svfs.mustaudit
63 67 debugflag = repo.ui.debugflag
64 68 svfs.mustaudit = False
65 69
66 70 try:
67 71 for name, size in entries:
68 72 if debugflag:
69 73 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
70 74 # partially encode name over the wire for backwards compat
71 75 yield '%s\0%d\n' % (store.encodedir(name), size)
72 76 if size <= 65536:
73 77 fp = svfs(name)
74 78 try:
75 79 data = fp.read(size)
76 80 finally:
77 81 fp.close()
78 82 yield data
79 83 else:
80 84 for chunk in util.filechunkiter(svfs(name), limit=size):
81 85 yield chunk
82 86 finally:
83 87 svfs.mustaudit = oldaudit
84 88
85 89 def consumev1(repo, fp):
86 90 """Apply the contents from version 1 of a streaming clone file handle.
87 91
88 92 This takes the output from "streamout" and applies it to the specified
89 93 repository.
90 94
91 95 Like "streamout," the status line added by the wire protocol is not handled
92 96 by this function.
93 97 """
94 98 lock = repo.lock()
95 99 try:
96 100 repo.ui.status(_('streaming all changes\n'))
97 101 l = fp.readline()
98 102 try:
99 103 total_files, total_bytes = map(int, l.split(' ', 1))
100 104 except (ValueError, TypeError):
101 105 raise error.ResponseError(
102 106 _('unexpected response from remote server:'), l)
103 107 repo.ui.status(_('%d files to transfer, %s of data\n') %
104 108 (total_files, util.bytecount(total_bytes)))
105 109 handled_bytes = 0
106 110 repo.ui.progress(_('clone'), 0, total=total_bytes)
107 111 start = time.time()
108 112
109 113 tr = repo.transaction(_('clone'))
110 114 try:
111 115 for i in xrange(total_files):
112 116 # XXX doesn't support '\n' or '\r' in filenames
113 117 l = fp.readline()
114 118 try:
115 119 name, size = l.split('\0', 1)
116 120 size = int(size)
117 121 except (ValueError, TypeError):
118 122 raise error.ResponseError(
119 123 _('unexpected response from remote server:'), l)
120 124 if repo.ui.debugflag:
121 125 repo.ui.debug('adding %s (%s)\n' %
122 126 (name, util.bytecount(size)))
123 127 # for backwards compat, name was partially encoded
124 128 ofp = repo.svfs(store.decodedir(name), 'w')
125 129 for chunk in util.filechunkiter(fp, limit=size):
126 130 handled_bytes += len(chunk)
127 131 repo.ui.progress(_('clone'), handled_bytes,
128 132 total=total_bytes)
129 133 ofp.write(chunk)
130 134 ofp.close()
131 135 tr.close()
132 136 finally:
133 137 tr.release()
134 138
135 139 # Writing straight to files circumvented the inmemory caches
136 140 repo.invalidate()
137 141
138 142 elapsed = time.time() - start
139 143 if elapsed <= 0:
140 144 elapsed = 0.001
141 145 repo.ui.progress(_('clone'), None)
142 146 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
143 147 (util.bytecount(total_bytes), elapsed,
144 148 util.bytecount(total_bytes / elapsed)))
145 149 finally:
146 150 lock.release()
147 151
148 152 def streamin(repo, remote, remotereqs):
149 153 # Save remote branchmap. We will use it later
150 154 # to speed up branchcache creation
151 155 rbranchmap = None
152 156 if remote.capable("branchmap"):
153 157 rbranchmap = remote.branchmap()
154 158
155 159 fp = remote.stream_out()
156 160 l = fp.readline()
157 161 try:
158 162 resp = int(l)
159 163 except ValueError:
160 164 raise error.ResponseError(
161 165 _('unexpected response from remote server:'), l)
162 166 if resp == 1:
163 167 raise util.Abort(_('operation forbidden by server'))
164 168 elif resp == 2:
165 169 raise util.Abort(_('locking the remote repository failed'))
166 170 elif resp != 0:
167 171 raise util.Abort(_('the server sent an unknown error code'))
168 172
169 173 applyremotedata(repo, remotereqs, rbranchmap, fp)
170 174 return len(repo.heads()) + 1
171 175
172 176 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
173 177 """Apply stream clone data to a repository.
174 178
175 179 "remotereqs" is a set of requirements to handle the incoming data.
176 180 "remotebranchmap" is the result of a branchmap lookup on the remote. It
177 181 can be None.
178 182 "fp" is a file object containing the raw stream data, suitable for
179 183 feeding into consumev1().
180 184 """
181 185 lock = repo.lock()
182 186 try:
183 187 consumev1(repo, fp)
184 188
185 189 # new requirements = old non-format requirements +
186 190 # new format-related remote requirements
187 191 # requirements from the streamed-in repository
188 192 repo.requirements = remotereqs | (
189 193 repo.requirements - repo.supportedformats)
190 194 repo._applyopenerreqs()
191 195 repo._writerequirements()
192 196
193 197 if remotebranchmap:
194 198 rbheads = []
195 199 closed = []
196 200 for bheads in remotebranchmap.itervalues():
197 201 rbheads.extend(bheads)
198 202 for h in bheads:
199 203 r = repo.changelog.rev(h)
200 204 b, c = repo.changelog.branchinfo(r)
201 205 if c:
202 206 closed.append(h)
203 207
204 208 if rbheads:
205 209 rtiprev = max((int(repo.changelog.rev(node))
206 210 for node in rbheads))
207 211 cache = branchmap.branchcache(remotebranchmap,
208 212 repo[rtiprev].node(),
209 213 rtiprev,
210 214 closednodes=closed)
211 215 # Try to stick it as low as possible
212 216 # filter above served are unlikely to be fetch from a clone
213 217 for candidate in ('base', 'immutable', 'served'):
214 218 rview = repo.filtered(candidate)
215 219 if cache.validfor(rview):
216 220 repo._branchcaches[candidate] = cache
217 221 cache.write(rview)
218 222 break
219 223 repo.invalidate()
220 224 finally:
221 225 lock.release()
@@ -1,813 +1,810
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import sys
12 12 import tempfile
13 13 import urllib
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 streamclone,
30 30 util,
31 31 )
32 32
33 33 class abstractserverproto(object):
34 34 """abstract class that summarizes the protocol API
35 35
36 36 Used as reference and documentation.
37 37 """
38 38
39 39 def getargs(self, args):
40 40 """return the value for arguments in <args>
41 41
42 42 returns a list of values (same order as <args>)"""
43 43 raise NotImplementedError()
44 44
45 45 def getfile(self, fp):
46 46 """write the whole content of a file into a file like object
47 47
48 48 The file is in the form::
49 49
50 50 (<chunk-size>\n<chunk>)+0\n
51 51
52 52 chunk size is the ascii version of the int.
53 53 """
54 54 raise NotImplementedError()
55 55
56 56 def redirect(self):
57 57 """may setup interception for stdout and stderr
58 58
59 59 See also the `restore` method."""
60 60 raise NotImplementedError()
61 61
62 62 # If the `redirect` function does install interception, the `restore`
63 63 # function MUST be defined. If interception is not used, this function
64 64 # MUST NOT be defined.
65 65 #
66 66 # left commented here on purpose
67 67 #
68 68 #def restore(self):
69 69 # """reinstall previous stdout and stderr and return intercepted stdout
70 70 # """
71 71 # raise NotImplementedError()
72 72
73 73 def groupchunks(self, cg):
74 74 """return 4096 chunks from a changegroup object
75 75
76 76 Some protocols may have compressed the contents."""
77 77 raise NotImplementedError()
78 78
79 79 class remotebatch(peer.batcher):
80 80 '''batches the queued calls; uses as few roundtrips as possible'''
81 81 def __init__(self, remote):
82 82 '''remote must support _submitbatch(encbatch) and
83 83 _submitone(op, encargs)'''
84 84 peer.batcher.__init__(self)
85 85 self.remote = remote
86 86 def submit(self):
87 87 req, rsp = [], []
88 88 for name, args, opts, resref in self.calls:
89 89 mtd = getattr(self.remote, name)
90 90 batchablefn = getattr(mtd, 'batchable', None)
91 91 if batchablefn is not None:
92 92 batchable = batchablefn(mtd.im_self, *args, **opts)
93 93 encargsorres, encresref = batchable.next()
94 94 if encresref:
95 95 req.append((name, encargsorres,))
96 96 rsp.append((batchable, encresref, resref,))
97 97 else:
98 98 resref.set(encargsorres)
99 99 else:
100 100 if req:
101 101 self._submitreq(req, rsp)
102 102 req, rsp = [], []
103 103 resref.set(mtd(*args, **opts))
104 104 if req:
105 105 self._submitreq(req, rsp)
106 106 def _submitreq(self, req, rsp):
107 107 encresults = self.remote._submitbatch(req)
108 108 for encres, r in zip(encresults, rsp):
109 109 batchable, encresref, resref = r
110 110 encresref.set(encres)
111 111 resref.set(batchable.next())
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118 # list of nodes encoding / decoding
119 119
120 120 def decodelist(l, sep=' '):
121 121 if l:
122 122 return map(bin, l.split(sep))
123 123 return []
124 124
125 125 def encodelist(l, sep=' '):
126 126 try:
127 127 return sep.join(map(hex, l))
128 128 except TypeError:
129 129 raise
130 130
131 131 # batched call argument encoding
132 132
133 133 def escapearg(plain):
134 134 return (plain
135 135 .replace(':', ':c')
136 136 .replace(',', ':o')
137 137 .replace(';', ':s')
138 138 .replace('=', ':e'))
139 139
140 140 def unescapearg(escaped):
141 141 return (escaped
142 142 .replace(':e', '=')
143 143 .replace(':s', ';')
144 144 .replace(':o', ',')
145 145 .replace(':c', ':'))
146 146
147 147 # mapping of options accepted by getbundle and their types
148 148 #
149 149 # Meant to be extended by extensions. It is extensions responsibility to ensure
150 150 # such options are properly processed in exchange.getbundle.
151 151 #
152 152 # supported types are:
153 153 #
154 154 # :nodes: list of binary nodes
155 155 # :csv: list of comma-separated values
156 156 # :scsv: list of comma-separated values return as set
157 157 # :plain: string with no transformation needed.
158 158 gboptsmap = {'heads': 'nodes',
159 159 'common': 'nodes',
160 160 'obsmarkers': 'boolean',
161 161 'bundlecaps': 'scsv',
162 162 'listkeys': 'csv',
163 163 'cg': 'boolean'}
164 164
165 165 # client side
166 166
167 167 class wirepeer(peer.peerrepository):
168 168
169 169 def batch(self):
170 170 if self.capable('batch'):
171 171 return remotebatch(self)
172 172 else:
173 173 return peer.localbatch(self)
174 174 def _submitbatch(self, req):
175 175 cmds = []
176 176 for op, argsdict in req:
177 177 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
178 178 for k, v in argsdict.iteritems())
179 179 cmds.append('%s %s' % (op, args))
180 180 rsp = self._call("batch", cmds=';'.join(cmds))
181 181 return [unescapearg(r) for r in rsp.split(';')]
182 182 def _submitone(self, op, args):
183 183 return self._call(op, **args)
184 184
185 185 @batchable
186 186 def lookup(self, key):
187 187 self.requirecap('lookup', _('look up remote revision'))
188 188 f = future()
189 189 yield {'key': encoding.fromlocal(key)}, f
190 190 d = f.value
191 191 success, data = d[:-1].split(" ", 1)
192 192 if int(success):
193 193 yield bin(data)
194 194 self._abort(error.RepoError(data))
195 195
196 196 @batchable
197 197 def heads(self):
198 198 f = future()
199 199 yield {}, f
200 200 d = f.value
201 201 try:
202 202 yield decodelist(d[:-1])
203 203 except ValueError:
204 204 self._abort(error.ResponseError(_("unexpected response:"), d))
205 205
206 206 @batchable
207 207 def known(self, nodes):
208 208 f = future()
209 209 yield {'nodes': encodelist(nodes)}, f
210 210 d = f.value
211 211 try:
212 212 yield [bool(int(b)) for b in d]
213 213 except ValueError:
214 214 self._abort(error.ResponseError(_("unexpected response:"), d))
215 215
216 216 @batchable
217 217 def branchmap(self):
218 218 f = future()
219 219 yield {}, f
220 220 d = f.value
221 221 try:
222 222 branchmap = {}
223 223 for branchpart in d.splitlines():
224 224 branchname, branchheads = branchpart.split(' ', 1)
225 225 branchname = encoding.tolocal(urllib.unquote(branchname))
226 226 branchheads = decodelist(branchheads)
227 227 branchmap[branchname] = branchheads
228 228 yield branchmap
229 229 except TypeError:
230 230 self._abort(error.ResponseError(_("unexpected response:"), d))
231 231
232 232 def branches(self, nodes):
233 233 n = encodelist(nodes)
234 234 d = self._call("branches", nodes=n)
235 235 try:
236 236 br = [tuple(decodelist(b)) for b in d.splitlines()]
237 237 return br
238 238 except ValueError:
239 239 self._abort(error.ResponseError(_("unexpected response:"), d))
240 240
241 241 def between(self, pairs):
242 242 batch = 8 # avoid giant requests
243 243 r = []
244 244 for i in xrange(0, len(pairs), batch):
245 245 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
246 246 d = self._call("between", pairs=n)
247 247 try:
248 248 r.extend(l and decodelist(l) or [] for l in d.splitlines())
249 249 except ValueError:
250 250 self._abort(error.ResponseError(_("unexpected response:"), d))
251 251 return r
252 252
253 253 @batchable
254 254 def pushkey(self, namespace, key, old, new):
255 255 if not self.capable('pushkey'):
256 256 yield False, None
257 257 f = future()
258 258 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 259 yield {'namespace': encoding.fromlocal(namespace),
260 260 'key': encoding.fromlocal(key),
261 261 'old': encoding.fromlocal(old),
262 262 'new': encoding.fromlocal(new)}, f
263 263 d = f.value
264 264 d, output = d.split('\n', 1)
265 265 try:
266 266 d = bool(int(d))
267 267 except ValueError:
268 268 raise error.ResponseError(
269 269 _('push failed (unexpected response):'), d)
270 270 for l in output.splitlines(True):
271 271 self.ui.status(_('remote: '), l)
272 272 yield d
273 273
274 274 @batchable
275 275 def listkeys(self, namespace):
276 276 if not self.capable('pushkey'):
277 277 yield {}, None
278 278 f = future()
279 279 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
280 280 yield {'namespace': encoding.fromlocal(namespace)}, f
281 281 d = f.value
282 282 self.ui.debug('received listkey for "%s": %i bytes\n'
283 283 % (namespace, len(d)))
284 284 yield pushkeymod.decodekeys(d)
285 285
286 286 def stream_out(self):
287 287 return self._callstream('stream_out')
288 288
289 289 def changegroup(self, nodes, kind):
290 290 n = encodelist(nodes)
291 291 f = self._callcompressable("changegroup", roots=n)
292 292 return changegroupmod.cg1unpacker(f, 'UN')
293 293
294 294 def changegroupsubset(self, bases, heads, kind):
295 295 self.requirecap('changegroupsubset', _('look up remote changes'))
296 296 bases = encodelist(bases)
297 297 heads = encodelist(heads)
298 298 f = self._callcompressable("changegroupsubset",
299 299 bases=bases, heads=heads)
300 300 return changegroupmod.cg1unpacker(f, 'UN')
301 301
302 302 def getbundle(self, source, **kwargs):
303 303 self.requirecap('getbundle', _('look up remote changes'))
304 304 opts = {}
305 305 bundlecaps = kwargs.get('bundlecaps')
306 306 if bundlecaps is not None:
307 307 kwargs['bundlecaps'] = sorted(bundlecaps)
308 308 else:
309 309 bundlecaps = () # kwargs could have it to None
310 310 for key, value in kwargs.iteritems():
311 311 if value is None:
312 312 continue
313 313 keytype = gboptsmap.get(key)
314 314 if keytype is None:
315 315 assert False, 'unexpected'
316 316 elif keytype == 'nodes':
317 317 value = encodelist(value)
318 318 elif keytype in ('csv', 'scsv'):
319 319 value = ','.join(value)
320 320 elif keytype == 'boolean':
321 321 value = '%i' % bool(value)
322 322 elif keytype != 'plain':
323 323 raise KeyError('unknown getbundle option type %s'
324 324 % keytype)
325 325 opts[key] = value
326 326 f = self._callcompressable("getbundle", **opts)
327 327 if any((cap.startswith('HG2') for cap in bundlecaps)):
328 328 return bundle2.getunbundler(self.ui, f)
329 329 else:
330 330 return changegroupmod.cg1unpacker(f, 'UN')
331 331
332 332 def unbundle(self, cg, heads, source):
333 333 '''Send cg (a readable file-like object representing the
334 334 changegroup to push, typically a chunkbuffer object) to the
335 335 remote server as a bundle.
336 336
337 337 When pushing a bundle10 stream, return an integer indicating the
338 338 result of the push (see localrepository.addchangegroup()).
339 339
340 340 When pushing a bundle20 stream, return a bundle20 stream.'''
341 341
342 342 if heads != ['force'] and self.capable('unbundlehash'):
343 343 heads = encodelist(['hashed',
344 344 util.sha1(''.join(sorted(heads))).digest()])
345 345 else:
346 346 heads = encodelist(heads)
347 347
348 348 if util.safehasattr(cg, 'deltaheader'):
349 349 # this a bundle10, do the old style call sequence
350 350 ret, output = self._callpush("unbundle", cg, heads=heads)
351 351 if ret == "":
352 352 raise error.ResponseError(
353 353 _('push failed:'), output)
354 354 try:
355 355 ret = int(ret)
356 356 except ValueError:
357 357 raise error.ResponseError(
358 358 _('push failed (unexpected response):'), ret)
359 359
360 360 for l in output.splitlines(True):
361 361 self.ui.status(_('remote: '), l)
362 362 else:
363 363 # bundle2 push. Send a stream, fetch a stream.
364 364 stream = self._calltwowaystream('unbundle', cg, heads=heads)
365 365 ret = bundle2.getunbundler(self.ui, stream)
366 366 return ret
367 367
368 368 def debugwireargs(self, one, two, three=None, four=None, five=None):
369 369 # don't pass optional arguments left at their default value
370 370 opts = {}
371 371 if three is not None:
372 372 opts['three'] = three
373 373 if four is not None:
374 374 opts['four'] = four
375 375 return self._call('debugwireargs', one=one, two=two, **opts)
376 376
377 377 def _call(self, cmd, **args):
378 378 """execute <cmd> on the server
379 379
380 380 The command is expected to return a simple string.
381 381
382 382 returns the server reply as a string."""
383 383 raise NotImplementedError()
384 384
385 385 def _callstream(self, cmd, **args):
386 386 """execute <cmd> on the server
387 387
388 388 The command is expected to return a stream.
389 389
390 390 returns the server reply as a file like object."""
391 391 raise NotImplementedError()
392 392
393 393 def _callcompressable(self, cmd, **args):
394 394 """execute <cmd> on the server
395 395
396 396 The command is expected to return a stream.
397 397
398 398 The stream may have been compressed in some implementations. This
399 399 function takes care of the decompression. This is the only difference
400 400 with _callstream.
401 401
402 402 returns the server reply as a file like object.
403 403 """
404 404 raise NotImplementedError()
405 405
406 406 def _callpush(self, cmd, fp, **args):
407 407 """execute a <cmd> on server
408 408
409 409 The command is expected to be related to a push. Push has a special
410 410 return method.
411 411
412 412 returns the server reply as a (ret, output) tuple. ret is either
413 413 empty (error) or a stringified int.
414 414 """
415 415 raise NotImplementedError()
416 416
417 417 def _calltwowaystream(self, cmd, fp, **args):
418 418 """execute <cmd> on server
419 419
420 420 The command will send a stream to the server and get a stream in reply.
421 421 """
422 422 raise NotImplementedError()
423 423
424 424 def _abort(self, exception):
425 425 """clearly abort the wire protocol connection and raise the exception
426 426 """
427 427 raise NotImplementedError()
428 428
429 429 # server side
430 430
431 431 # wire protocol command can either return a string or one of these classes.
432 432 class streamres(object):
433 433 """wireproto reply: binary stream
434 434
435 435 The call was successful and the result is a stream.
436 436 Iterate on the `self.gen` attribute to retrieve chunks.
437 437 """
438 438 def __init__(self, gen):
439 439 self.gen = gen
440 440
441 441 class pushres(object):
442 442 """wireproto reply: success with simple integer return
443 443
444 444 The call was successful and returned an integer contained in `self.res`.
445 445 """
446 446 def __init__(self, res):
447 447 self.res = res
448 448
449 449 class pusherr(object):
450 450 """wireproto reply: failure
451 451
452 452 The call failed. The `self.res` attribute contains the error message.
453 453 """
454 454 def __init__(self, res):
455 455 self.res = res
456 456
457 457 class ooberror(object):
458 458 """wireproto reply: failure of a batch of operation
459 459
460 460 Something failed during a batch call. The error message is stored in
461 461 `self.message`.
462 462 """
463 463 def __init__(self, message):
464 464 self.message = message
465 465
466 466 def dispatch(repo, proto, command):
467 467 repo = repo.filtered("served")
468 468 func, spec = commands[command]
469 469 args = proto.getargs(spec)
470 470 return func(repo, proto, *args)
471 471
472 472 def options(cmd, keys, others):
473 473 opts = {}
474 474 for k in keys:
475 475 if k in others:
476 476 opts[k] = others[k]
477 477 del others[k]
478 478 if others:
479 479 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
480 480 % (cmd, ",".join(others)))
481 481 return opts
482 482
483 483 # list of commands
484 484 commands = {}
485 485
486 486 def wireprotocommand(name, args=''):
487 487 """decorator for wire protocol command"""
488 488 def register(func):
489 489 commands[name] = (func, args)
490 490 return func
491 491 return register
492 492
493 493 @wireprotocommand('batch', 'cmds *')
494 494 def batch(repo, proto, cmds, others):
495 495 repo = repo.filtered("served")
496 496 res = []
497 497 for pair in cmds.split(';'):
498 498 op, args = pair.split(' ', 1)
499 499 vals = {}
500 500 for a in args.split(','):
501 501 if a:
502 502 n, v = a.split('=')
503 503 vals[n] = unescapearg(v)
504 504 func, spec = commands[op]
505 505 if spec:
506 506 keys = spec.split()
507 507 data = {}
508 508 for k in keys:
509 509 if k == '*':
510 510 star = {}
511 511 for key in vals.keys():
512 512 if key not in keys:
513 513 star[key] = vals[key]
514 514 data['*'] = star
515 515 else:
516 516 data[k] = vals[k]
517 517 result = func(repo, proto, *[data[k] for k in keys])
518 518 else:
519 519 result = func(repo, proto)
520 520 if isinstance(result, ooberror):
521 521 return result
522 522 res.append(escapearg(result))
523 523 return ';'.join(res)
524 524
525 525 @wireprotocommand('between', 'pairs')
526 526 def between(repo, proto, pairs):
527 527 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
528 528 r = []
529 529 for b in repo.between(pairs):
530 530 r.append(encodelist(b) + "\n")
531 531 return "".join(r)
532 532
533 533 @wireprotocommand('branchmap')
534 534 def branchmap(repo, proto):
535 535 branchmap = repo.branchmap()
536 536 heads = []
537 537 for branch, nodes in branchmap.iteritems():
538 538 branchname = urllib.quote(encoding.fromlocal(branch))
539 539 branchnodes = encodelist(nodes)
540 540 heads.append('%s %s' % (branchname, branchnodes))
541 541 return '\n'.join(heads)
542 542
543 543 @wireprotocommand('branches', 'nodes')
544 544 def branches(repo, proto, nodes):
545 545 nodes = decodelist(nodes)
546 546 r = []
547 547 for b in repo.branches(nodes):
548 548 r.append(encodelist(b) + "\n")
549 549 return "".join(r)
550 550
551 551
552 552 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
553 553 'known', 'getbundle', 'unbundlehash', 'batch']
554 554
555 555 def _capabilities(repo, proto):
556 556 """return a list of capabilities for a repo
557 557
558 558 This function exists to allow extensions to easily wrap capabilities
559 559 computation
560 560
561 561 - returns a lists: easy to alter
562 562 - change done here will be propagated to both `capabilities` and `hello`
563 563 command without any other action needed.
564 564 """
565 565 # copy to prevent modification of the global list
566 566 caps = list(wireprotocaps)
567 if _allowstream(repo.ui):
567 if streamclone.allowservergeneration(repo.ui):
568 568 if repo.ui.configbool('server', 'preferuncompressed', False):
569 569 caps.append('stream-preferred')
570 570 requiredformats = repo.requirements & repo.supportedformats
571 571 # if our local revlogs are just revlogv1, add 'stream' cap
572 572 if not requiredformats - set(('revlogv1',)):
573 573 caps.append('stream')
574 574 # otherwise, add 'streamreqs' detailing our local revlog format
575 575 else:
576 576 caps.append('streamreqs=%s' % ','.join(requiredformats))
577 577 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
578 578 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
579 579 caps.append('bundle2=' + urllib.quote(capsblob))
580 580 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
581 581 caps.append(
582 582 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
583 583 return caps
584 584
585 585 # If you are writing an extension and consider wrapping this function. Wrap
586 586 # `_capabilities` instead.
587 587 @wireprotocommand('capabilities')
588 588 def capabilities(repo, proto):
589 589 return ' '.join(_capabilities(repo, proto))
590 590
591 591 @wireprotocommand('changegroup', 'roots')
592 592 def changegroup(repo, proto, roots):
593 593 nodes = decodelist(roots)
594 594 cg = changegroupmod.changegroup(repo, nodes, 'serve')
595 595 return streamres(proto.groupchunks(cg))
596 596
597 597 @wireprotocommand('changegroupsubset', 'bases heads')
598 598 def changegroupsubset(repo, proto, bases, heads):
599 599 bases = decodelist(bases)
600 600 heads = decodelist(heads)
601 601 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
602 602 return streamres(proto.groupchunks(cg))
603 603
604 604 @wireprotocommand('debugwireargs', 'one two *')
605 605 def debugwireargs(repo, proto, one, two, others):
606 606 # only accept optional args from the known set
607 607 opts = options('debugwireargs', ['three', 'four'], others)
608 608 return repo.debugwireargs(one, two, **opts)
609 609
610 610 # List of options accepted by getbundle.
611 611 #
612 612 # Meant to be extended by extensions. It is the extension's responsibility to
613 613 # ensure such options are properly processed in exchange.getbundle.
614 614 gboptslist = ['heads', 'common', 'bundlecaps']
615 615
616 616 @wireprotocommand('getbundle', '*')
617 617 def getbundle(repo, proto, others):
618 618 opts = options('getbundle', gboptsmap.keys(), others)
619 619 for k, v in opts.iteritems():
620 620 keytype = gboptsmap[k]
621 621 if keytype == 'nodes':
622 622 opts[k] = decodelist(v)
623 623 elif keytype == 'csv':
624 624 opts[k] = list(v.split(','))
625 625 elif keytype == 'scsv':
626 626 opts[k] = set(v.split(','))
627 627 elif keytype == 'boolean':
628 628 opts[k] = bool(v)
629 629 elif keytype != 'plain':
630 630 raise KeyError('unknown getbundle option type %s'
631 631 % keytype)
632 632 cg = exchange.getbundle(repo, 'serve', **opts)
633 633 return streamres(proto.groupchunks(cg))
634 634
635 635 @wireprotocommand('heads')
636 636 def heads(repo, proto):
637 637 h = repo.heads()
638 638 return encodelist(h) + "\n"
639 639
640 640 @wireprotocommand('hello')
641 641 def hello(repo, proto):
642 642 '''the hello command returns a set of lines describing various
643 643 interesting things about the server, in an RFC822-like format.
644 644 Currently the only one defined is "capabilities", which
645 645 consists of a line in the form:
646 646
647 647 capabilities: space separated list of tokens
648 648 '''
649 649 return "capabilities: %s\n" % (capabilities(repo, proto))
650 650
651 651 @wireprotocommand('listkeys', 'namespace')
652 652 def listkeys(repo, proto, namespace):
653 653 d = repo.listkeys(encoding.tolocal(namespace)).items()
654 654 return pushkeymod.encodekeys(d)
655 655
656 656 @wireprotocommand('lookup', 'key')
657 657 def lookup(repo, proto, key):
658 658 try:
659 659 k = encoding.tolocal(key)
660 660 c = repo[k]
661 661 r = c.hex()
662 662 success = 1
663 663 except Exception as inst:
664 664 r = str(inst)
665 665 success = 0
666 666 return "%s %s\n" % (success, r)
667 667
668 668 @wireprotocommand('known', 'nodes *')
669 669 def known(repo, proto, nodes, others):
670 670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
671 671
672 672 @wireprotocommand('pushkey', 'namespace key old new')
673 673 def pushkey(repo, proto, namespace, key, old, new):
674 674 # compatibility with pre-1.8 clients which were accidentally
675 675 # sending raw binary nodes rather than utf-8-encoded hex
676 676 if len(new) == 20 and new.encode('string-escape') != new:
677 677 # looks like it could be a binary node
678 678 try:
679 679 new.decode('utf-8')
680 680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
681 681 except UnicodeDecodeError:
682 682 pass # binary, leave unmodified
683 683 else:
684 684 new = encoding.tolocal(new) # normal path
685 685
686 686 if util.safehasattr(proto, 'restore'):
687 687
688 688 proto.redirect()
689 689
690 690 try:
691 691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
692 692 encoding.tolocal(old), new) or False
693 693 except util.Abort:
694 694 r = False
695 695
696 696 output = proto.restore()
697 697
698 698 return '%s\n%s' % (int(r), output)
699 699
700 700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
701 701 encoding.tolocal(old), new)
702 702 return '%s\n' % int(r)
703 703
704 def _allowstream(ui):
705 return ui.configbool('server', 'uncompressed', True, untrusted=True)
706
707 704 @wireprotocommand('stream_out')
708 705 def stream(repo, proto):
709 706 '''If the server supports streaming clone, it advertises the "stream"
710 707 capability with a value representing the version and flags of the repo
711 708 it is serving. Client checks to see if it understands the format.
712 709 '''
713 if not _allowstream(repo.ui):
710 if not streamclone.allowservergeneration(repo.ui):
714 711 return '1\n'
715 712
716 713 def getstream(it):
717 714 yield '0\n'
718 715 for chunk in it:
719 716 yield chunk
720 717
721 718 try:
722 719 # LockError may be raised before the first result is yielded. Don't
723 720 # emit output until we're sure we got the lock successfully.
724 721 it = streamclone.generatev1(repo)
725 722 return streamres(getstream(it))
726 723 except error.LockError:
727 724 return '2\n'
728 725
729 726 @wireprotocommand('unbundle', 'heads')
730 727 def unbundle(repo, proto, heads):
731 728 their_heads = decodelist(heads)
732 729
733 730 try:
734 731 proto.redirect()
735 732
736 733 exchange.check_heads(repo, their_heads, 'preparing changes')
737 734
738 735 # write bundle data to temporary file because it can be big
739 736 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
740 737 fp = os.fdopen(fd, 'wb+')
741 738 r = 0
742 739 try:
743 740 proto.getfile(fp)
744 741 fp.seek(0)
745 742 gen = exchange.readbundle(repo.ui, fp, None)
746 743 r = exchange.unbundle(repo, gen, their_heads, 'serve',
747 744 proto._client())
748 745 if util.safehasattr(r, 'addpart'):
749 746 # The return looks streamable, we are in the bundle2 case and
750 747 # should return a stream.
751 748 return streamres(r.getchunks())
752 749 return pushres(r)
753 750
754 751 finally:
755 752 fp.close()
756 753 os.unlink(tempname)
757 754
758 755 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
759 756 # handle non-bundle2 case first
760 757 if not getattr(exc, 'duringunbundle2', False):
761 758 try:
762 759 raise
763 760 except util.Abort:
764 761 # The old code we moved used sys.stderr directly.
765 762 # We did not change it to minimise code change.
766 763 # This need to be moved to something proper.
767 764 # Feel free to do it.
768 765 sys.stderr.write("abort: %s\n" % exc)
769 766 return pushres(0)
770 767 except error.PushRaced:
771 768 return pusherr(str(exc))
772 769
773 770 bundler = bundle2.bundle20(repo.ui)
774 771 for out in getattr(exc, '_bundle2salvagedoutput', ()):
775 772 bundler.addpart(out)
776 773 try:
777 774 try:
778 775 raise
779 776 except error.PushkeyFailed as exc:
780 777 # check client caps
781 778 remotecaps = getattr(exc, '_replycaps', None)
782 779 if (remotecaps is not None
783 780 and 'pushkey' not in remotecaps.get('error', ())):
784 781 # no support remote side, fallback to Abort handler.
785 782 raise
786 783 part = bundler.newpart('error:pushkey')
787 784 part.addparam('in-reply-to', exc.partid)
788 785 if exc.namespace is not None:
789 786 part.addparam('namespace', exc.namespace, mandatory=False)
790 787 if exc.key is not None:
791 788 part.addparam('key', exc.key, mandatory=False)
792 789 if exc.new is not None:
793 790 part.addparam('new', exc.new, mandatory=False)
794 791 if exc.old is not None:
795 792 part.addparam('old', exc.old, mandatory=False)
796 793 if exc.ret is not None:
797 794 part.addparam('ret', exc.ret, mandatory=False)
798 795 except error.BundleValueError as exc:
799 796 errpart = bundler.newpart('error:unsupportedcontent')
800 797 if exc.parttype is not None:
801 798 errpart.addparam('parttype', exc.parttype)
802 799 if exc.params:
803 800 errpart.addparam('params', '\0'.join(exc.params))
804 801 except util.Abort as exc:
805 802 manargs = [('message', str(exc))]
806 803 advargs = []
807 804 if exc.hint is not None:
808 805 advargs.append(('hint', exc.hint))
809 806 bundler.addpart(bundle2.bundlepart('error:abort',
810 807 manargs, advargs))
811 808 except error.PushRaced as exc:
812 809 bundler.newpart('error:pushraced', [('message', str(exc))])
813 810 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now