##// END OF EJS Templates
narrow: move adding of narrow server capabilities to core...
Pulkit Goyal -
r40111:ad8d8dc9 default
parent child Browse files
Show More
@@ -1,115 +1,106
1 1 # narrowwirepeer.py - passes narrow spec with unbundle command
2 2 #
3 3 # Copyright 2017 Google, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 from mercurial import (
11 11 bundle2,
12 12 error,
13 13 extensions,
14 14 hg,
15 15 match as matchmod,
16 16 narrowspec,
17 17 pycompat,
18 18 wireprototypes,
19 19 wireprotov1peer,
20 20 wireprotov1server,
21 21 )
22 22
23 23 def uisetup():
24 extensions.wrapfunction(wireprotov1server, '_capabilities', addnarrowcap)
25 24 wireprotov1peer.wirepeer.narrow_widen = peernarrowwiden
26 25
27 def addnarrowcap(orig, repo, proto):
28 """add the narrow capability to the server"""
29 caps = orig(repo, proto)
30 caps.append(wireprototypes.NARROWCAP)
31 if repo.ui.configbool('experimental', 'narrowservebrokenellipses'):
32 caps.append(wireprototypes.ELLIPSESCAP)
33 return caps
34
35 26 def reposetup(repo):
36 27 def wirereposetup(ui, peer):
37 28 def wrapped(orig, cmd, *args, **kwargs):
38 29 if cmd == 'unbundle':
39 30 # TODO: don't blindly add include/exclude wireproto
40 31 # arguments to unbundle.
41 32 include, exclude = repo.narrowpats
42 33 kwargs[r"includepats"] = ','.join(include)
43 34 kwargs[r"excludepats"] = ','.join(exclude)
44 35 return orig(cmd, *args, **kwargs)
45 36 extensions.wrapfunction(peer, '_calltwowaystream', wrapped)
46 37 hg.wirepeersetupfuncs.append(wirereposetup)
47 38
48 39 @wireprotov1server.wireprotocommand('narrow_widen', 'oldincludes oldexcludes'
49 40 ' newincludes newexcludes'
50 41 ' commonheads cgversion'
51 42 ' known ellipses',
52 43 permission='pull')
53 44 def narrow_widen(repo, proto, oldincludes, oldexcludes, newincludes,
54 45 newexcludes, commonheads, cgversion, known, ellipses):
55 46 """wireprotocol command to send data when a narrow clone is widen. We will
56 47 be sending a changegroup here.
57 48
58 49 The current set of arguments which are required:
59 50 oldincludes: the old includes of the narrow copy
60 51 oldexcludes: the old excludes of the narrow copy
61 52 newincludes: the new includes of the narrow copy
62 53 newexcludes: the new excludes of the narrow copy
63 54 commonheads: list of heads which are common between the server and client
64 55 cgversion(maybe): the changegroup version to produce
65 56 known: list of nodes which are known on the client (used in ellipses cases)
66 57 ellipses: whether to send ellipses data or not
67 58 """
68 59
69 60 try:
70 61 oldincludes = wireprototypes.decodelist(oldincludes)
71 62 newincludes = wireprototypes.decodelist(newincludes)
72 63 oldexcludes = wireprototypes.decodelist(oldexcludes)
73 64 newexcludes = wireprototypes.decodelist(newexcludes)
74 65 # validate the patterns
75 66 narrowspec.validatepatterns(set(oldincludes))
76 67 narrowspec.validatepatterns(set(newincludes))
77 68 narrowspec.validatepatterns(set(oldexcludes))
78 69 narrowspec.validatepatterns(set(newexcludes))
79 70
80 71 common = wireprototypes.decodelist(commonheads)
81 72 known = None
82 73 if known:
83 74 known = wireprototypes.decodelist(known)
84 75 if ellipses == '0':
85 76 ellipses = False
86 77 else:
87 78 ellipses = bool(ellipses)
88 79 cgversion = cgversion
89 80 newmatch = narrowspec.match(repo.root, include=newincludes,
90 81 exclude=newexcludes)
91 82 oldmatch = narrowspec.match(repo.root, include=oldincludes,
92 83 exclude=oldexcludes)
93 84 diffmatch = matchmod.differencematcher(newmatch, oldmatch)
94 85
95 86 bundler = bundle2.widen_bundle(repo, diffmatch, common, known,
96 87 cgversion, ellipses)
97 88 except error.Abort as exc:
98 89 bundler = bundle2.bundle20(repo.ui)
99 90 manargs = [('message', pycompat.bytestr(exc))]
100 91 advargs = []
101 92 if exc.hint is not None:
102 93 advargs.append(('hint', exc.hint))
103 94 bundler.addpart(bundle2.bundlepart('error:abort', manargs, advargs))
104 95
105 96 chunks = bundler.getchunks()
106 97 return wireprototypes.streamres(gen=chunks)
107 98
108 99 def peernarrowwiden(remote, **kwargs):
109 100 for ch in ('oldincludes', 'newincludes', 'oldexcludes', 'newexcludes',
110 101 'commonheads', 'known'):
111 102 kwargs[ch] = wireprototypes.encodelist(kwargs[ch])
112 103
113 104 kwargs['ellipses'] = '%i' % bool(kwargs['ellipses'])
114 105 f = remote._callcompressable('narrow_widen', **kwargs)
115 106 return bundle2.getunbundler(remote.ui, f)
@@ -1,668 +1,673
1 1 # wireprotov1server.py - Wire protocol version 1 server functionality
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11
12 12 from .i18n import _
13 13 from .node import (
14 14 hex,
15 15 nullid,
16 16 )
17 17
18 18 from . import (
19 19 bundle2,
20 20 changegroup as changegroupmod,
21 21 discovery,
22 22 encoding,
23 23 error,
24 24 exchange,
25 25 pushkey as pushkeymod,
26 26 pycompat,
27 27 streamclone,
28 28 util,
29 29 wireprototypes,
30 30 )
31 31
32 32 from .utils import (
33 33 procutil,
34 34 stringutil,
35 35 )
36 36
37 37 urlerr = util.urlerr
38 38 urlreq = util.urlreq
39 39
40 40 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
41 41 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
42 42 'IncompatibleClient')
43 43 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
44 44
45 45 def clientcompressionsupport(proto):
46 46 """Returns a list of compression methods supported by the client.
47 47
48 48 Returns a list of the compression methods supported by the client
49 49 according to the protocol capabilities. If no such capability has
50 50 been announced, fallback to the default of zlib and uncompressed.
51 51 """
52 52 for cap in proto.getprotocaps():
53 53 if cap.startswith('comp='):
54 54 return cap[5:].split(',')
55 55 return ['zlib', 'none']
56 56
57 57 # wire protocol command can either return a string or one of these classes.
58 58
59 59 def getdispatchrepo(repo, proto, command):
60 60 """Obtain the repo used for processing wire protocol commands.
61 61
62 62 The intent of this function is to serve as a monkeypatch point for
63 63 extensions that need commands to operate on different repo views under
64 64 specialized circumstances.
65 65 """
66 66 return repo.filtered('served')
67 67
68 68 def dispatch(repo, proto, command):
69 69 repo = getdispatchrepo(repo, proto, command)
70 70
71 71 func, spec = commands[command]
72 72 args = proto.getargs(spec)
73 73
74 74 return func(repo, proto, *args)
75 75
76 76 def options(cmd, keys, others):
77 77 opts = {}
78 78 for k in keys:
79 79 if k in others:
80 80 opts[k] = others[k]
81 81 del others[k]
82 82 if others:
83 83 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
84 84 % (cmd, ",".join(others)))
85 85 return opts
86 86
87 87 def bundle1allowed(repo, action):
88 88 """Whether a bundle1 operation is allowed from the server.
89 89
90 90 Priority is:
91 91
92 92 1. server.bundle1gd.<action> (if generaldelta active)
93 93 2. server.bundle1.<action>
94 94 3. server.bundle1gd (if generaldelta active)
95 95 4. server.bundle1
96 96 """
97 97 ui = repo.ui
98 98 gd = 'generaldelta' in repo.requirements
99 99
100 100 if gd:
101 101 v = ui.configbool('server', 'bundle1gd.%s' % action)
102 102 if v is not None:
103 103 return v
104 104
105 105 v = ui.configbool('server', 'bundle1.%s' % action)
106 106 if v is not None:
107 107 return v
108 108
109 109 if gd:
110 110 v = ui.configbool('server', 'bundle1gd')
111 111 if v is not None:
112 112 return v
113 113
114 114 return ui.configbool('server', 'bundle1')
115 115
116 116 commands = wireprototypes.commanddict()
117 117
118 118 def wireprotocommand(name, args=None, permission='push'):
119 119 """Decorator to declare a wire protocol command.
120 120
121 121 ``name`` is the name of the wire protocol command being provided.
122 122
123 123 ``args`` defines the named arguments accepted by the command. It is
124 124 a space-delimited list of argument names. ``*`` denotes a special value
125 125 that says to accept all named arguments.
126 126
127 127 ``permission`` defines the permission type needed to run this command.
128 128 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
129 129 respectively. Default is to assume command requires ``push`` permissions
130 130 because otherwise commands not declaring their permissions could modify
131 131 a repository that is supposed to be read-only.
132 132 """
133 133 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
134 134 if v['version'] == 1}
135 135
136 136 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
137 137 # SSHv2.
138 138 # TODO undo this hack when SSH is using the unified frame protocol.
139 139 if name == b'batch':
140 140 transports.add(wireprototypes.SSHV2)
141 141
142 142 if permission not in ('push', 'pull'):
143 143 raise error.ProgrammingError('invalid wire protocol permission; '
144 144 'got %s; expected "push" or "pull"' %
145 145 permission)
146 146
147 147 if args is None:
148 148 args = ''
149 149
150 150 if not isinstance(args, bytes):
151 151 raise error.ProgrammingError('arguments for version 1 commands '
152 152 'must be declared as bytes')
153 153
154 154 def register(func):
155 155 if name in commands:
156 156 raise error.ProgrammingError('%s command already registered '
157 157 'for version 1' % name)
158 158 commands[name] = wireprototypes.commandentry(
159 159 func, args=args, transports=transports, permission=permission)
160 160
161 161 return func
162 162 return register
163 163
164 164 # TODO define a more appropriate permissions type to use for this.
165 165 @wireprotocommand('batch', 'cmds *', permission='pull')
166 166 def batch(repo, proto, cmds, others):
167 167 unescapearg = wireprototypes.unescapebatcharg
168 168 repo = repo.filtered("served")
169 169 res = []
170 170 for pair in cmds.split(';'):
171 171 op, args = pair.split(' ', 1)
172 172 vals = {}
173 173 for a in args.split(','):
174 174 if a:
175 175 n, v = a.split('=')
176 176 vals[unescapearg(n)] = unescapearg(v)
177 177 func, spec = commands[op]
178 178
179 179 # Validate that client has permissions to perform this command.
180 180 perm = commands[op].permission
181 181 assert perm in ('push', 'pull')
182 182 proto.checkperm(perm)
183 183
184 184 if spec:
185 185 keys = spec.split()
186 186 data = {}
187 187 for k in keys:
188 188 if k == '*':
189 189 star = {}
190 190 for key in vals.keys():
191 191 if key not in keys:
192 192 star[key] = vals[key]
193 193 data['*'] = star
194 194 else:
195 195 data[k] = vals[k]
196 196 result = func(repo, proto, *[data[k] for k in keys])
197 197 else:
198 198 result = func(repo, proto)
199 199 if isinstance(result, wireprototypes.ooberror):
200 200 return result
201 201
202 202 # For now, all batchable commands must return bytesresponse or
203 203 # raw bytes (for backwards compatibility).
204 204 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
205 205 if isinstance(result, wireprototypes.bytesresponse):
206 206 result = result.data
207 207 res.append(wireprototypes.escapebatcharg(result))
208 208
209 209 return wireprototypes.bytesresponse(';'.join(res))
210 210
211 211 @wireprotocommand('between', 'pairs', permission='pull')
212 212 def between(repo, proto, pairs):
213 213 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
214 214 r = []
215 215 for b in repo.between(pairs):
216 216 r.append(wireprototypes.encodelist(b) + "\n")
217 217
218 218 return wireprototypes.bytesresponse(''.join(r))
219 219
220 220 @wireprotocommand('branchmap', permission='pull')
221 221 def branchmap(repo, proto):
222 222 branchmap = repo.branchmap()
223 223 heads = []
224 224 for branch, nodes in branchmap.iteritems():
225 225 branchname = urlreq.quote(encoding.fromlocal(branch))
226 226 branchnodes = wireprototypes.encodelist(nodes)
227 227 heads.append('%s %s' % (branchname, branchnodes))
228 228
229 229 return wireprototypes.bytesresponse('\n'.join(heads))
230 230
231 231 @wireprotocommand('branches', 'nodes', permission='pull')
232 232 def branches(repo, proto, nodes):
233 233 nodes = wireprototypes.decodelist(nodes)
234 234 r = []
235 235 for b in repo.branches(nodes):
236 236 r.append(wireprototypes.encodelist(b) + "\n")
237 237
238 238 return wireprototypes.bytesresponse(''.join(r))
239 239
240 240 @wireprotocommand('clonebundles', '', permission='pull')
241 241 def clonebundles(repo, proto):
242 242 """Server command for returning info for available bundles to seed clones.
243 243
244 244 Clients will parse this response and determine what bundle to fetch.
245 245
246 246 Extensions may wrap this command to filter or dynamically emit data
247 247 depending on the request. e.g. you could advertise URLs for the closest
248 248 data center given the client's IP address.
249 249 """
250 250 return wireprototypes.bytesresponse(
251 251 repo.vfs.tryread('clonebundles.manifest'))
252 252
253 253 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
254 254 'known', 'getbundle', 'unbundlehash']
255 255
256 256 def _capabilities(repo, proto):
257 257 """return a list of capabilities for a repo
258 258
259 259 This function exists to allow extensions to easily wrap capabilities
260 260 computation
261 261
262 262 - returns a lists: easy to alter
263 263 - change done here will be propagated to both `capabilities` and `hello`
264 264 command without any other action needed.
265 265 """
266 266 # copy to prevent modification of the global list
267 267 caps = list(wireprotocaps)
268 268
269 269 # Command of same name as capability isn't exposed to version 1 of
270 270 # transports. So conditionally add it.
271 271 if commands.commandavailable('changegroupsubset', proto):
272 272 caps.append('changegroupsubset')
273 273
274 274 if streamclone.allowservergeneration(repo):
275 275 if repo.ui.configbool('server', 'preferuncompressed'):
276 276 caps.append('stream-preferred')
277 277 requiredformats = repo.requirements & repo.supportedformats
278 278 # if our local revlogs are just revlogv1, add 'stream' cap
279 279 if not requiredformats - {'revlogv1'}:
280 280 caps.append('stream')
281 281 # otherwise, add 'streamreqs' detailing our local revlog format
282 282 else:
283 283 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
284 284 if repo.ui.configbool('experimental', 'bundle2-advertise'):
285 285 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
286 286 caps.append('bundle2=' + urlreq.quote(capsblob))
287 287 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
288 288
289 if repo.ui.configbool('experimental', 'narrow'):
290 caps.append(wireprototypes.NARROWCAP)
291 if repo.ui.configbool('experimental', 'narrowservebrokenellipses'):
292 caps.append(wireprototypes.ELLIPSESCAP)
293
289 294 return proto.addcapabilities(repo, caps)
290 295
291 296 # If you are writing an extension and consider wrapping this function. Wrap
292 297 # `_capabilities` instead.
293 298 @wireprotocommand('capabilities', permission='pull')
294 299 def capabilities(repo, proto):
295 300 caps = _capabilities(repo, proto)
296 301 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
297 302
298 303 @wireprotocommand('changegroup', 'roots', permission='pull')
299 304 def changegroup(repo, proto, roots):
300 305 nodes = wireprototypes.decodelist(roots)
301 306 outgoing = discovery.outgoing(repo, missingroots=nodes,
302 307 missingheads=repo.heads())
303 308 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
304 309 gen = iter(lambda: cg.read(32768), '')
305 310 return wireprototypes.streamres(gen=gen)
306 311
307 312 @wireprotocommand('changegroupsubset', 'bases heads',
308 313 permission='pull')
309 314 def changegroupsubset(repo, proto, bases, heads):
310 315 bases = wireprototypes.decodelist(bases)
311 316 heads = wireprototypes.decodelist(heads)
312 317 outgoing = discovery.outgoing(repo, missingroots=bases,
313 318 missingheads=heads)
314 319 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
315 320 gen = iter(lambda: cg.read(32768), '')
316 321 return wireprototypes.streamres(gen=gen)
317 322
318 323 @wireprotocommand('debugwireargs', 'one two *',
319 324 permission='pull')
320 325 def debugwireargs(repo, proto, one, two, others):
321 326 # only accept optional args from the known set
322 327 opts = options('debugwireargs', ['three', 'four'], others)
323 328 return wireprototypes.bytesresponse(repo.debugwireargs(
324 329 one, two, **pycompat.strkwargs(opts)))
325 330
326 331 def find_pullbundle(repo, proto, opts, clheads, heads, common):
327 332 """Return a file object for the first matching pullbundle.
328 333
329 334 Pullbundles are specified in .hg/pullbundles.manifest similar to
330 335 clonebundles.
331 336 For each entry, the bundle specification is checked for compatibility:
332 337 - Client features vs the BUNDLESPEC.
333 338 - Revisions shared with the clients vs base revisions of the bundle.
334 339 A bundle can be applied only if all its base revisions are known by
335 340 the client.
336 341 - At least one leaf of the bundle's DAG is missing on the client.
337 342 - Every leaf of the bundle's DAG is part of node set the client wants.
338 343 E.g. do not send a bundle of all changes if the client wants only
339 344 one specific branch of many.
340 345 """
341 346 def decodehexstring(s):
342 347 return set([h.decode('hex') for h in s.split(';')])
343 348
344 349 manifest = repo.vfs.tryread('pullbundles.manifest')
345 350 if not manifest:
346 351 return None
347 352 res = exchange.parseclonebundlesmanifest(repo, manifest)
348 353 res = exchange.filterclonebundleentries(repo, res)
349 354 if not res:
350 355 return None
351 356 cl = repo.changelog
352 357 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
353 358 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
354 359 compformats = clientcompressionsupport(proto)
355 360 for entry in res:
356 361 comp = entry.get('COMPRESSION')
357 362 altcomp = util.compengines._bundlenames.get(comp)
358 363 if comp and comp not in compformats and altcomp not in compformats:
359 364 continue
360 365 # No test yet for VERSION, since V2 is supported by any client
361 366 # that advertises partial pulls
362 367 if 'heads' in entry:
363 368 try:
364 369 bundle_heads = decodehexstring(entry['heads'])
365 370 except TypeError:
366 371 # Bad heads entry
367 372 continue
368 373 if bundle_heads.issubset(common):
369 374 continue # Nothing new
370 375 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
371 376 continue # Still nothing new
372 377 if any(cl.rev(rev) not in heads_anc and
373 378 cl.rev(rev) not in common_anc for rev in bundle_heads):
374 379 continue
375 380 if 'bases' in entry:
376 381 try:
377 382 bundle_bases = decodehexstring(entry['bases'])
378 383 except TypeError:
379 384 # Bad bases entry
380 385 continue
381 386 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
382 387 continue
383 388 path = entry['URL']
384 389 repo.ui.debug('sending pullbundle "%s"\n' % path)
385 390 try:
386 391 return repo.vfs.open(path)
387 392 except IOError:
388 393 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
389 394 continue
390 395 return None
391 396
392 397 @wireprotocommand('getbundle', '*', permission='pull')
393 398 def getbundle(repo, proto, others):
394 399 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
395 400 others)
396 401 for k, v in opts.iteritems():
397 402 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
398 403 if keytype == 'nodes':
399 404 opts[k] = wireprototypes.decodelist(v)
400 405 elif keytype == 'csv':
401 406 opts[k] = list(v.split(','))
402 407 elif keytype == 'scsv':
403 408 opts[k] = set(v.split(','))
404 409 elif keytype == 'boolean':
405 410 # Client should serialize False as '0', which is a non-empty string
406 411 # so it evaluates as a True bool.
407 412 if v == '0':
408 413 opts[k] = False
409 414 else:
410 415 opts[k] = bool(v)
411 416 elif keytype != 'plain':
412 417 raise KeyError('unknown getbundle option type %s'
413 418 % keytype)
414 419
415 420 if not bundle1allowed(repo, 'pull'):
416 421 if not exchange.bundle2requested(opts.get('bundlecaps')):
417 422 if proto.name == 'http-v1':
418 423 return wireprototypes.ooberror(bundle2required)
419 424 raise error.Abort(bundle2requiredmain,
420 425 hint=bundle2requiredhint)
421 426
422 427 prefercompressed = True
423 428
424 429 try:
425 430 clheads = set(repo.changelog.heads())
426 431 heads = set(opts.get('heads', set()))
427 432 common = set(opts.get('common', set()))
428 433 common.discard(nullid)
429 434 if (repo.ui.configbool('server', 'pullbundle') and
430 435 'partial-pull' in proto.getprotocaps()):
431 436 # Check if a pre-built bundle covers this request.
432 437 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
433 438 if bundle:
434 439 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
435 440 prefer_uncompressed=True)
436 441
437 442 if repo.ui.configbool('server', 'disablefullbundle'):
438 443 # Check to see if this is a full clone.
439 444 changegroup = opts.get('cg', True)
440 445 if changegroup and not common and clheads == heads:
441 446 raise error.Abort(
442 447 _('server has pull-based clones disabled'),
443 448 hint=_('remove --pull if specified or upgrade Mercurial'))
444 449
445 450 info, chunks = exchange.getbundlechunks(repo, 'serve',
446 451 **pycompat.strkwargs(opts))
447 452 prefercompressed = info.get('prefercompressed', True)
448 453 except error.Abort as exc:
449 454 # cleanly forward Abort error to the client
450 455 if not exchange.bundle2requested(opts.get('bundlecaps')):
451 456 if proto.name == 'http-v1':
452 457 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
453 458 raise # cannot do better for bundle1 + ssh
454 459 # bundle2 request expect a bundle2 reply
455 460 bundler = bundle2.bundle20(repo.ui)
456 461 manargs = [('message', pycompat.bytestr(exc))]
457 462 advargs = []
458 463 if exc.hint is not None:
459 464 advargs.append(('hint', exc.hint))
460 465 bundler.addpart(bundle2.bundlepart('error:abort',
461 466 manargs, advargs))
462 467 chunks = bundler.getchunks()
463 468 prefercompressed = False
464 469
465 470 return wireprototypes.streamres(
466 471 gen=chunks, prefer_uncompressed=not prefercompressed)
467 472
468 473 @wireprotocommand('heads', permission='pull')
469 474 def heads(repo, proto):
470 475 h = repo.heads()
471 476 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
472 477
473 478 @wireprotocommand('hello', permission='pull')
474 479 def hello(repo, proto):
475 480 """Called as part of SSH handshake to obtain server info.
476 481
477 482 Returns a list of lines describing interesting things about the
478 483 server, in an RFC822-like format.
479 484
480 485 Currently, the only one defined is ``capabilities``, which consists of a
481 486 line of space separated tokens describing server abilities:
482 487
483 488 capabilities: <token0> <token1> <token2>
484 489 """
485 490 caps = capabilities(repo, proto).data
486 491 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
487 492
488 493 @wireprotocommand('listkeys', 'namespace', permission='pull')
489 494 def listkeys(repo, proto, namespace):
490 495 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
491 496 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
492 497
493 498 @wireprotocommand('lookup', 'key', permission='pull')
494 499 def lookup(repo, proto, key):
495 500 try:
496 501 k = encoding.tolocal(key)
497 502 n = repo.lookup(k)
498 503 r = hex(n)
499 504 success = 1
500 505 except Exception as inst:
501 506 r = stringutil.forcebytestr(inst)
502 507 success = 0
503 508 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
504 509
505 510 @wireprotocommand('known', 'nodes *', permission='pull')
506 511 def known(repo, proto, nodes, others):
507 512 v = ''.join(b and '1' or '0'
508 513 for b in repo.known(wireprototypes.decodelist(nodes)))
509 514 return wireprototypes.bytesresponse(v)
510 515
511 516 @wireprotocommand('protocaps', 'caps', permission='pull')
512 517 def protocaps(repo, proto, caps):
513 518 if proto.name == wireprototypes.SSHV1:
514 519 proto._protocaps = set(caps.split(' '))
515 520 return wireprototypes.bytesresponse('OK')
516 521
517 522 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
518 523 def pushkey(repo, proto, namespace, key, old, new):
519 524 # compatibility with pre-1.8 clients which were accidentally
520 525 # sending raw binary nodes rather than utf-8-encoded hex
521 526 if len(new) == 20 and stringutil.escapestr(new) != new:
522 527 # looks like it could be a binary node
523 528 try:
524 529 new.decode('utf-8')
525 530 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
526 531 except UnicodeDecodeError:
527 532 pass # binary, leave unmodified
528 533 else:
529 534 new = encoding.tolocal(new) # normal path
530 535
531 536 with proto.mayberedirectstdio() as output:
532 537 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
533 538 encoding.tolocal(old), new) or False
534 539
535 540 output = output.getvalue() if output else ''
536 541 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
537 542
538 543 @wireprotocommand('stream_out', permission='pull')
539 544 def stream(repo, proto):
540 545 '''If the server supports streaming clone, it advertises the "stream"
541 546 capability with a value representing the version and flags of the repo
542 547 it is serving. Client checks to see if it understands the format.
543 548 '''
544 549 return wireprototypes.streamreslegacy(
545 550 streamclone.generatev1wireproto(repo))
546 551
547 552 @wireprotocommand('unbundle', 'heads', permission='push')
548 553 def unbundle(repo, proto, heads):
549 554 their_heads = wireprototypes.decodelist(heads)
550 555
551 556 with proto.mayberedirectstdio() as output:
552 557 try:
553 558 exchange.check_heads(repo, their_heads, 'preparing changes')
554 559 cleanup = lambda: None
555 560 try:
556 561 payload = proto.getpayload()
557 562 if repo.ui.configbool('server', 'streamunbundle'):
558 563 def cleanup():
559 564 # Ensure that the full payload is consumed, so
560 565 # that the connection doesn't contain trailing garbage.
561 566 for p in payload:
562 567 pass
563 568 fp = util.chunkbuffer(payload)
564 569 else:
565 570 # write bundle data to temporary file as it can be big
566 571 fp, tempname = None, None
567 572 def cleanup():
568 573 if fp:
569 574 fp.close()
570 575 if tempname:
571 576 os.unlink(tempname)
572 577 fd, tempname = pycompat.mkstemp(prefix='hg-unbundle-')
573 578 repo.ui.debug('redirecting incoming bundle to %s\n' %
574 579 tempname)
575 580 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
576 581 r = 0
577 582 for p in payload:
578 583 fp.write(p)
579 584 fp.seek(0)
580 585
581 586 gen = exchange.readbundle(repo.ui, fp, None)
582 587 if (isinstance(gen, changegroupmod.cg1unpacker)
583 588 and not bundle1allowed(repo, 'push')):
584 589 if proto.name == 'http-v1':
585 590 # need to special case http because stderr do not get to
586 591 # the http client on failed push so we need to abuse
587 592 # some other error type to make sure the message get to
588 593 # the user.
589 594 return wireprototypes.ooberror(bundle2required)
590 595 raise error.Abort(bundle2requiredmain,
591 596 hint=bundle2requiredhint)
592 597
593 598 r = exchange.unbundle(repo, gen, their_heads, 'serve',
594 599 proto.client())
595 600 if util.safehasattr(r, 'addpart'):
596 601 # The return looks streamable, we are in the bundle2 case
597 602 # and should return a stream.
598 603 return wireprototypes.streamreslegacy(gen=r.getchunks())
599 604 return wireprototypes.pushres(
600 605 r, output.getvalue() if output else '')
601 606
602 607 finally:
603 608 cleanup()
604 609
605 610 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
606 611 # handle non-bundle2 case first
607 612 if not getattr(exc, 'duringunbundle2', False):
608 613 try:
609 614 raise
610 615 except error.Abort:
611 616 # The old code we moved used procutil.stderr directly.
612 617 # We did not change it to minimise code change.
613 618 # This need to be moved to something proper.
614 619 # Feel free to do it.
615 620 procutil.stderr.write("abort: %s\n" % exc)
616 621 if exc.hint is not None:
617 622 procutil.stderr.write("(%s)\n" % exc.hint)
618 623 procutil.stderr.flush()
619 624 return wireprototypes.pushres(
620 625 0, output.getvalue() if output else '')
621 626 except error.PushRaced:
622 627 return wireprototypes.pusherr(
623 628 pycompat.bytestr(exc),
624 629 output.getvalue() if output else '')
625 630
626 631 bundler = bundle2.bundle20(repo.ui)
627 632 for out in getattr(exc, '_bundle2salvagedoutput', ()):
628 633 bundler.addpart(out)
629 634 try:
630 635 try:
631 636 raise
632 637 except error.PushkeyFailed as exc:
633 638 # check client caps
634 639 remotecaps = getattr(exc, '_replycaps', None)
635 640 if (remotecaps is not None
636 641 and 'pushkey' not in remotecaps.get('error', ())):
637 642 # no support remote side, fallback to Abort handler.
638 643 raise
639 644 part = bundler.newpart('error:pushkey')
640 645 part.addparam('in-reply-to', exc.partid)
641 646 if exc.namespace is not None:
642 647 part.addparam('namespace', exc.namespace,
643 648 mandatory=False)
644 649 if exc.key is not None:
645 650 part.addparam('key', exc.key, mandatory=False)
646 651 if exc.new is not None:
647 652 part.addparam('new', exc.new, mandatory=False)
648 653 if exc.old is not None:
649 654 part.addparam('old', exc.old, mandatory=False)
650 655 if exc.ret is not None:
651 656 part.addparam('ret', exc.ret, mandatory=False)
652 657 except error.BundleValueError as exc:
653 658 errpart = bundler.newpart('error:unsupportedcontent')
654 659 if exc.parttype is not None:
655 660 errpart.addparam('parttype', exc.parttype)
656 661 if exc.params:
657 662 errpart.addparam('params', '\0'.join(exc.params))
658 663 except error.Abort as exc:
659 664 manargs = [('message', stringutil.forcebytestr(exc))]
660 665 advargs = []
661 666 if exc.hint is not None:
662 667 advargs.append(('hint', exc.hint))
663 668 bundler.addpart(bundle2.bundlepart('error:abort',
664 669 manargs, advargs))
665 670 except error.PushRaced as exc:
666 671 bundler.newpart('error:pushraced',
667 672 [('message', stringutil.forcebytestr(exc))])
668 673 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now