##// END OF EJS Templates
wireproto: separate commands tables for version 1 and 2 commands...
Gregory Szorc -
r37311:45b39c69 default
parent child Browse files
Show More
@@ -1,202 +1,203 b''
1 1 # Copyright 2009-2010 Gregory P. Ward
2 2 # Copyright 2009-2010 Intelerad Medical Systems Incorporated
3 3 # Copyright 2010-2011 Fog Creek Software
4 4 # Copyright 2010-2011 Unity Technologies
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 '''setup for largefiles extension: uisetup'''
10 10 from __future__ import absolute_import
11 11
12 12 from mercurial.i18n import _
13 13
14 14 from mercurial.hgweb import (
15 15 webcommands,
16 16 )
17 17
18 18 from mercurial import (
19 19 archival,
20 20 cmdutil,
21 21 commands,
22 22 copies,
23 23 exchange,
24 24 extensions,
25 25 filemerge,
26 26 hg,
27 27 httppeer,
28 28 merge,
29 29 scmutil,
30 30 sshpeer,
31 31 subrepo,
32 32 upgrade,
33 33 url,
34 34 wireproto,
35 35 )
36 36
37 37 from . import (
38 38 overrides,
39 39 proto,
40 40 )
41 41
42 42 def uisetup(ui):
43 43 # Disable auto-status for some commands which assume that all
44 44 # files in the result are under Mercurial's control
45 45
46 46 entry = extensions.wrapcommand(commands.table, 'add',
47 47 overrides.overrideadd)
48 48 addopt = [('', 'large', None, _('add as largefile')),
49 49 ('', 'normal', None, _('add as normal file')),
50 50 ('', 'lfsize', '', _('add all files above this size '
51 51 '(in megabytes) as largefiles '
52 52 '(default: 10)'))]
53 53 entry[1].extend(addopt)
54 54
55 55 # The scmutil function is called both by the (trivial) addremove command,
56 56 # and in the process of handling commit -A (issue3542)
57 57 extensions.wrapfunction(scmutil, 'addremove', overrides.scmutiladdremove)
58 58 extensions.wrapfunction(cmdutil, 'add', overrides.cmdutiladd)
59 59 extensions.wrapfunction(cmdutil, 'remove', overrides.cmdutilremove)
60 60 extensions.wrapfunction(cmdutil, 'forget', overrides.cmdutilforget)
61 61
62 62 extensions.wrapfunction(copies, 'pathcopies', overrides.copiespathcopies)
63 63
64 64 extensions.wrapfunction(upgrade, 'preservedrequirements',
65 65 overrides.upgraderequirements)
66 66
67 67 extensions.wrapfunction(upgrade, 'supporteddestrequirements',
68 68 overrides.upgraderequirements)
69 69
70 70 # Subrepos call status function
71 71 entry = extensions.wrapcommand(commands.table, 'status',
72 72 overrides.overridestatus)
73 73 extensions.wrapfunction(subrepo.hgsubrepo, 'status',
74 74 overrides.overridestatusfn)
75 75
76 76 entry = extensions.wrapcommand(commands.table, 'log',
77 77 overrides.overridelog)
78 78 entry = extensions.wrapcommand(commands.table, 'rollback',
79 79 overrides.overriderollback)
80 80 entry = extensions.wrapcommand(commands.table, 'verify',
81 81 overrides.overrideverify)
82 82
83 83 verifyopt = [('', 'large', None,
84 84 _('verify that all largefiles in current revision exists')),
85 85 ('', 'lfa', None,
86 86 _('verify largefiles in all revisions, not just current')),
87 87 ('', 'lfc', None,
88 88 _('verify local largefile contents, not just existence'))]
89 89 entry[1].extend(verifyopt)
90 90
91 91 entry = extensions.wrapcommand(commands.table, 'debugstate',
92 92 overrides.overridedebugstate)
93 93 debugstateopt = [('', 'large', None, _('display largefiles dirstate'))]
94 94 entry[1].extend(debugstateopt)
95 95
96 96 outgoing = lambda orgfunc, *arg, **kwargs: orgfunc(*arg, **kwargs)
97 97 entry = extensions.wrapcommand(commands.table, 'outgoing', outgoing)
98 98 outgoingopt = [('', 'large', None, _('display outgoing largefiles'))]
99 99 entry[1].extend(outgoingopt)
100 100 cmdutil.outgoinghooks.add('largefiles', overrides.outgoinghook)
101 101 entry = extensions.wrapcommand(commands.table, 'summary',
102 102 overrides.overridesummary)
103 103 summaryopt = [('', 'large', None, _('display outgoing largefiles'))]
104 104 entry[1].extend(summaryopt)
105 105 cmdutil.summaryremotehooks.add('largefiles', overrides.summaryremotehook)
106 106
107 107 entry = extensions.wrapcommand(commands.table, 'pull',
108 108 overrides.overridepull)
109 109 pullopt = [('', 'all-largefiles', None,
110 110 _('download all pulled versions of largefiles (DEPRECATED)')),
111 111 ('', 'lfrev', [],
112 112 _('download largefiles for these revisions'), _('REV'))]
113 113 entry[1].extend(pullopt)
114 114
115 115 entry = extensions.wrapcommand(commands.table, 'push',
116 116 overrides.overridepush)
117 117 pushopt = [('', 'lfrev', [],
118 118 _('upload largefiles for these revisions'), _('REV'))]
119 119 entry[1].extend(pushopt)
120 120 extensions.wrapfunction(exchange, 'pushoperation',
121 121 overrides.exchangepushoperation)
122 122
123 123 entry = extensions.wrapcommand(commands.table, 'clone',
124 124 overrides.overrideclone)
125 125 cloneopt = [('', 'all-largefiles', None,
126 126 _('download all versions of all largefiles'))]
127 127 entry[1].extend(cloneopt)
128 128 extensions.wrapfunction(hg, 'clone', overrides.hgclone)
129 129 extensions.wrapfunction(hg, 'postshare', overrides.hgpostshare)
130 130
131 131 entry = extensions.wrapcommand(commands.table, 'cat',
132 132 overrides.overridecat)
133 133 extensions.wrapfunction(merge, '_checkunknownfile',
134 134 overrides.overridecheckunknownfile)
135 135 extensions.wrapfunction(merge, 'calculateupdates',
136 136 overrides.overridecalculateupdates)
137 137 extensions.wrapfunction(merge, 'recordupdates',
138 138 overrides.mergerecordupdates)
139 139 extensions.wrapfunction(merge, 'update', overrides.mergeupdate)
140 140 extensions.wrapfunction(filemerge, '_filemerge',
141 141 overrides.overridefilemerge)
142 142 extensions.wrapfunction(cmdutil, 'copy', overrides.overridecopy)
143 143
144 144 # Summary calls dirty on the subrepos
145 145 extensions.wrapfunction(subrepo.hgsubrepo, 'dirty', overrides.overridedirty)
146 146
147 147 extensions.wrapfunction(cmdutil, 'revert', overrides.overriderevert)
148 148
149 149 extensions.wrapcommand(commands.table, 'archive',
150 150 overrides.overridearchivecmd)
151 151 extensions.wrapfunction(archival, 'archive', overrides.overridearchive)
152 152 extensions.wrapfunction(subrepo.hgsubrepo, 'archive',
153 153 overrides.hgsubrepoarchive)
154 154 extensions.wrapfunction(webcommands, 'archive', overrides.hgwebarchive)
155 155 extensions.wrapfunction(cmdutil, 'bailifchanged',
156 156 overrides.overridebailifchanged)
157 157
158 158 extensions.wrapfunction(cmdutil, 'postcommitstatus',
159 159 overrides.postcommitstatus)
160 160 extensions.wrapfunction(scmutil, 'marktouched',
161 161 overrides.scmutilmarktouched)
162 162
163 163 extensions.wrapfunction(url, 'open',
164 164 overrides.openlargefile)
165 165
166 166 # create the new wireproto commands ...
167 167 wireproto.wireprotocommand('putlfile', 'sha', permission='push')(
168 168 proto.putlfile)
169 169 wireproto.wireprotocommand('getlfile', 'sha', permission='pull')(
170 170 proto.getlfile)
171 171 wireproto.wireprotocommand('statlfile', 'sha', permission='pull')(
172 172 proto.statlfile)
173 173 wireproto.wireprotocommand('lheads', '', permission='pull')(
174 174 wireproto.heads)
175 175
176 176 # ... and wrap some existing ones
177 177 wireproto.commands['heads'].func = proto.heads
178 # TODO also wrap wireproto.commandsv2 once heads is implemented there.
178 179
179 180 extensions.wrapfunction(webcommands, 'decodepath', overrides.decodepath)
180 181
181 182 extensions.wrapfunction(wireproto, '_capabilities', proto._capabilities)
182 183
183 184 # can't do this in reposetup because it needs to have happened before
184 185 # wirerepo.__init__ is called
185 186 proto.ssholdcallstream = sshpeer.sshv1peer._callstream
186 187 proto.httpoldcallstream = httppeer.httppeer._callstream
187 188 sshpeer.sshv1peer._callstream = proto.sshrepocallstream
188 189 httppeer.httppeer._callstream = proto.httprepocallstream
189 190
190 191 # override some extensions' stuff as well
191 192 for name, module in extensions.extensions():
192 193 if name == 'purge':
193 194 extensions.wrapcommand(getattr(module, 'cmdtable'), 'purge',
194 195 overrides.overridepurge)
195 196 if name == 'rebase':
196 197 extensions.wrapcommand(getattr(module, 'cmdtable'), 'rebase',
197 198 overrides.overriderebase)
198 199 extensions.wrapfunction(module, 'rebase',
199 200 overrides.overriderebase)
200 201 if name == 'transplant':
201 202 extensions.wrapcommand(getattr(module, 'cmdtable'), 'transplant',
202 203 overrides.overridetransplant)
@@ -1,1139 +1,1163 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 wireprototypes,
35 35 )
36 36
37 37 from .utils import (
38 38 procutil,
39 39 stringutil,
40 40 )
41 41
42 42 urlerr = util.urlerr
43 43 urlreq = util.urlreq
44 44
45 45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 47 'IncompatibleClient')
48 48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49 49
50 50 class remoteiterbatcher(peer.iterbatcher):
51 51 def __init__(self, remote):
52 52 super(remoteiterbatcher, self).__init__()
53 53 self._remote = remote
54 54
55 55 def __getattr__(self, name):
56 56 # Validate this method is batchable, since submit() only supports
57 57 # batchable methods.
58 58 fn = getattr(self._remote, name)
59 59 if not getattr(fn, 'batchable', None):
60 60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 61 'call to %r' % name)
62 62
63 63 return super(remoteiterbatcher, self).__getattr__(name)
64 64
65 65 def submit(self):
66 66 """Break the batch request into many patch calls and pipeline them.
67 67
68 68 This is mostly valuable over http where request sizes can be
69 69 limited, but can be used in other places as well.
70 70 """
71 71 # 2-tuple of (command, arguments) that represents what will be
72 72 # sent over the wire.
73 73 requests = []
74 74
75 75 # 4-tuple of (command, final future, @batchable generator, remote
76 76 # future).
77 77 results = []
78 78
79 79 for command, args, opts, finalfuture in self.calls:
80 80 mtd = getattr(self._remote, command)
81 81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82 82
83 83 commandargs, fremote = next(batchable)
84 84 assert fremote
85 85 requests.append((command, commandargs))
86 86 results.append((command, finalfuture, batchable, fremote))
87 87
88 88 if requests:
89 89 self._resultiter = self._remote._submitbatch(requests)
90 90
91 91 self._results = results
92 92
93 93 def results(self):
94 94 for command, finalfuture, batchable, remotefuture in self._results:
95 95 # Get the raw result, set it in the remote future, feed it
96 96 # back into the @batchable generator so it can be decoded, and
97 97 # set the result on the final future to this value.
98 98 remoteresult = next(self._resultiter)
99 99 remotefuture.set(remoteresult)
100 100 finalfuture.set(next(batchable))
101 101
102 102 # Verify our @batchable generators only emit 2 values.
103 103 try:
104 104 next(batchable)
105 105 except StopIteration:
106 106 pass
107 107 else:
108 108 raise error.ProgrammingError('%s @batchable generator emitted '
109 109 'unexpected value count' % command)
110 110
111 111 yield finalfuture.value
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118 # list of nodes encoding / decoding
119 119
120 120 def decodelist(l, sep=' '):
121 121 if l:
122 122 return [bin(v) for v in l.split(sep)]
123 123 return []
124 124
125 125 def encodelist(l, sep=' '):
126 126 try:
127 127 return sep.join(map(hex, l))
128 128 except TypeError:
129 129 raise
130 130
131 131 # batched call argument encoding
132 132
133 133 def escapearg(plain):
134 134 return (plain
135 135 .replace(':', ':c')
136 136 .replace(',', ':o')
137 137 .replace(';', ':s')
138 138 .replace('=', ':e'))
139 139
140 140 def unescapearg(escaped):
141 141 return (escaped
142 142 .replace(':e', '=')
143 143 .replace(':s', ';')
144 144 .replace(':o', ',')
145 145 .replace(':c', ':'))
146 146
147 147 def encodebatchcmds(req):
148 148 """Return a ``cmds`` argument value for the ``batch`` command."""
149 149 cmds = []
150 150 for op, argsdict in req:
151 151 # Old servers didn't properly unescape argument names. So prevent
152 152 # the sending of argument names that may not be decoded properly by
153 153 # servers.
154 154 assert all(escapearg(k) == k for k in argsdict)
155 155
156 156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 157 for k, v in argsdict.iteritems())
158 158 cmds.append('%s %s' % (op, args))
159 159
160 160 return ';'.join(cmds)
161 161
162 162 # mapping of options accepted by getbundle and their types
163 163 #
164 164 # Meant to be extended by extensions. It is extensions responsibility to ensure
165 165 # such options are properly processed in exchange.getbundle.
166 166 #
167 167 # supported types are:
168 168 #
169 169 # :nodes: list of binary nodes
170 170 # :csv: list of comma-separated values
171 171 # :scsv: list of comma-separated values return as set
172 172 # :plain: string with no transformation needed.
173 173 gboptsmap = {'heads': 'nodes',
174 174 'bookmarks': 'boolean',
175 175 'common': 'nodes',
176 176 'obsmarkers': 'boolean',
177 177 'phases': 'boolean',
178 178 'bundlecaps': 'scsv',
179 179 'listkeys': 'csv',
180 180 'cg': 'boolean',
181 181 'cbattempted': 'boolean',
182 182 'stream': 'boolean',
183 183 }
184 184
185 185 # client side
186 186
187 187 class wirepeer(repository.legacypeer):
188 188 """Client-side interface for communicating with a peer repository.
189 189
190 190 Methods commonly call wire protocol commands of the same name.
191 191
192 192 See also httppeer.py and sshpeer.py for protocol-specific
193 193 implementations of this interface.
194 194 """
195 195 # Begin of basewirepeer interface.
196 196
197 197 def iterbatch(self):
198 198 return remoteiterbatcher(self)
199 199
200 200 @batchable
201 201 def lookup(self, key):
202 202 self.requirecap('lookup', _('look up remote revision'))
203 203 f = future()
204 204 yield {'key': encoding.fromlocal(key)}, f
205 205 d = f.value
206 206 success, data = d[:-1].split(" ", 1)
207 207 if int(success):
208 208 yield bin(data)
209 209 else:
210 210 self._abort(error.RepoError(data))
211 211
212 212 @batchable
213 213 def heads(self):
214 214 f = future()
215 215 yield {}, f
216 216 d = f.value
217 217 try:
218 218 yield decodelist(d[:-1])
219 219 except ValueError:
220 220 self._abort(error.ResponseError(_("unexpected response:"), d))
221 221
222 222 @batchable
223 223 def known(self, nodes):
224 224 f = future()
225 225 yield {'nodes': encodelist(nodes)}, f
226 226 d = f.value
227 227 try:
228 228 yield [bool(int(b)) for b in d]
229 229 except ValueError:
230 230 self._abort(error.ResponseError(_("unexpected response:"), d))
231 231
232 232 @batchable
233 233 def branchmap(self):
234 234 f = future()
235 235 yield {}, f
236 236 d = f.value
237 237 try:
238 238 branchmap = {}
239 239 for branchpart in d.splitlines():
240 240 branchname, branchheads = branchpart.split(' ', 1)
241 241 branchname = encoding.tolocal(urlreq.unquote(branchname))
242 242 branchheads = decodelist(branchheads)
243 243 branchmap[branchname] = branchheads
244 244 yield branchmap
245 245 except TypeError:
246 246 self._abort(error.ResponseError(_("unexpected response:"), d))
247 247
248 248 @batchable
249 249 def listkeys(self, namespace):
250 250 if not self.capable('pushkey'):
251 251 yield {}, None
252 252 f = future()
253 253 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
254 254 yield {'namespace': encoding.fromlocal(namespace)}, f
255 255 d = f.value
256 256 self.ui.debug('received listkey for "%s": %i bytes\n'
257 257 % (namespace, len(d)))
258 258 yield pushkeymod.decodekeys(d)
259 259
260 260 @batchable
261 261 def pushkey(self, namespace, key, old, new):
262 262 if not self.capable('pushkey'):
263 263 yield False, None
264 264 f = future()
265 265 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
266 266 yield {'namespace': encoding.fromlocal(namespace),
267 267 'key': encoding.fromlocal(key),
268 268 'old': encoding.fromlocal(old),
269 269 'new': encoding.fromlocal(new)}, f
270 270 d = f.value
271 271 d, output = d.split('\n', 1)
272 272 try:
273 273 d = bool(int(d))
274 274 except ValueError:
275 275 raise error.ResponseError(
276 276 _('push failed (unexpected response):'), d)
277 277 for l in output.splitlines(True):
278 278 self.ui.status(_('remote: '), l)
279 279 yield d
280 280
281 281 def stream_out(self):
282 282 return self._callstream('stream_out')
283 283
284 284 def getbundle(self, source, **kwargs):
285 285 kwargs = pycompat.byteskwargs(kwargs)
286 286 self.requirecap('getbundle', _('look up remote changes'))
287 287 opts = {}
288 288 bundlecaps = kwargs.get('bundlecaps')
289 289 if bundlecaps is not None:
290 290 kwargs['bundlecaps'] = sorted(bundlecaps)
291 291 else:
292 292 bundlecaps = () # kwargs could have it to None
293 293 for key, value in kwargs.iteritems():
294 294 if value is None:
295 295 continue
296 296 keytype = gboptsmap.get(key)
297 297 if keytype is None:
298 298 raise error.ProgrammingError(
299 299 'Unexpectedly None keytype for key %s' % key)
300 300 elif keytype == 'nodes':
301 301 value = encodelist(value)
302 302 elif keytype in ('csv', 'scsv'):
303 303 value = ','.join(value)
304 304 elif keytype == 'boolean':
305 305 value = '%i' % bool(value)
306 306 elif keytype != 'plain':
307 307 raise KeyError('unknown getbundle option type %s'
308 308 % keytype)
309 309 opts[key] = value
310 310 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
311 311 if any((cap.startswith('HG2') for cap in bundlecaps)):
312 312 return bundle2.getunbundler(self.ui, f)
313 313 else:
314 314 return changegroupmod.cg1unpacker(f, 'UN')
315 315
316 316 def unbundle(self, cg, heads, url):
317 317 '''Send cg (a readable file-like object representing the
318 318 changegroup to push, typically a chunkbuffer object) to the
319 319 remote server as a bundle.
320 320
321 321 When pushing a bundle10 stream, return an integer indicating the
322 322 result of the push (see changegroup.apply()).
323 323
324 324 When pushing a bundle20 stream, return a bundle20 stream.
325 325
326 326 `url` is the url the client thinks it's pushing to, which is
327 327 visible to hooks.
328 328 '''
329 329
330 330 if heads != ['force'] and self.capable('unbundlehash'):
331 331 heads = encodelist(['hashed',
332 332 hashlib.sha1(''.join(sorted(heads))).digest()])
333 333 else:
334 334 heads = encodelist(heads)
335 335
336 336 if util.safehasattr(cg, 'deltaheader'):
337 337 # this a bundle10, do the old style call sequence
338 338 ret, output = self._callpush("unbundle", cg, heads=heads)
339 339 if ret == "":
340 340 raise error.ResponseError(
341 341 _('push failed:'), output)
342 342 try:
343 343 ret = int(ret)
344 344 except ValueError:
345 345 raise error.ResponseError(
346 346 _('push failed (unexpected response):'), ret)
347 347
348 348 for l in output.splitlines(True):
349 349 self.ui.status(_('remote: '), l)
350 350 else:
351 351 # bundle2 push. Send a stream, fetch a stream.
352 352 stream = self._calltwowaystream('unbundle', cg, heads=heads)
353 353 ret = bundle2.getunbundler(self.ui, stream)
354 354 return ret
355 355
356 356 # End of basewirepeer interface.
357 357
358 358 # Begin of baselegacywirepeer interface.
359 359
360 360 def branches(self, nodes):
361 361 n = encodelist(nodes)
362 362 d = self._call("branches", nodes=n)
363 363 try:
364 364 br = [tuple(decodelist(b)) for b in d.splitlines()]
365 365 return br
366 366 except ValueError:
367 367 self._abort(error.ResponseError(_("unexpected response:"), d))
368 368
369 369 def between(self, pairs):
370 370 batch = 8 # avoid giant requests
371 371 r = []
372 372 for i in xrange(0, len(pairs), batch):
373 373 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
374 374 d = self._call("between", pairs=n)
375 375 try:
376 376 r.extend(l and decodelist(l) or [] for l in d.splitlines())
377 377 except ValueError:
378 378 self._abort(error.ResponseError(_("unexpected response:"), d))
379 379 return r
380 380
381 381 def changegroup(self, nodes, kind):
382 382 n = encodelist(nodes)
383 383 f = self._callcompressable("changegroup", roots=n)
384 384 return changegroupmod.cg1unpacker(f, 'UN')
385 385
386 386 def changegroupsubset(self, bases, heads, kind):
387 387 self.requirecap('changegroupsubset', _('look up remote changes'))
388 388 bases = encodelist(bases)
389 389 heads = encodelist(heads)
390 390 f = self._callcompressable("changegroupsubset",
391 391 bases=bases, heads=heads)
392 392 return changegroupmod.cg1unpacker(f, 'UN')
393 393
394 394 # End of baselegacywirepeer interface.
395 395
396 396 def _submitbatch(self, req):
397 397 """run batch request <req> on the server
398 398
399 399 Returns an iterator of the raw responses from the server.
400 400 """
401 401 ui = self.ui
402 402 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
403 403 ui.debug('devel-peer-request: batched-content\n')
404 404 for op, args in req:
405 405 msg = 'devel-peer-request: - %s (%d arguments)\n'
406 406 ui.debug(msg % (op, len(args)))
407 407
408 408 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
409 409 chunk = rsp.read(1024)
410 410 work = [chunk]
411 411 while chunk:
412 412 while ';' not in chunk and chunk:
413 413 chunk = rsp.read(1024)
414 414 work.append(chunk)
415 415 merged = ''.join(work)
416 416 while ';' in merged:
417 417 one, merged = merged.split(';', 1)
418 418 yield unescapearg(one)
419 419 chunk = rsp.read(1024)
420 420 work = [merged, chunk]
421 421 yield unescapearg(''.join(work))
422 422
423 423 def _submitone(self, op, args):
424 424 return self._call(op, **pycompat.strkwargs(args))
425 425
426 426 def debugwireargs(self, one, two, three=None, four=None, five=None):
427 427 # don't pass optional arguments left at their default value
428 428 opts = {}
429 429 if three is not None:
430 430 opts[r'three'] = three
431 431 if four is not None:
432 432 opts[r'four'] = four
433 433 return self._call('debugwireargs', one=one, two=two, **opts)
434 434
435 435 def _call(self, cmd, **args):
436 436 """execute <cmd> on the server
437 437
438 438 The command is expected to return a simple string.
439 439
440 440 returns the server reply as a string."""
441 441 raise NotImplementedError()
442 442
443 443 def _callstream(self, cmd, **args):
444 444 """execute <cmd> on the server
445 445
446 446 The command is expected to return a stream. Note that if the
447 447 command doesn't return a stream, _callstream behaves
448 448 differently for ssh and http peers.
449 449
450 450 returns the server reply as a file like object.
451 451 """
452 452 raise NotImplementedError()
453 453
454 454 def _callcompressable(self, cmd, **args):
455 455 """execute <cmd> on the server
456 456
457 457 The command is expected to return a stream.
458 458
459 459 The stream may have been compressed in some implementations. This
460 460 function takes care of the decompression. This is the only difference
461 461 with _callstream.
462 462
463 463 returns the server reply as a file like object.
464 464 """
465 465 raise NotImplementedError()
466 466
467 467 def _callpush(self, cmd, fp, **args):
468 468 """execute a <cmd> on server
469 469
470 470 The command is expected to be related to a push. Push has a special
471 471 return method.
472 472
473 473 returns the server reply as a (ret, output) tuple. ret is either
474 474 empty (error) or a stringified int.
475 475 """
476 476 raise NotImplementedError()
477 477
478 478 def _calltwowaystream(self, cmd, fp, **args):
479 479 """execute <cmd> on server
480 480
481 481 The command will send a stream to the server and get a stream in reply.
482 482 """
483 483 raise NotImplementedError()
484 484
485 485 def _abort(self, exception):
486 486 """clearly abort the wire protocol connection and raise the exception
487 487 """
488 488 raise NotImplementedError()
489 489
490 490 # server side
491 491
492 492 # wire protocol command can either return a string or one of these classes.
493 493
494 494 def getdispatchrepo(repo, proto, command):
495 495 """Obtain the repo used for processing wire protocol commands.
496 496
497 497 The intent of this function is to serve as a monkeypatch point for
498 498 extensions that need commands to operate on different repo views under
499 499 specialized circumstances.
500 500 """
501 501 return repo.filtered('served')
502 502
503 503 def dispatch(repo, proto, command):
504 504 repo = getdispatchrepo(repo, proto, command)
505 func, spec = commands[command]
505
506 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
507 commandtable = commandsv2 if transportversion == 2 else commands
508 func, spec = commandtable[command]
509
506 510 args = proto.getargs(spec)
507 511 return func(repo, proto, *args)
508 512
509 513 def options(cmd, keys, others):
510 514 opts = {}
511 515 for k in keys:
512 516 if k in others:
513 517 opts[k] = others[k]
514 518 del others[k]
515 519 if others:
516 520 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
517 521 % (cmd, ",".join(others)))
518 522 return opts
519 523
520 524 def bundle1allowed(repo, action):
521 525 """Whether a bundle1 operation is allowed from the server.
522 526
523 527 Priority is:
524 528
525 529 1. server.bundle1gd.<action> (if generaldelta active)
526 530 2. server.bundle1.<action>
527 531 3. server.bundle1gd (if generaldelta active)
528 532 4. server.bundle1
529 533 """
530 534 ui = repo.ui
531 535 gd = 'generaldelta' in repo.requirements
532 536
533 537 if gd:
534 538 v = ui.configbool('server', 'bundle1gd.%s' % action)
535 539 if v is not None:
536 540 return v
537 541
538 542 v = ui.configbool('server', 'bundle1.%s' % action)
539 543 if v is not None:
540 544 return v
541 545
542 546 if gd:
543 547 v = ui.configbool('server', 'bundle1gd')
544 548 if v is not None:
545 549 return v
546 550
547 551 return ui.configbool('server', 'bundle1')
548 552
549 553 def supportedcompengines(ui, role):
550 554 """Obtain the list of supported compression engines for a request."""
551 555 assert role in (util.CLIENTROLE, util.SERVERROLE)
552 556
553 557 compengines = util.compengines.supportedwireengines(role)
554 558
555 559 # Allow config to override default list and ordering.
556 560 if role == util.SERVERROLE:
557 561 configengines = ui.configlist('server', 'compressionengines')
558 562 config = 'server.compressionengines'
559 563 else:
560 564 # This is currently implemented mainly to facilitate testing. In most
561 565 # cases, the server should be in charge of choosing a compression engine
562 566 # because a server has the most to lose from a sub-optimal choice. (e.g.
563 567 # CPU DoS due to an expensive engine or a network DoS due to poor
564 568 # compression ratio).
565 569 configengines = ui.configlist('experimental',
566 570 'clientcompressionengines')
567 571 config = 'experimental.clientcompressionengines'
568 572
569 573 # No explicit config. Filter out the ones that aren't supposed to be
570 574 # advertised and return default ordering.
571 575 if not configengines:
572 576 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
573 577 return [e for e in compengines
574 578 if getattr(e.wireprotosupport(), attr) > 0]
575 579
576 580 # If compression engines are listed in the config, assume there is a good
577 581 # reason for it (like server operators wanting to achieve specific
578 582 # performance characteristics). So fail fast if the config references
579 583 # unusable compression engines.
580 584 validnames = set(e.name() for e in compengines)
581 585 invalidnames = set(e for e in configengines if e not in validnames)
582 586 if invalidnames:
583 587 raise error.Abort(_('invalid compression engine defined in %s: %s') %
584 588 (config, ', '.join(sorted(invalidnames))))
585 589
586 590 compengines = [e for e in compengines if e.name() in configengines]
587 591 compengines = sorted(compengines,
588 592 key=lambda e: configengines.index(e.name()))
589 593
590 594 if not compengines:
591 595 raise error.Abort(_('%s config option does not specify any known '
592 596 'compression engines') % config,
593 597 hint=_('usable compression engines: %s') %
594 598 ', '.sorted(validnames))
595 599
596 600 return compengines
597 601
598 602 class commandentry(object):
599 603 """Represents a declared wire protocol command."""
600 604 def __init__(self, func, args='', transports=None,
601 605 permission='push'):
602 606 self.func = func
603 607 self.args = args
604 608 self.transports = transports or set()
605 609 self.permission = permission
606 610
607 611 def _merge(self, func, args):
608 612 """Merge this instance with an incoming 2-tuple.
609 613
610 614 This is called when a caller using the old 2-tuple API attempts
611 615 to replace an instance. The incoming values are merged with
612 616 data not captured by the 2-tuple and a new instance containing
613 617 the union of the two objects is returned.
614 618 """
615 619 return commandentry(func, args=args, transports=set(self.transports),
616 620 permission=self.permission)
617 621
618 622 # Old code treats instances as 2-tuples. So expose that interface.
619 623 def __iter__(self):
620 624 yield self.func
621 625 yield self.args
622 626
623 627 def __getitem__(self, i):
624 628 if i == 0:
625 629 return self.func
626 630 elif i == 1:
627 631 return self.args
628 632 else:
629 633 raise IndexError('can only access elements 0 and 1')
630 634
631 635 class commanddict(dict):
632 636 """Container for registered wire protocol commands.
633 637
634 638 It behaves like a dict. But __setitem__ is overwritten to allow silent
635 639 coercion of values from 2-tuples for API compatibility.
636 640 """
637 641 def __setitem__(self, k, v):
638 642 if isinstance(v, commandentry):
639 643 pass
640 644 # Cast 2-tuples to commandentry instances.
641 645 elif isinstance(v, tuple):
642 646 if len(v) != 2:
643 647 raise ValueError('command tuples must have exactly 2 elements')
644 648
645 649 # It is common for extensions to wrap wire protocol commands via
646 650 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
647 651 # doing this aren't aware of the new API that uses objects to store
648 652 # command entries, we automatically merge old state with new.
649 653 if k in self:
650 654 v = self[k]._merge(v[0], v[1])
651 655 else:
652 656 # Use default values from @wireprotocommand.
653 657 v = commandentry(v[0], args=v[1],
654 658 transports=set(wireprototypes.TRANSPORTS),
655 659 permission='push')
656 660 else:
657 661 raise ValueError('command entries must be commandentry instances '
658 662 'or 2-tuples')
659 663
660 664 return super(commanddict, self).__setitem__(k, v)
661 665
662 666 def commandavailable(self, command, proto):
663 667 """Determine if a command is available for the requested protocol."""
664 668 assert proto.name in wireprototypes.TRANSPORTS
665 669
666 670 entry = self.get(command)
667 671
668 672 if not entry:
669 673 return False
670 674
671 675 if proto.name not in entry.transports:
672 676 return False
673 677
674 678 return True
675 679
676 680 # Constants specifying which transports a wire protocol command should be
677 681 # available on. For use with @wireprotocommand.
678 682 POLICY_ALL = 'all'
679 683 POLICY_V1_ONLY = 'v1-only'
680 684 POLICY_V2_ONLY = 'v2-only'
681 685
686 # For version 1 transports.
682 687 commands = commanddict()
683 688
689 # For version 2 transports.
690 commandsv2 = commanddict()
691
684 692 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
685 693 permission='push'):
686 694 """Decorator to declare a wire protocol command.
687 695
688 696 ``name`` is the name of the wire protocol command being provided.
689 697
690 698 ``args`` is a space-delimited list of named arguments that the command
691 699 accepts. ``*`` is a special value that says to accept all arguments.
692 700
693 701 ``transportpolicy`` is a POLICY_* constant denoting which transports
694 702 this wire protocol command should be exposed to. By default, commands
695 703 are exposed to all wire protocol transports.
696 704
697 705 ``permission`` defines the permission type needed to run this command.
698 706 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
699 707 respectively. Default is to assume command requires ``push`` permissions
700 708 because otherwise commands not declaring their permissions could modify
701 709 a repository that is supposed to be read-only.
702 710 """
703 711 if transportpolicy == POLICY_ALL:
704 712 transports = set(wireprototypes.TRANSPORTS)
713 transportversions = {1, 2}
705 714 elif transportpolicy == POLICY_V1_ONLY:
706 715 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
707 716 if v['version'] == 1}
717 transportversions = {1}
708 718 elif transportpolicy == POLICY_V2_ONLY:
709 719 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
710 720 if v['version'] == 2}
721 transportversions = {2}
711 722 else:
712 723 raise error.ProgrammingError('invalid transport policy value: %s' %
713 724 transportpolicy)
714 725
715 726 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
716 727 # SSHv2.
717 728 # TODO undo this hack when SSH is using the unified frame protocol.
718 729 if name == b'batch':
719 730 transports.add(wireprototypes.SSHV2)
720 731
721 732 if permission not in ('push', 'pull'):
722 733 raise error.ProgrammingError('invalid wire protocol permission; '
723 734 'got %s; expected "push" or "pull"' %
724 735 permission)
725 736
726 737 def register(func):
727 commands[name] = commandentry(func, args=args, transports=transports,
738 if 1 in transportversions:
739 if name in commands:
740 raise error.ProgrammingError('%s command already registered '
741 'for version 1' % name)
742 commands[name] = commandentry(func, args=args,
743 transports=transports,
728 744 permission=permission)
745 if 2 in transportversions:
746 if name in commandsv2:
747 raise error.ProgrammingError('%s command already registered '
748 'for version 2' % name)
749 commandsv2[name] = commandentry(func, args=args,
750 transports=transports,
751 permission=permission)
752
729 753 return func
730 754 return register
731 755
732 756 # TODO define a more appropriate permissions type to use for this.
733 757 @wireprotocommand('batch', 'cmds *', permission='pull',
734 758 transportpolicy=POLICY_V1_ONLY)
735 759 def batch(repo, proto, cmds, others):
736 760 repo = repo.filtered("served")
737 761 res = []
738 762 for pair in cmds.split(';'):
739 763 op, args = pair.split(' ', 1)
740 764 vals = {}
741 765 for a in args.split(','):
742 766 if a:
743 767 n, v = a.split('=')
744 768 vals[unescapearg(n)] = unescapearg(v)
745 769 func, spec = commands[op]
746 770
747 771 # Validate that client has permissions to perform this command.
748 772 perm = commands[op].permission
749 773 assert perm in ('push', 'pull')
750 774 proto.checkperm(perm)
751 775
752 776 if spec:
753 777 keys = spec.split()
754 778 data = {}
755 779 for k in keys:
756 780 if k == '*':
757 781 star = {}
758 782 for key in vals.keys():
759 783 if key not in keys:
760 784 star[key] = vals[key]
761 785 data['*'] = star
762 786 else:
763 787 data[k] = vals[k]
764 788 result = func(repo, proto, *[data[k] for k in keys])
765 789 else:
766 790 result = func(repo, proto)
767 791 if isinstance(result, wireprototypes.ooberror):
768 792 return result
769 793
770 794 # For now, all batchable commands must return bytesresponse or
771 795 # raw bytes (for backwards compatibility).
772 796 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
773 797 if isinstance(result, wireprototypes.bytesresponse):
774 798 result = result.data
775 799 res.append(escapearg(result))
776 800
777 801 return wireprototypes.bytesresponse(';'.join(res))
778 802
779 803 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
780 804 permission='pull')
781 805 def between(repo, proto, pairs):
782 806 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
783 807 r = []
784 808 for b in repo.between(pairs):
785 809 r.append(encodelist(b) + "\n")
786 810
787 811 return wireprototypes.bytesresponse(''.join(r))
788 812
789 813 @wireprotocommand('branchmap', permission='pull')
790 814 def branchmap(repo, proto):
791 815 branchmap = repo.branchmap()
792 816 heads = []
793 817 for branch, nodes in branchmap.iteritems():
794 818 branchname = urlreq.quote(encoding.fromlocal(branch))
795 819 branchnodes = encodelist(nodes)
796 820 heads.append('%s %s' % (branchname, branchnodes))
797 821
798 822 return wireprototypes.bytesresponse('\n'.join(heads))
799 823
800 824 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
801 825 permission='pull')
802 826 def branches(repo, proto, nodes):
803 827 nodes = decodelist(nodes)
804 828 r = []
805 829 for b in repo.branches(nodes):
806 830 r.append(encodelist(b) + "\n")
807 831
808 832 return wireprototypes.bytesresponse(''.join(r))
809 833
810 834 @wireprotocommand('clonebundles', '', permission='pull')
811 835 def clonebundles(repo, proto):
812 836 """Server command for returning info for available bundles to seed clones.
813 837
814 838 Clients will parse this response and determine what bundle to fetch.
815 839
816 840 Extensions may wrap this command to filter or dynamically emit data
817 841 depending on the request. e.g. you could advertise URLs for the closest
818 842 data center given the client's IP address.
819 843 """
820 844 return wireprototypes.bytesresponse(
821 845 repo.vfs.tryread('clonebundles.manifest'))
822 846
823 847 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
824 848 'known', 'getbundle', 'unbundlehash']
825 849
826 850 def _capabilities(repo, proto):
827 851 """return a list of capabilities for a repo
828 852
829 853 This function exists to allow extensions to easily wrap capabilities
830 854 computation
831 855
832 856 - returns a lists: easy to alter
833 857 - change done here will be propagated to both `capabilities` and `hello`
834 858 command without any other action needed.
835 859 """
836 860 # copy to prevent modification of the global list
837 861 caps = list(wireprotocaps)
838 862
839 863 # Command of same name as capability isn't exposed to version 1 of
840 864 # transports. So conditionally add it.
841 865 if commands.commandavailable('changegroupsubset', proto):
842 866 caps.append('changegroupsubset')
843 867
844 868 if streamclone.allowservergeneration(repo):
845 869 if repo.ui.configbool('server', 'preferuncompressed'):
846 870 caps.append('stream-preferred')
847 871 requiredformats = repo.requirements & repo.supportedformats
848 872 # if our local revlogs are just revlogv1, add 'stream' cap
849 873 if not requiredformats - {'revlogv1'}:
850 874 caps.append('stream')
851 875 # otherwise, add 'streamreqs' detailing our local revlog format
852 876 else:
853 877 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
854 878 if repo.ui.configbool('experimental', 'bundle2-advertise'):
855 879 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
856 880 caps.append('bundle2=' + urlreq.quote(capsblob))
857 881 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
858 882
859 883 return proto.addcapabilities(repo, caps)
860 884
861 885 # If you are writing an extension and consider wrapping this function. Wrap
862 886 # `_capabilities` instead.
863 887 @wireprotocommand('capabilities', permission='pull')
864 888 def capabilities(repo, proto):
865 889 return wireprototypes.bytesresponse(' '.join(_capabilities(repo, proto)))
866 890
867 891 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
868 892 permission='pull')
869 893 def changegroup(repo, proto, roots):
870 894 nodes = decodelist(roots)
871 895 outgoing = discovery.outgoing(repo, missingroots=nodes,
872 896 missingheads=repo.heads())
873 897 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
874 898 gen = iter(lambda: cg.read(32768), '')
875 899 return wireprototypes.streamres(gen=gen)
876 900
877 901 @wireprotocommand('changegroupsubset', 'bases heads',
878 902 transportpolicy=POLICY_V1_ONLY,
879 903 permission='pull')
880 904 def changegroupsubset(repo, proto, bases, heads):
881 905 bases = decodelist(bases)
882 906 heads = decodelist(heads)
883 907 outgoing = discovery.outgoing(repo, missingroots=bases,
884 908 missingheads=heads)
885 909 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
886 910 gen = iter(lambda: cg.read(32768), '')
887 911 return wireprototypes.streamres(gen=gen)
888 912
889 913 @wireprotocommand('debugwireargs', 'one two *',
890 914 permission='pull')
891 915 def debugwireargs(repo, proto, one, two, others):
892 916 # only accept optional args from the known set
893 917 opts = options('debugwireargs', ['three', 'four'], others)
894 918 return wireprototypes.bytesresponse(repo.debugwireargs(
895 919 one, two, **pycompat.strkwargs(opts)))
896 920
897 921 @wireprotocommand('getbundle', '*', permission='pull')
898 922 def getbundle(repo, proto, others):
899 923 opts = options('getbundle', gboptsmap.keys(), others)
900 924 for k, v in opts.iteritems():
901 925 keytype = gboptsmap[k]
902 926 if keytype == 'nodes':
903 927 opts[k] = decodelist(v)
904 928 elif keytype == 'csv':
905 929 opts[k] = list(v.split(','))
906 930 elif keytype == 'scsv':
907 931 opts[k] = set(v.split(','))
908 932 elif keytype == 'boolean':
909 933 # Client should serialize False as '0', which is a non-empty string
910 934 # so it evaluates as a True bool.
911 935 if v == '0':
912 936 opts[k] = False
913 937 else:
914 938 opts[k] = bool(v)
915 939 elif keytype != 'plain':
916 940 raise KeyError('unknown getbundle option type %s'
917 941 % keytype)
918 942
919 943 if not bundle1allowed(repo, 'pull'):
920 944 if not exchange.bundle2requested(opts.get('bundlecaps')):
921 945 if proto.name == 'http-v1':
922 946 return wireprototypes.ooberror(bundle2required)
923 947 raise error.Abort(bundle2requiredmain,
924 948 hint=bundle2requiredhint)
925 949
926 950 prefercompressed = True
927 951
928 952 try:
929 953 if repo.ui.configbool('server', 'disablefullbundle'):
930 954 # Check to see if this is a full clone.
931 955 clheads = set(repo.changelog.heads())
932 956 changegroup = opts.get('cg', True)
933 957 heads = set(opts.get('heads', set()))
934 958 common = set(opts.get('common', set()))
935 959 common.discard(nullid)
936 960 if changegroup and not common and clheads == heads:
937 961 raise error.Abort(
938 962 _('server has pull-based clones disabled'),
939 963 hint=_('remove --pull if specified or upgrade Mercurial'))
940 964
941 965 info, chunks = exchange.getbundlechunks(repo, 'serve',
942 966 **pycompat.strkwargs(opts))
943 967 prefercompressed = info.get('prefercompressed', True)
944 968 except error.Abort as exc:
945 969 # cleanly forward Abort error to the client
946 970 if not exchange.bundle2requested(opts.get('bundlecaps')):
947 971 if proto.name == 'http-v1':
948 972 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
949 973 raise # cannot do better for bundle1 + ssh
950 974 # bundle2 request expect a bundle2 reply
951 975 bundler = bundle2.bundle20(repo.ui)
952 976 manargs = [('message', pycompat.bytestr(exc))]
953 977 advargs = []
954 978 if exc.hint is not None:
955 979 advargs.append(('hint', exc.hint))
956 980 bundler.addpart(bundle2.bundlepart('error:abort',
957 981 manargs, advargs))
958 982 chunks = bundler.getchunks()
959 983 prefercompressed = False
960 984
961 985 return wireprototypes.streamres(
962 986 gen=chunks, prefer_uncompressed=not prefercompressed)
963 987
964 988 @wireprotocommand('heads', permission='pull')
965 989 def heads(repo, proto):
966 990 h = repo.heads()
967 991 return wireprototypes.bytesresponse(encodelist(h) + '\n')
968 992
969 993 @wireprotocommand('hello', permission='pull')
970 994 def hello(repo, proto):
971 995 """Called as part of SSH handshake to obtain server info.
972 996
973 997 Returns a list of lines describing interesting things about the
974 998 server, in an RFC822-like format.
975 999
976 1000 Currently, the only one defined is ``capabilities``, which consists of a
977 1001 line of space separated tokens describing server abilities:
978 1002
979 1003 capabilities: <token0> <token1> <token2>
980 1004 """
981 1005 caps = capabilities(repo, proto).data
982 1006 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
983 1007
984 1008 @wireprotocommand('listkeys', 'namespace', permission='pull')
985 1009 def listkeys(repo, proto, namespace):
986 1010 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
987 1011 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
988 1012
989 1013 @wireprotocommand('lookup', 'key', permission='pull')
990 1014 def lookup(repo, proto, key):
991 1015 try:
992 1016 k = encoding.tolocal(key)
993 1017 c = repo[k]
994 1018 r = c.hex()
995 1019 success = 1
996 1020 except Exception as inst:
997 1021 r = stringutil.forcebytestr(inst)
998 1022 success = 0
999 1023 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1000 1024
1001 1025 @wireprotocommand('known', 'nodes *', permission='pull')
1002 1026 def known(repo, proto, nodes, others):
1003 1027 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1004 1028 return wireprototypes.bytesresponse(v)
1005 1029
1006 1030 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1007 1031 def pushkey(repo, proto, namespace, key, old, new):
1008 1032 # compatibility with pre-1.8 clients which were accidentally
1009 1033 # sending raw binary nodes rather than utf-8-encoded hex
1010 1034 if len(new) == 20 and stringutil.escapestr(new) != new:
1011 1035 # looks like it could be a binary node
1012 1036 try:
1013 1037 new.decode('utf-8')
1014 1038 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1015 1039 except UnicodeDecodeError:
1016 1040 pass # binary, leave unmodified
1017 1041 else:
1018 1042 new = encoding.tolocal(new) # normal path
1019 1043
1020 1044 with proto.mayberedirectstdio() as output:
1021 1045 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1022 1046 encoding.tolocal(old), new) or False
1023 1047
1024 1048 output = output.getvalue() if output else ''
1025 1049 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1026 1050
1027 1051 @wireprotocommand('stream_out', permission='pull')
1028 1052 def stream(repo, proto):
1029 1053 '''If the server supports streaming clone, it advertises the "stream"
1030 1054 capability with a value representing the version and flags of the repo
1031 1055 it is serving. Client checks to see if it understands the format.
1032 1056 '''
1033 1057 return wireprototypes.streamreslegacy(
1034 1058 streamclone.generatev1wireproto(repo))
1035 1059
1036 1060 @wireprotocommand('unbundle', 'heads', permission='push')
1037 1061 def unbundle(repo, proto, heads):
1038 1062 their_heads = decodelist(heads)
1039 1063
1040 1064 with proto.mayberedirectstdio() as output:
1041 1065 try:
1042 1066 exchange.check_heads(repo, their_heads, 'preparing changes')
1043 1067
1044 1068 # write bundle data to temporary file because it can be big
1045 1069 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1046 1070 fp = os.fdopen(fd, r'wb+')
1047 1071 r = 0
1048 1072 try:
1049 1073 proto.forwardpayload(fp)
1050 1074 fp.seek(0)
1051 1075 gen = exchange.readbundle(repo.ui, fp, None)
1052 1076 if (isinstance(gen, changegroupmod.cg1unpacker)
1053 1077 and not bundle1allowed(repo, 'push')):
1054 1078 if proto.name == 'http-v1':
1055 1079 # need to special case http because stderr do not get to
1056 1080 # the http client on failed push so we need to abuse
1057 1081 # some other error type to make sure the message get to
1058 1082 # the user.
1059 1083 return wireprototypes.ooberror(bundle2required)
1060 1084 raise error.Abort(bundle2requiredmain,
1061 1085 hint=bundle2requiredhint)
1062 1086
1063 1087 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1064 1088 proto.client())
1065 1089 if util.safehasattr(r, 'addpart'):
1066 1090 # The return looks streamable, we are in the bundle2 case
1067 1091 # and should return a stream.
1068 1092 return wireprototypes.streamreslegacy(gen=r.getchunks())
1069 1093 return wireprototypes.pushres(
1070 1094 r, output.getvalue() if output else '')
1071 1095
1072 1096 finally:
1073 1097 fp.close()
1074 1098 os.unlink(tempname)
1075 1099
1076 1100 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1077 1101 # handle non-bundle2 case first
1078 1102 if not getattr(exc, 'duringunbundle2', False):
1079 1103 try:
1080 1104 raise
1081 1105 except error.Abort:
1082 1106 # The old code we moved used procutil.stderr directly.
1083 1107 # We did not change it to minimise code change.
1084 1108 # This need to be moved to something proper.
1085 1109 # Feel free to do it.
1086 1110 procutil.stderr.write("abort: %s\n" % exc)
1087 1111 if exc.hint is not None:
1088 1112 procutil.stderr.write("(%s)\n" % exc.hint)
1089 1113 procutil.stderr.flush()
1090 1114 return wireprototypes.pushres(
1091 1115 0, output.getvalue() if output else '')
1092 1116 except error.PushRaced:
1093 1117 return wireprototypes.pusherr(
1094 1118 pycompat.bytestr(exc),
1095 1119 output.getvalue() if output else '')
1096 1120
1097 1121 bundler = bundle2.bundle20(repo.ui)
1098 1122 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1099 1123 bundler.addpart(out)
1100 1124 try:
1101 1125 try:
1102 1126 raise
1103 1127 except error.PushkeyFailed as exc:
1104 1128 # check client caps
1105 1129 remotecaps = getattr(exc, '_replycaps', None)
1106 1130 if (remotecaps is not None
1107 1131 and 'pushkey' not in remotecaps.get('error', ())):
1108 1132 # no support remote side, fallback to Abort handler.
1109 1133 raise
1110 1134 part = bundler.newpart('error:pushkey')
1111 1135 part.addparam('in-reply-to', exc.partid)
1112 1136 if exc.namespace is not None:
1113 1137 part.addparam('namespace', exc.namespace,
1114 1138 mandatory=False)
1115 1139 if exc.key is not None:
1116 1140 part.addparam('key', exc.key, mandatory=False)
1117 1141 if exc.new is not None:
1118 1142 part.addparam('new', exc.new, mandatory=False)
1119 1143 if exc.old is not None:
1120 1144 part.addparam('old', exc.old, mandatory=False)
1121 1145 if exc.ret is not None:
1122 1146 part.addparam('ret', exc.ret, mandatory=False)
1123 1147 except error.BundleValueError as exc:
1124 1148 errpart = bundler.newpart('error:unsupportedcontent')
1125 1149 if exc.parttype is not None:
1126 1150 errpart.addparam('parttype', exc.parttype)
1127 1151 if exc.params:
1128 1152 errpart.addparam('params', '\0'.join(exc.params))
1129 1153 except error.Abort as exc:
1130 1154 manargs = [('message', stringutil.forcebytestr(exc))]
1131 1155 advargs = []
1132 1156 if exc.hint is not None:
1133 1157 advargs.append(('hint', exc.hint))
1134 1158 bundler.addpart(bundle2.bundlepart('error:abort',
1135 1159 manargs, advargs))
1136 1160 except error.PushRaced as exc:
1137 1161 bundler.newpart('error:pushraced',
1138 1162 [('message', stringutil.forcebytestr(exc))])
1139 1163 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,1050 +1,1050 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import struct
11 11 import sys
12 12 import threading
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 encoding,
17 17 error,
18 18 hook,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 wireprotoframing,
23 23 wireprototypes,
24 24 )
25 25 from .utils import (
26 26 procutil,
27 27 )
28 28
29 29 stringio = util.stringio
30 30
31 31 urlerr = util.urlerr
32 32 urlreq = util.urlreq
33 33
34 34 HTTP_OK = 200
35 35
36 36 HGTYPE = 'application/mercurial-0.1'
37 37 HGTYPE2 = 'application/mercurial-0.2'
38 38 HGERRTYPE = 'application/hg-error'
39 39 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
40 40
41 41 HTTPV2 = wireprototypes.HTTPV2
42 42 SSHV1 = wireprototypes.SSHV1
43 43 SSHV2 = wireprototypes.SSHV2
44 44
45 45 def decodevaluefromheaders(req, headerprefix):
46 46 """Decode a long value from multiple HTTP request headers.
47 47
48 48 Returns the value as a bytes, not a str.
49 49 """
50 50 chunks = []
51 51 i = 1
52 52 while True:
53 53 v = req.headers.get(b'%s-%d' % (headerprefix, i))
54 54 if v is None:
55 55 break
56 56 chunks.append(pycompat.bytesurl(v))
57 57 i += 1
58 58
59 59 return ''.join(chunks)
60 60
61 61 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
62 62 def __init__(self, req, ui, checkperm):
63 63 self._req = req
64 64 self._ui = ui
65 65 self._checkperm = checkperm
66 66
67 67 @property
68 68 def name(self):
69 69 return 'http-v1'
70 70
71 71 def getargs(self, args):
72 72 knownargs = self._args()
73 73 data = {}
74 74 keys = args.split()
75 75 for k in keys:
76 76 if k == '*':
77 77 star = {}
78 78 for key in knownargs.keys():
79 79 if key != 'cmd' and key not in keys:
80 80 star[key] = knownargs[key][0]
81 81 data['*'] = star
82 82 else:
83 83 data[k] = knownargs[k][0]
84 84 return [data[k] for k in keys]
85 85
86 86 def _args(self):
87 87 args = self._req.qsparams.asdictoflists()
88 88 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
89 89 if postlen:
90 90 args.update(urlreq.parseqs(
91 91 self._req.bodyfh.read(postlen), keep_blank_values=True))
92 92 return args
93 93
94 94 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
95 95 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
96 96 return args
97 97
98 98 def forwardpayload(self, fp):
99 99 # Existing clients *always* send Content-Length.
100 100 length = int(self._req.headers[b'Content-Length'])
101 101
102 102 # If httppostargs is used, we need to read Content-Length
103 103 # minus the amount that was consumed by args.
104 104 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
105 105 for s in util.filechunkiter(self._req.bodyfh, limit=length):
106 106 fp.write(s)
107 107
108 108 @contextlib.contextmanager
109 109 def mayberedirectstdio(self):
110 110 oldout = self._ui.fout
111 111 olderr = self._ui.ferr
112 112
113 113 out = util.stringio()
114 114
115 115 try:
116 116 self._ui.fout = out
117 117 self._ui.ferr = out
118 118 yield out
119 119 finally:
120 120 self._ui.fout = oldout
121 121 self._ui.ferr = olderr
122 122
123 123 def client(self):
124 124 return 'remote:%s:%s:%s' % (
125 125 self._req.urlscheme,
126 126 urlreq.quote(self._req.remotehost or ''),
127 127 urlreq.quote(self._req.remoteuser or ''))
128 128
129 129 def addcapabilities(self, repo, caps):
130 130 caps.append(b'batch')
131 131
132 132 caps.append('httpheader=%d' %
133 133 repo.ui.configint('server', 'maxhttpheaderlen'))
134 134 if repo.ui.configbool('experimental', 'httppostargs'):
135 135 caps.append('httppostargs')
136 136
137 137 # FUTURE advertise 0.2rx once support is implemented
138 138 # FUTURE advertise minrx and mintx after consulting config option
139 139 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
140 140
141 141 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
142 142 if compengines:
143 143 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
144 144 for e in compengines)
145 145 caps.append('compression=%s' % comptypes)
146 146
147 147 return caps
148 148
149 149 def checkperm(self, perm):
150 150 return self._checkperm(perm)
151 151
152 152 # This method exists mostly so that extensions like remotefilelog can
153 153 # disable a kludgey legacy method only over http. As of early 2018,
154 154 # there are no other known users, so with any luck we can discard this
155 155 # hook if remotefilelog becomes a first-party extension.
156 156 def iscmd(cmd):
157 157 return cmd in wireproto.commands
158 158
159 159 def handlewsgirequest(rctx, req, res, checkperm):
160 160 """Possibly process a wire protocol request.
161 161
162 162 If the current request is a wire protocol request, the request is
163 163 processed by this function.
164 164
165 165 ``req`` is a ``parsedrequest`` instance.
166 166 ``res`` is a ``wsgiresponse`` instance.
167 167
168 168 Returns a bool indicating if the request was serviced. If set, the caller
169 169 should stop processing the request, as a response has already been issued.
170 170 """
171 171 # Avoid cycle involving hg module.
172 172 from .hgweb import common as hgwebcommon
173 173
174 174 repo = rctx.repo
175 175
176 176 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
177 177 # string parameter. If it isn't present, this isn't a wire protocol
178 178 # request.
179 179 if 'cmd' not in req.qsparams:
180 180 return False
181 181
182 182 cmd = req.qsparams['cmd']
183 183
184 184 # The "cmd" request parameter is used by both the wire protocol and hgweb.
185 185 # While not all wire protocol commands are available for all transports,
186 186 # if we see a "cmd" value that resembles a known wire protocol command, we
187 187 # route it to a protocol handler. This is better than routing possible
188 188 # wire protocol requests to hgweb because it prevents hgweb from using
189 189 # known wire protocol commands and it is less confusing for machine
190 190 # clients.
191 191 if not iscmd(cmd):
192 192 return False
193 193
194 194 # The "cmd" query string argument is only valid on the root path of the
195 195 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
196 196 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
197 197 # in this case. We send an HTTP 404 for backwards compatibility reasons.
198 198 if req.dispatchpath:
199 199 res.status = hgwebcommon.statusmessage(404)
200 200 res.headers['Content-Type'] = HGTYPE
201 201 # TODO This is not a good response to issue for this request. This
202 202 # is mostly for BC for now.
203 203 res.setbodybytes('0\n%s\n' % b'Not Found')
204 204 return True
205 205
206 206 proto = httpv1protocolhandler(req, repo.ui,
207 207 lambda perm: checkperm(rctx, req, perm))
208 208
209 209 # The permissions checker should be the only thing that can raise an
210 210 # ErrorResponse. It is kind of a layer violation to catch an hgweb
211 211 # exception here. So consider refactoring into a exception type that
212 212 # is associated with the wire protocol.
213 213 try:
214 214 _callhttp(repo, req, res, proto, cmd)
215 215 except hgwebcommon.ErrorResponse as e:
216 216 for k, v in e.headers:
217 217 res.headers[k] = v
218 218 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
219 219 # TODO This response body assumes the failed command was
220 220 # "unbundle." That assumption is not always valid.
221 221 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
222 222
223 223 return True
224 224
225 225 def handlewsgiapirequest(rctx, req, res, checkperm):
226 226 """Handle requests to /api/*."""
227 227 assert req.dispatchparts[0] == b'api'
228 228
229 229 repo = rctx.repo
230 230
231 231 # This whole URL space is experimental for now. But we want to
232 232 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
233 233 if not repo.ui.configbool('experimental', 'web.apiserver'):
234 234 res.status = b'404 Not Found'
235 235 res.headers[b'Content-Type'] = b'text/plain'
236 236 res.setbodybytes(_('Experimental API server endpoint not enabled'))
237 237 return
238 238
239 239 # The URL space is /api/<protocol>/*. The structure of URLs under varies
240 240 # by <protocol>.
241 241
242 242 # Registered APIs are made available via config options of the name of
243 243 # the protocol.
244 244 availableapis = set()
245 245 for k, v in API_HANDLERS.items():
246 246 section, option = v['config']
247 247 if repo.ui.configbool(section, option):
248 248 availableapis.add(k)
249 249
250 250 # Requests to /api/ list available APIs.
251 251 if req.dispatchparts == [b'api']:
252 252 res.status = b'200 OK'
253 253 res.headers[b'Content-Type'] = b'text/plain'
254 254 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
255 255 'one of the following:\n')]
256 256 if availableapis:
257 257 lines.extend(sorted(availableapis))
258 258 else:
259 259 lines.append(_('(no available APIs)\n'))
260 260 res.setbodybytes(b'\n'.join(lines))
261 261 return
262 262
263 263 proto = req.dispatchparts[1]
264 264
265 265 if proto not in API_HANDLERS:
266 266 res.status = b'404 Not Found'
267 267 res.headers[b'Content-Type'] = b'text/plain'
268 268 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
269 269 proto, b', '.join(sorted(availableapis))))
270 270 return
271 271
272 272 if proto not in availableapis:
273 273 res.status = b'404 Not Found'
274 274 res.headers[b'Content-Type'] = b'text/plain'
275 275 res.setbodybytes(_('API %s not enabled\n') % proto)
276 276 return
277 277
278 278 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
279 279 req.dispatchparts[2:])
280 280
281 281 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
282 282 from .hgweb import common as hgwebcommon
283 283
284 284 # URL space looks like: <permissions>/<command>, where <permission> can
285 285 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
286 286
287 287 # Root URL does nothing meaningful... yet.
288 288 if not urlparts:
289 289 res.status = b'200 OK'
290 290 res.headers[b'Content-Type'] = b'text/plain'
291 291 res.setbodybytes(_('HTTP version 2 API handler'))
292 292 return
293 293
294 294 if len(urlparts) == 1:
295 295 res.status = b'404 Not Found'
296 296 res.headers[b'Content-Type'] = b'text/plain'
297 297 res.setbodybytes(_('do not know how to process %s\n') %
298 298 req.dispatchpath)
299 299 return
300 300
301 301 permission, command = urlparts[0:2]
302 302
303 303 if permission not in (b'ro', b'rw'):
304 304 res.status = b'404 Not Found'
305 305 res.headers[b'Content-Type'] = b'text/plain'
306 306 res.setbodybytes(_('unknown permission: %s') % permission)
307 307 return
308 308
309 309 if req.method != 'POST':
310 310 res.status = b'405 Method Not Allowed'
311 311 res.headers[b'Allow'] = b'POST'
312 312 res.setbodybytes(_('commands require POST requests'))
313 313 return
314 314
315 315 # At some point we'll want to use our own API instead of recycling the
316 316 # behavior of version 1 of the wire protocol...
317 317 # TODO return reasonable responses - not responses that overload the
318 318 # HTTP status line message for error reporting.
319 319 try:
320 320 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
321 321 except hgwebcommon.ErrorResponse as e:
322 322 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
323 323 for k, v in e.headers:
324 324 res.headers[k] = v
325 325 res.setbodybytes('permission denied')
326 326 return
327 327
328 328 # We have a special endpoint to reflect the request back at the client.
329 329 if command == b'debugreflect':
330 330 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
331 331 return
332 332
333 333 # Extra commands that we handle that aren't really wire protocol
334 334 # commands. Think extra hard before making this hackery available to
335 335 # extension.
336 336 extracommands = {'multirequest'}
337 337
338 if command not in wireproto.commands and command not in extracommands:
338 if command not in wireproto.commandsv2 and command not in extracommands:
339 339 res.status = b'404 Not Found'
340 340 res.headers[b'Content-Type'] = b'text/plain'
341 341 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
342 342 return
343 343
344 344 repo = rctx.repo
345 345 ui = repo.ui
346 346
347 347 proto = httpv2protocolhandler(req, ui)
348 348
349 if (not wireproto.commands.commandavailable(command, proto)
349 if (not wireproto.commandsv2.commandavailable(command, proto)
350 350 and command not in extracommands):
351 351 res.status = b'404 Not Found'
352 352 res.headers[b'Content-Type'] = b'text/plain'
353 353 res.setbodybytes(_('invalid wire protocol command: %s') % command)
354 354 return
355 355
356 356 # TODO consider cases where proxies may add additional Accept headers.
357 357 if req.headers.get(b'Accept') != FRAMINGTYPE:
358 358 res.status = b'406 Not Acceptable'
359 359 res.headers[b'Content-Type'] = b'text/plain'
360 360 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
361 361 % FRAMINGTYPE)
362 362 return
363 363
364 364 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
365 365 res.status = b'415 Unsupported Media Type'
366 366 # TODO we should send a response with appropriate media type,
367 367 # since client does Accept it.
368 368 res.headers[b'Content-Type'] = b'text/plain'
369 369 res.setbodybytes(_('client MUST send Content-Type header with '
370 370 'value: %s\n') % FRAMINGTYPE)
371 371 return
372 372
373 373 _processhttpv2request(ui, repo, req, res, permission, command, proto)
374 374
375 375 def _processhttpv2reflectrequest(ui, repo, req, res):
376 376 """Reads unified frame protocol request and dumps out state to client.
377 377
378 378 This special endpoint can be used to help debug the wire protocol.
379 379
380 380 Instead of routing the request through the normal dispatch mechanism,
381 381 we instead read all frames, decode them, and feed them into our state
382 382 tracker. We then dump the log of all that activity back out to the
383 383 client.
384 384 """
385 385 import json
386 386
387 387 # Reflection APIs have a history of being abused, accidentally disclosing
388 388 # sensitive data, etc. So we have a config knob.
389 389 if not ui.configbool('experimental', 'web.api.debugreflect'):
390 390 res.status = b'404 Not Found'
391 391 res.headers[b'Content-Type'] = b'text/plain'
392 392 res.setbodybytes(_('debugreflect service not available'))
393 393 return
394 394
395 395 # We assume we have a unified framing protocol request body.
396 396
397 397 reactor = wireprotoframing.serverreactor()
398 398 states = []
399 399
400 400 while True:
401 401 frame = wireprotoframing.readframe(req.bodyfh)
402 402
403 403 if not frame:
404 404 states.append(b'received: <no frame>')
405 405 break
406 406
407 407 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
408 408 frame.requestid,
409 409 frame.payload))
410 410
411 411 action, meta = reactor.onframerecv(frame)
412 412 states.append(json.dumps((action, meta), sort_keys=True,
413 413 separators=(', ', ': ')))
414 414
415 415 action, meta = reactor.oninputeof()
416 416 meta['action'] = action
417 417 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
418 418
419 419 res.status = b'200 OK'
420 420 res.headers[b'Content-Type'] = b'text/plain'
421 421 res.setbodybytes(b'\n'.join(states))
422 422
423 423 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
424 424 """Post-validation handler for HTTPv2 requests.
425 425
426 426 Called when the HTTP request contains unified frame-based protocol
427 427 frames for evaluation.
428 428 """
429 429 # TODO Some HTTP clients are full duplex and can receive data before
430 430 # the entire request is transmitted. Figure out a way to indicate support
431 431 # for that so we can opt into full duplex mode.
432 432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 433 seencommand = False
434 434
435 435 outstream = reactor.makeoutputstream()
436 436
437 437 while True:
438 438 frame = wireprotoframing.readframe(req.bodyfh)
439 439 if not frame:
440 440 break
441 441
442 442 action, meta = reactor.onframerecv(frame)
443 443
444 444 if action == 'wantframe':
445 445 # Need more data before we can do anything.
446 446 continue
447 447 elif action == 'runcommand':
448 448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
449 449 reqcommand, reactor, outstream,
450 450 meta, issubsequent=seencommand)
451 451
452 452 if sentoutput:
453 453 return
454 454
455 455 seencommand = True
456 456
457 457 elif action == 'error':
458 458 # TODO define proper error mechanism.
459 459 res.status = b'200 OK'
460 460 res.headers[b'Content-Type'] = b'text/plain'
461 461 res.setbodybytes(meta['message'] + b'\n')
462 462 return
463 463 else:
464 464 raise error.ProgrammingError(
465 465 'unhandled action from frame processor: %s' % action)
466 466
467 467 action, meta = reactor.oninputeof()
468 468 if action == 'sendframes':
469 469 # We assume we haven't started sending the response yet. If we're
470 470 # wrong, the response type will raise an exception.
471 471 res.status = b'200 OK'
472 472 res.headers[b'Content-Type'] = FRAMINGTYPE
473 473 res.setbodygen(meta['framegen'])
474 474 elif action == 'noop':
475 475 pass
476 476 else:
477 477 raise error.ProgrammingError('unhandled action from frame processor: %s'
478 478 % action)
479 479
480 480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
481 481 outstream, command, issubsequent):
482 482 """Dispatch a wire protocol command made from HTTPv2 requests.
483 483
484 484 The authenticated permission (``authedperm``) along with the original
485 485 command from the URL (``reqcommand``) are passed in.
486 486 """
487 487 # We already validated that the session has permissions to perform the
488 488 # actions in ``authedperm``. In the unified frame protocol, the canonical
489 489 # command to run is expressed in a frame. However, the URL also requested
490 490 # to run a specific command. We need to be careful that the command we
491 491 # run doesn't have permissions requirements greater than what was granted
492 492 # by ``authedperm``.
493 493 #
494 494 # Our rule for this is we only allow one command per HTTP request and
495 495 # that command must match the command in the URL. However, we make
496 496 # an exception for the ``multirequest`` URL. This URL is allowed to
497 497 # execute multiple commands. We double check permissions of each command
498 498 # as it is invoked to ensure there is no privilege escalation.
499 499 # TODO consider allowing multiple commands to regular command URLs
500 500 # iff each command is the same.
501 501
502 502 proto = httpv2protocolhandler(req, ui, args=command['args'])
503 503
504 504 if reqcommand == b'multirequest':
505 if not wireproto.commands.commandavailable(command['command'], proto):
505 if not wireproto.commandsv2.commandavailable(command['command'], proto):
506 506 # TODO proper error mechanism
507 507 res.status = b'200 OK'
508 508 res.headers[b'Content-Type'] = b'text/plain'
509 509 res.setbodybytes(_('wire protocol command not available: %s') %
510 510 command['command'])
511 511 return True
512 512
513 513 # TODO don't use assert here, since it may be elided by -O.
514 514 assert authedperm in (b'ro', b'rw')
515 wirecommand = wireproto.commands[command['command']]
515 wirecommand = wireproto.commandsv2[command['command']]
516 516 assert wirecommand.permission in ('push', 'pull')
517 517
518 518 if authedperm == b'ro' and wirecommand.permission != 'pull':
519 519 # TODO proper error mechanism
520 520 res.status = b'403 Forbidden'
521 521 res.headers[b'Content-Type'] = b'text/plain'
522 522 res.setbodybytes(_('insufficient permissions to execute '
523 523 'command: %s') % command['command'])
524 524 return True
525 525
526 526 # TODO should we also call checkperm() here? Maybe not if we're going
527 527 # to overhaul that API. The granted scope from the URL check should
528 528 # be good enough.
529 529
530 530 else:
531 531 # Don't allow multiple commands outside of ``multirequest`` URL.
532 532 if issubsequent:
533 533 # TODO proper error mechanism
534 534 res.status = b'200 OK'
535 535 res.headers[b'Content-Type'] = b'text/plain'
536 536 res.setbodybytes(_('multiple commands cannot be issued to this '
537 537 'URL'))
538 538 return True
539 539
540 540 if reqcommand != command['command']:
541 541 # TODO define proper error mechanism
542 542 res.status = b'200 OK'
543 543 res.headers[b'Content-Type'] = b'text/plain'
544 544 res.setbodybytes(_('command in frame must match command in URL'))
545 545 return True
546 546
547 547 rsp = wireproto.dispatch(repo, proto, command['command'])
548 548
549 549 res.status = b'200 OK'
550 550 res.headers[b'Content-Type'] = FRAMINGTYPE
551 551
552 552 if isinstance(rsp, wireprototypes.bytesresponse):
553 553 action, meta = reactor.onbytesresponseready(outstream,
554 554 command['requestid'],
555 555 rsp.data)
556 556 else:
557 557 action, meta = reactor.onapplicationerror(
558 558 _('unhandled response type from wire proto command'))
559 559
560 560 if action == 'sendframes':
561 561 res.setbodygen(meta['framegen'])
562 562 return True
563 563 elif action == 'noop':
564 564 return False
565 565 else:
566 566 raise error.ProgrammingError('unhandled event from reactor: %s' %
567 567 action)
568 568
569 569 # Maps API name to metadata so custom API can be registered.
570 570 API_HANDLERS = {
571 571 HTTPV2: {
572 572 'config': ('experimental', 'web.api.http-v2'),
573 573 'handler': _handlehttpv2request,
574 574 },
575 575 }
576 576
577 577 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
578 578 def __init__(self, req, ui, args=None):
579 579 self._req = req
580 580 self._ui = ui
581 581 self._args = args
582 582
583 583 @property
584 584 def name(self):
585 585 return HTTPV2
586 586
587 587 def getargs(self, args):
588 588 data = {}
589 589 for k in args.split():
590 590 if k == '*':
591 591 raise NotImplementedError('do not support * args')
592 592 else:
593 593 data[k] = self._args[k]
594 594
595 595 return [data[k] for k in args.split()]
596 596
597 597 def forwardpayload(self, fp):
598 598 raise NotImplementedError
599 599
600 600 @contextlib.contextmanager
601 601 def mayberedirectstdio(self):
602 602 raise NotImplementedError
603 603
604 604 def client(self):
605 605 raise NotImplementedError
606 606
607 607 def addcapabilities(self, repo, caps):
608 608 return caps
609 609
610 610 def checkperm(self, perm):
611 611 raise NotImplementedError
612 612
613 613 def _httpresponsetype(ui, req, prefer_uncompressed):
614 614 """Determine the appropriate response type and compression settings.
615 615
616 616 Returns a tuple of (mediatype, compengine, engineopts).
617 617 """
618 618 # Determine the response media type and compression engine based
619 619 # on the request parameters.
620 620 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
621 621
622 622 if '0.2' in protocaps:
623 623 # All clients are expected to support uncompressed data.
624 624 if prefer_uncompressed:
625 625 return HGTYPE2, util._noopengine(), {}
626 626
627 627 # Default as defined by wire protocol spec.
628 628 compformats = ['zlib', 'none']
629 629 for cap in protocaps:
630 630 if cap.startswith('comp='):
631 631 compformats = cap[5:].split(',')
632 632 break
633 633
634 634 # Now find an agreed upon compression format.
635 635 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
636 636 if engine.wireprotosupport().name in compformats:
637 637 opts = {}
638 638 level = ui.configint('server', '%slevel' % engine.name())
639 639 if level is not None:
640 640 opts['level'] = level
641 641
642 642 return HGTYPE2, engine, opts
643 643
644 644 # No mutually supported compression format. Fall back to the
645 645 # legacy protocol.
646 646
647 647 # Don't allow untrusted settings because disabling compression or
648 648 # setting a very high compression level could lead to flooding
649 649 # the server's network or CPU.
650 650 opts = {'level': ui.configint('server', 'zliblevel')}
651 651 return HGTYPE, util.compengines['zlib'], opts
652 652
653 653 def _callhttp(repo, req, res, proto, cmd):
654 654 # Avoid cycle involving hg module.
655 655 from .hgweb import common as hgwebcommon
656 656
657 657 def genversion2(gen, engine, engineopts):
658 658 # application/mercurial-0.2 always sends a payload header
659 659 # identifying the compression engine.
660 660 name = engine.wireprotosupport().name
661 661 assert 0 < len(name) < 256
662 662 yield struct.pack('B', len(name))
663 663 yield name
664 664
665 665 for chunk in gen:
666 666 yield chunk
667 667
668 668 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
669 669 if code == HTTP_OK:
670 670 res.status = '200 Script output follows'
671 671 else:
672 672 res.status = hgwebcommon.statusmessage(code)
673 673
674 674 res.headers['Content-Type'] = contenttype
675 675
676 676 if bodybytes is not None:
677 677 res.setbodybytes(bodybytes)
678 678 if bodygen is not None:
679 679 res.setbodygen(bodygen)
680 680
681 681 if not wireproto.commands.commandavailable(cmd, proto):
682 682 setresponse(HTTP_OK, HGERRTYPE,
683 683 _('requested wire protocol command is not available over '
684 684 'HTTP'))
685 685 return
686 686
687 687 proto.checkperm(wireproto.commands[cmd].permission)
688 688
689 689 rsp = wireproto.dispatch(repo, proto, cmd)
690 690
691 691 if isinstance(rsp, bytes):
692 692 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
693 693 elif isinstance(rsp, wireprototypes.bytesresponse):
694 694 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
695 695 elif isinstance(rsp, wireprototypes.streamreslegacy):
696 696 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
697 697 elif isinstance(rsp, wireprototypes.streamres):
698 698 gen = rsp.gen
699 699
700 700 # This code for compression should not be streamres specific. It
701 701 # is here because we only compress streamres at the moment.
702 702 mediatype, engine, engineopts = _httpresponsetype(
703 703 repo.ui, req, rsp.prefer_uncompressed)
704 704 gen = engine.compressstream(gen, engineopts)
705 705
706 706 if mediatype == HGTYPE2:
707 707 gen = genversion2(gen, engine, engineopts)
708 708
709 709 setresponse(HTTP_OK, mediatype, bodygen=gen)
710 710 elif isinstance(rsp, wireprototypes.pushres):
711 711 rsp = '%d\n%s' % (rsp.res, rsp.output)
712 712 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
713 713 elif isinstance(rsp, wireprototypes.pusherr):
714 714 rsp = '0\n%s\n' % rsp.res
715 715 res.drain = True
716 716 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
717 717 elif isinstance(rsp, wireprototypes.ooberror):
718 718 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
719 719 else:
720 720 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
721 721
722 722 def _sshv1respondbytes(fout, value):
723 723 """Send a bytes response for protocol version 1."""
724 724 fout.write('%d\n' % len(value))
725 725 fout.write(value)
726 726 fout.flush()
727 727
728 728 def _sshv1respondstream(fout, source):
729 729 write = fout.write
730 730 for chunk in source.gen:
731 731 write(chunk)
732 732 fout.flush()
733 733
734 734 def _sshv1respondooberror(fout, ferr, rsp):
735 735 ferr.write(b'%s\n-\n' % rsp)
736 736 ferr.flush()
737 737 fout.write(b'\n')
738 738 fout.flush()
739 739
740 740 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
741 741 """Handler for requests services via version 1 of SSH protocol."""
742 742 def __init__(self, ui, fin, fout):
743 743 self._ui = ui
744 744 self._fin = fin
745 745 self._fout = fout
746 746
747 747 @property
748 748 def name(self):
749 749 return wireprototypes.SSHV1
750 750
751 751 def getargs(self, args):
752 752 data = {}
753 753 keys = args.split()
754 754 for n in xrange(len(keys)):
755 755 argline = self._fin.readline()[:-1]
756 756 arg, l = argline.split()
757 757 if arg not in keys:
758 758 raise error.Abort(_("unexpected parameter %r") % arg)
759 759 if arg == '*':
760 760 star = {}
761 761 for k in xrange(int(l)):
762 762 argline = self._fin.readline()[:-1]
763 763 arg, l = argline.split()
764 764 val = self._fin.read(int(l))
765 765 star[arg] = val
766 766 data['*'] = star
767 767 else:
768 768 val = self._fin.read(int(l))
769 769 data[arg] = val
770 770 return [data[k] for k in keys]
771 771
772 772 def forwardpayload(self, fpout):
773 773 # We initially send an empty response. This tells the client it is
774 774 # OK to start sending data. If a client sees any other response, it
775 775 # interprets it as an error.
776 776 _sshv1respondbytes(self._fout, b'')
777 777
778 778 # The file is in the form:
779 779 #
780 780 # <chunk size>\n<chunk>
781 781 # ...
782 782 # 0\n
783 783 count = int(self._fin.readline())
784 784 while count:
785 785 fpout.write(self._fin.read(count))
786 786 count = int(self._fin.readline())
787 787
788 788 @contextlib.contextmanager
789 789 def mayberedirectstdio(self):
790 790 yield None
791 791
792 792 def client(self):
793 793 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
794 794 return 'remote:ssh:' + client
795 795
796 796 def addcapabilities(self, repo, caps):
797 797 caps.append(b'batch')
798 798 return caps
799 799
800 800 def checkperm(self, perm):
801 801 pass
802 802
803 803 class sshv2protocolhandler(sshv1protocolhandler):
804 804 """Protocol handler for version 2 of the SSH protocol."""
805 805
806 806 @property
807 807 def name(self):
808 808 return wireprototypes.SSHV2
809 809
810 810 def _runsshserver(ui, repo, fin, fout, ev):
811 811 # This function operates like a state machine of sorts. The following
812 812 # states are defined:
813 813 #
814 814 # protov1-serving
815 815 # Server is in protocol version 1 serving mode. Commands arrive on
816 816 # new lines. These commands are processed in this state, one command
817 817 # after the other.
818 818 #
819 819 # protov2-serving
820 820 # Server is in protocol version 2 serving mode.
821 821 #
822 822 # upgrade-initial
823 823 # The server is going to process an upgrade request.
824 824 #
825 825 # upgrade-v2-filter-legacy-handshake
826 826 # The protocol is being upgraded to version 2. The server is expecting
827 827 # the legacy handshake from version 1.
828 828 #
829 829 # upgrade-v2-finish
830 830 # The upgrade to version 2 of the protocol is imminent.
831 831 #
832 832 # shutdown
833 833 # The server is shutting down, possibly in reaction to a client event.
834 834 #
835 835 # And here are their transitions:
836 836 #
837 837 # protov1-serving -> shutdown
838 838 # When server receives an empty request or encounters another
839 839 # error.
840 840 #
841 841 # protov1-serving -> upgrade-initial
842 842 # An upgrade request line was seen.
843 843 #
844 844 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
845 845 # Upgrade to version 2 in progress. Server is expecting to
846 846 # process a legacy handshake.
847 847 #
848 848 # upgrade-v2-filter-legacy-handshake -> shutdown
849 849 # Client did not fulfill upgrade handshake requirements.
850 850 #
851 851 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
852 852 # Client fulfilled version 2 upgrade requirements. Finishing that
853 853 # upgrade.
854 854 #
855 855 # upgrade-v2-finish -> protov2-serving
856 856 # Protocol upgrade to version 2 complete. Server can now speak protocol
857 857 # version 2.
858 858 #
859 859 # protov2-serving -> protov1-serving
860 860 # Ths happens by default since protocol version 2 is the same as
861 861 # version 1 except for the handshake.
862 862
863 863 state = 'protov1-serving'
864 864 proto = sshv1protocolhandler(ui, fin, fout)
865 865 protoswitched = False
866 866
867 867 while not ev.is_set():
868 868 if state == 'protov1-serving':
869 869 # Commands are issued on new lines.
870 870 request = fin.readline()[:-1]
871 871
872 872 # Empty lines signal to terminate the connection.
873 873 if not request:
874 874 state = 'shutdown'
875 875 continue
876 876
877 877 # It looks like a protocol upgrade request. Transition state to
878 878 # handle it.
879 879 if request.startswith(b'upgrade '):
880 880 if protoswitched:
881 881 _sshv1respondooberror(fout, ui.ferr,
882 882 b'cannot upgrade protocols multiple '
883 883 b'times')
884 884 state = 'shutdown'
885 885 continue
886 886
887 887 state = 'upgrade-initial'
888 888 continue
889 889
890 890 available = wireproto.commands.commandavailable(request, proto)
891 891
892 892 # This command isn't available. Send an empty response and go
893 893 # back to waiting for a new command.
894 894 if not available:
895 895 _sshv1respondbytes(fout, b'')
896 896 continue
897 897
898 898 rsp = wireproto.dispatch(repo, proto, request)
899 899
900 900 if isinstance(rsp, bytes):
901 901 _sshv1respondbytes(fout, rsp)
902 902 elif isinstance(rsp, wireprototypes.bytesresponse):
903 903 _sshv1respondbytes(fout, rsp.data)
904 904 elif isinstance(rsp, wireprototypes.streamres):
905 905 _sshv1respondstream(fout, rsp)
906 906 elif isinstance(rsp, wireprototypes.streamreslegacy):
907 907 _sshv1respondstream(fout, rsp)
908 908 elif isinstance(rsp, wireprototypes.pushres):
909 909 _sshv1respondbytes(fout, b'')
910 910 _sshv1respondbytes(fout, b'%d' % rsp.res)
911 911 elif isinstance(rsp, wireprototypes.pusherr):
912 912 _sshv1respondbytes(fout, rsp.res)
913 913 elif isinstance(rsp, wireprototypes.ooberror):
914 914 _sshv1respondooberror(fout, ui.ferr, rsp.message)
915 915 else:
916 916 raise error.ProgrammingError('unhandled response type from '
917 917 'wire protocol command: %s' % rsp)
918 918
919 919 # For now, protocol version 2 serving just goes back to version 1.
920 920 elif state == 'protov2-serving':
921 921 state = 'protov1-serving'
922 922 continue
923 923
924 924 elif state == 'upgrade-initial':
925 925 # We should never transition into this state if we've switched
926 926 # protocols.
927 927 assert not protoswitched
928 928 assert proto.name == wireprototypes.SSHV1
929 929
930 930 # Expected: upgrade <token> <capabilities>
931 931 # If we get something else, the request is malformed. It could be
932 932 # from a future client that has altered the upgrade line content.
933 933 # We treat this as an unknown command.
934 934 try:
935 935 token, caps = request.split(b' ')[1:]
936 936 except ValueError:
937 937 _sshv1respondbytes(fout, b'')
938 938 state = 'protov1-serving'
939 939 continue
940 940
941 941 # Send empty response if we don't support upgrading protocols.
942 942 if not ui.configbool('experimental', 'sshserver.support-v2'):
943 943 _sshv1respondbytes(fout, b'')
944 944 state = 'protov1-serving'
945 945 continue
946 946
947 947 try:
948 948 caps = urlreq.parseqs(caps)
949 949 except ValueError:
950 950 _sshv1respondbytes(fout, b'')
951 951 state = 'protov1-serving'
952 952 continue
953 953
954 954 # We don't see an upgrade request to protocol version 2. Ignore
955 955 # the upgrade request.
956 956 wantedprotos = caps.get(b'proto', [b''])[0]
957 957 if SSHV2 not in wantedprotos:
958 958 _sshv1respondbytes(fout, b'')
959 959 state = 'protov1-serving'
960 960 continue
961 961
962 962 # It looks like we can honor this upgrade request to protocol 2.
963 963 # Filter the rest of the handshake protocol request lines.
964 964 state = 'upgrade-v2-filter-legacy-handshake'
965 965 continue
966 966
967 967 elif state == 'upgrade-v2-filter-legacy-handshake':
968 968 # Client should have sent legacy handshake after an ``upgrade``
969 969 # request. Expected lines:
970 970 #
971 971 # hello
972 972 # between
973 973 # pairs 81
974 974 # 0000...-0000...
975 975
976 976 ok = True
977 977 for line in (b'hello', b'between', b'pairs 81'):
978 978 request = fin.readline()[:-1]
979 979
980 980 if request != line:
981 981 _sshv1respondooberror(fout, ui.ferr,
982 982 b'malformed handshake protocol: '
983 983 b'missing %s' % line)
984 984 ok = False
985 985 state = 'shutdown'
986 986 break
987 987
988 988 if not ok:
989 989 continue
990 990
991 991 request = fin.read(81)
992 992 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
993 993 _sshv1respondooberror(fout, ui.ferr,
994 994 b'malformed handshake protocol: '
995 995 b'missing between argument value')
996 996 state = 'shutdown'
997 997 continue
998 998
999 999 state = 'upgrade-v2-finish'
1000 1000 continue
1001 1001
1002 1002 elif state == 'upgrade-v2-finish':
1003 1003 # Send the upgrade response.
1004 1004 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1005 1005 servercaps = wireproto.capabilities(repo, proto)
1006 1006 rsp = b'capabilities: %s' % servercaps.data
1007 1007 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1008 1008 fout.flush()
1009 1009
1010 1010 proto = sshv2protocolhandler(ui, fin, fout)
1011 1011 protoswitched = True
1012 1012
1013 1013 state = 'protov2-serving'
1014 1014 continue
1015 1015
1016 1016 elif state == 'shutdown':
1017 1017 break
1018 1018
1019 1019 else:
1020 1020 raise error.ProgrammingError('unhandled ssh server state: %s' %
1021 1021 state)
1022 1022
1023 1023 class sshserver(object):
1024 1024 def __init__(self, ui, repo, logfh=None):
1025 1025 self._ui = ui
1026 1026 self._repo = repo
1027 1027 self._fin = ui.fin
1028 1028 self._fout = ui.fout
1029 1029
1030 1030 # Log write I/O to stdout and stderr if configured.
1031 1031 if logfh:
1032 1032 self._fout = util.makeloggingfileobject(
1033 1033 logfh, self._fout, 'o', logdata=True)
1034 1034 ui.ferr = util.makeloggingfileobject(
1035 1035 logfh, ui.ferr, 'e', logdata=True)
1036 1036
1037 1037 hook.redirect(True)
1038 1038 ui.fout = repo.ui.fout = ui.ferr
1039 1039
1040 1040 # Prevent insertion/deletion of CRs
1041 1041 procutil.setbinary(self._fin)
1042 1042 procutil.setbinary(self._fout)
1043 1043
1044 1044 def serve_forever(self):
1045 1045 self.serveuntil(threading.Event())
1046 1046 sys.exit(0)
1047 1047
1048 1048 def serveuntil(self, ev):
1049 1049 """Serve until a threading.Event is set."""
1050 1050 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,95 +1,102 b''
1 1 from __future__ import absolute_import, print_function
2 2
3 3 from mercurial import (
4 4 error,
5 5 pycompat,
6 6 ui as uimod,
7 7 util,
8 8 wireproto,
9 9 wireprototypes,
10 10 )
11 11 stringio = util.stringio
12 12
13 13 class proto(object):
14 14 def __init__(self, args):
15 15 self.args = args
16 self.name = 'dummyproto'
17
16 18 def getargs(self, spec):
17 19 args = self.args
18 20 args.setdefault(b'*', {})
19 21 names = spec.split()
20 22 return [args[n] for n in names]
21 23
22 24 def checkperm(self, perm):
23 25 pass
24 26
27 wireprototypes.TRANSPORTS['dummyproto'] = {
28 'transport': 'dummy',
29 'version': 1,
30 }
31
25 32 class clientpeer(wireproto.wirepeer):
26 33 def __init__(self, serverrepo, ui):
27 34 self.serverrepo = serverrepo
28 35 self._ui = ui
29 36
30 37 @property
31 38 def ui(self):
32 39 return self._ui
33 40
34 41 def url(self):
35 42 return b'test'
36 43
37 44 def local(self):
38 45 return None
39 46
40 47 def peer(self):
41 48 return self
42 49
43 50 def canpush(self):
44 51 return True
45 52
46 53 def close(self):
47 54 pass
48 55
49 56 def capabilities(self):
50 57 return [b'batch']
51 58
52 59 def _call(self, cmd, **args):
53 60 args = pycompat.byteskwargs(args)
54 61 res = wireproto.dispatch(self.serverrepo, proto(args), cmd)
55 62 if isinstance(res, wireprototypes.bytesresponse):
56 63 return res.data
57 64 elif isinstance(res, bytes):
58 65 return res
59 66 else:
60 67 raise error.Abort('dummy client does not support response type')
61 68
62 69 def _callstream(self, cmd, **args):
63 70 return stringio(self._call(cmd, **args))
64 71
65 72 @wireproto.batchable
66 73 def greet(self, name):
67 74 f = wireproto.future()
68 75 yield {b'name': mangle(name)}, f
69 76 yield unmangle(f.value)
70 77
71 78 class serverrepo(object):
72 79 def greet(self, name):
73 80 return b"Hello, " + name
74 81
75 82 def filtered(self, name):
76 83 return self
77 84
78 85 def mangle(s):
79 86 return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))
80 87 def unmangle(s):
81 88 return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))
82 89
83 90 def greet(repo, proto, name):
84 91 return mangle(repo.greet(unmangle(name)))
85 92
86 93 wireproto.commands[b'greet'] = (greet, b'name',)
87 94
88 95 srv = serverrepo()
89 96 clt = clientpeer(srv, uimod.ui())
90 97
91 98 print(clt.greet(b"Foobar"))
92 99 b = clt.iterbatch()
93 100 list(map(b.greet, (b'Fo, =;:<o', b'Bar')))
94 101 b.submit()
95 102 print([r for r in b.results()])
General Comments 0
You need to be logged in to leave comments. Login now