##// END OF EJS Templates
wireproto: use new peer interface...
Gregory Szorc -
r33805:dedab036 default
parent child Browse files
Show More
@@ -1,426 +1,422 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import os
13 13 import socket
14 14 import struct
15 15 import tempfile
16 16
17 17 from .i18n import _
18 18 from .node import nullid
19 19 from . import (
20 20 bundle2,
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 repository,
25 24 statichttprepo,
26 25 url,
27 26 util,
28 27 wireproto,
29 28 )
30 29
31 30 httplib = util.httplib
32 31 urlerr = util.urlerr
33 32 urlreq = util.urlreq
34 33
35 34 def encodevalueinheaders(value, header, limit):
36 35 """Encode a string value into multiple HTTP headers.
37 36
38 37 ``value`` will be encoded into 1 or more HTTP headers with the names
39 38 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
40 39 name + value will be at most ``limit`` bytes long.
41 40
42 41 Returns an iterable of 2-tuples consisting of header names and values.
43 42 """
44 43 fmt = header + '-%s'
45 44 valuelen = limit - len(fmt % '000') - len(': \r\n')
46 45 result = []
47 46
48 47 n = 0
49 48 for i in xrange(0, len(value), valuelen):
50 49 n += 1
51 50 result.append((fmt % str(n), value[i:i + valuelen]))
52 51
53 52 return result
54 53
55 54 def _wraphttpresponse(resp):
56 55 """Wrap an HTTPResponse with common error handlers.
57 56
58 57 This ensures that any I/O from any consumer raises the appropriate
59 58 error and messaging.
60 59 """
61 60 origread = resp.read
62 61
63 62 class readerproxy(resp.__class__):
64 63 def read(self, size=None):
65 64 try:
66 65 return origread(size)
67 66 except httplib.IncompleteRead as e:
68 67 # e.expected is an integer if length known or None otherwise.
69 68 if e.expected:
70 69 msg = _('HTTP request error (incomplete response; '
71 70 'expected %d bytes got %d)') % (e.expected,
72 71 len(e.partial))
73 72 else:
74 73 msg = _('HTTP request error (incomplete response)')
75 74
76 75 raise error.PeerTransportError(
77 76 msg,
78 77 hint=_('this may be an intermittent network failure; '
79 78 'if the error persists, consider contacting the '
80 79 'network or server operator'))
81 80 except httplib.HTTPException as e:
82 81 raise error.PeerTransportError(
83 82 _('HTTP request error (%s)') % e,
84 83 hint=_('this may be an intermittent network failure; '
85 84 'if the error persists, consider contacting the '
86 85 'network or server operator'))
87 86
88 87 resp.__class__ = readerproxy
89 88
90 class httppeer(wireproto.wirepeer, repository.legacypeer):
89 class httppeer(wireproto.wirepeer):
91 90 def __init__(self, ui, path):
92 91 self._path = path
93 92 self._caps = None
94 93 self._urlopener = None
95 94 self._requestbuilder = None
96 95 u = util.url(path)
97 96 if u.query or u.fragment:
98 97 raise error.Abort(_('unsupported URL component: "%s"') %
99 98 (u.query or u.fragment))
100 99
101 100 # urllib cannot handle URLs with embedded user or passwd
102 101 self._url, authinfo = u.authinfo()
103 102
104 103 self._ui = ui
105 104 ui.debug('using %s\n' % self._url)
106 105
107 106 self._urlopener = url.opener(ui, authinfo)
108 107 self._requestbuilder = urlreq.request
109 108
110 # TODO remove once peerrepository isn't in inheritance.
111 self._capabilities = self.capabilities
112
113 109 def __del__(self):
114 110 urlopener = getattr(self, '_urlopener', None)
115 111 if urlopener:
116 112 for h in urlopener.handlers:
117 113 h.close()
118 114 getattr(h, "close_all", lambda : None)()
119 115
120 116 # Begin of _basepeer interface.
121 117
122 118 @util.propertycache
123 119 def ui(self):
124 120 return self._ui
125 121
126 122 def url(self):
127 123 return self._path
128 124
129 125 def local(self):
130 126 return None
131 127
132 128 def peer(self):
133 129 return self
134 130
135 131 def canpush(self):
136 132 return True
137 133
138 134 def close(self):
139 135 pass
140 136
141 137 # End of _basepeer interface.
142 138
143 139 # Begin of _basewirepeer interface.
144 140
145 141 def capabilities(self):
146 142 if self._caps is None:
147 143 try:
148 144 self._fetchcaps()
149 145 except error.RepoError:
150 146 self._caps = set()
151 147 self.ui.debug('capabilities: %s\n' %
152 148 (' '.join(self._caps or ['none'])))
153 149 return self._caps
154 150
155 151 # End of _basewirepeer interface.
156 152
157 153 # look up capabilities only when needed
158 154
159 155 def _fetchcaps(self):
160 156 self._caps = set(self._call('capabilities').split())
161 157
162 158 def _callstream(self, cmd, _compressible=False, **args):
163 159 if cmd == 'pushkey':
164 160 args['data'] = ''
165 161 data = args.pop('data', None)
166 162 headers = args.pop('headers', {})
167 163
168 164 self.ui.debug("sending %s command\n" % cmd)
169 165 q = [('cmd', cmd)]
170 166 headersize = 0
171 167 varyheaders = []
172 168 # Important: don't use self.capable() here or else you end up
173 169 # with infinite recursion when trying to look up capabilities
174 170 # for the first time.
175 171 postargsok = self._caps is not None and 'httppostargs' in self._caps
176 172 # TODO: support for httppostargs when data is a file-like
177 173 # object rather than a basestring
178 174 canmungedata = not data or isinstance(data, basestring)
179 175 if postargsok and canmungedata:
180 176 strargs = urlreq.urlencode(sorted(args.items()))
181 177 if strargs:
182 178 if not data:
183 179 data = strargs
184 180 elif isinstance(data, basestring):
185 181 data = strargs + data
186 182 headers['X-HgArgs-Post'] = len(strargs)
187 183 else:
188 184 if len(args) > 0:
189 185 httpheader = self.capable('httpheader')
190 186 if httpheader:
191 187 headersize = int(httpheader.split(',', 1)[0])
192 188 if headersize > 0:
193 189 # The headers can typically carry more data than the URL.
194 190 encargs = urlreq.urlencode(sorted(args.items()))
195 191 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
196 192 headersize):
197 193 headers[header] = value
198 194 varyheaders.append(header)
199 195 else:
200 196 q += sorted(args.items())
201 197 qs = '?%s' % urlreq.urlencode(q)
202 198 cu = "%s%s" % (self._url, qs)
203 199 size = 0
204 200 if util.safehasattr(data, 'length'):
205 201 size = data.length
206 202 elif data is not None:
207 203 size = len(data)
208 204 if size and self.ui.configbool('ui', 'usehttp2'):
209 205 headers['Expect'] = '100-Continue'
210 206 headers['X-HgHttp2'] = '1'
211 207 if data is not None and 'Content-Type' not in headers:
212 208 headers['Content-Type'] = 'application/mercurial-0.1'
213 209
214 210 # Tell the server we accept application/mercurial-0.2 and multiple
215 211 # compression formats if the server is capable of emitting those
216 212 # payloads.
217 213 protoparams = []
218 214
219 215 mediatypes = set()
220 216 if self._caps is not None:
221 217 mt = self.capable('httpmediatype')
222 218 if mt:
223 219 protoparams.append('0.1')
224 220 mediatypes = set(mt.split(','))
225 221
226 222 if '0.2tx' in mediatypes:
227 223 protoparams.append('0.2')
228 224
229 225 if '0.2tx' in mediatypes and self.capable('compression'):
230 226 # We /could/ compare supported compression formats and prune
231 227 # non-mutually supported or error if nothing is mutually supported.
232 228 # For now, send the full list to the server and have it error.
233 229 comps = [e.wireprotosupport().name for e in
234 230 util.compengines.supportedwireengines(util.CLIENTROLE)]
235 231 protoparams.append('comp=%s' % ','.join(comps))
236 232
237 233 if protoparams:
238 234 protoheaders = encodevalueinheaders(' '.join(protoparams),
239 235 'X-HgProto',
240 236 headersize or 1024)
241 237 for header, value in protoheaders:
242 238 headers[header] = value
243 239 varyheaders.append(header)
244 240
245 241 if varyheaders:
246 242 headers['Vary'] = ','.join(varyheaders)
247 243
248 244 req = self._requestbuilder(cu, data, headers)
249 245
250 246 if data is not None:
251 247 self.ui.debug("sending %s bytes\n" % size)
252 248 req.add_unredirected_header('Content-Length', '%d' % size)
253 249 try:
254 250 resp = self._urlopener.open(req)
255 251 except urlerr.httperror as inst:
256 252 if inst.code == 401:
257 253 raise error.Abort(_('authorization failed'))
258 254 raise
259 255 except httplib.HTTPException as inst:
260 256 self.ui.debug('http error while sending %s command\n' % cmd)
261 257 self.ui.traceback()
262 258 raise IOError(None, inst)
263 259
264 260 # Insert error handlers for common I/O failures.
265 261 _wraphttpresponse(resp)
266 262
267 263 # record the url we got redirected to
268 264 resp_url = resp.geturl()
269 265 if resp_url.endswith(qs):
270 266 resp_url = resp_url[:-len(qs)]
271 267 if self._url.rstrip('/') != resp_url.rstrip('/'):
272 268 if not self.ui.quiet:
273 269 self.ui.warn(_('real URL is %s\n') % resp_url)
274 270 self._url = resp_url
275 271 try:
276 272 proto = resp.getheader('content-type')
277 273 except AttributeError:
278 274 proto = resp.headers.get('content-type', '')
279 275
280 276 safeurl = util.hidepassword(self._url)
281 277 if proto.startswith('application/hg-error'):
282 278 raise error.OutOfBandError(resp.read())
283 279 # accept old "text/plain" and "application/hg-changegroup" for now
284 280 if not (proto.startswith('application/mercurial-') or
285 281 (proto.startswith('text/plain')
286 282 and not resp.headers.get('content-length')) or
287 283 proto.startswith('application/hg-changegroup')):
288 284 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
289 285 raise error.RepoError(
290 286 _("'%s' does not appear to be an hg repository:\n"
291 287 "---%%<--- (%s)\n%s\n---%%<---\n")
292 288 % (safeurl, proto or 'no content-type', resp.read(1024)))
293 289
294 290 if proto.startswith('application/mercurial-'):
295 291 try:
296 292 version = proto.split('-', 1)[1]
297 293 version_info = tuple([int(n) for n in version.split('.')])
298 294 except ValueError:
299 295 raise error.RepoError(_("'%s' sent a broken Content-Type "
300 296 "header (%s)") % (safeurl, proto))
301 297
302 298 # TODO consider switching to a decompression reader that uses
303 299 # generators.
304 300 if version_info == (0, 1):
305 301 if _compressible:
306 302 return util.compengines['zlib'].decompressorreader(resp)
307 303 return resp
308 304 elif version_info == (0, 2):
309 305 # application/mercurial-0.2 always identifies the compression
310 306 # engine in the payload header.
311 307 elen = struct.unpack('B', resp.read(1))[0]
312 308 ename = resp.read(elen)
313 309 engine = util.compengines.forwiretype(ename)
314 310 return engine.decompressorreader(resp)
315 311 else:
316 312 raise error.RepoError(_("'%s' uses newer protocol %s") %
317 313 (safeurl, version))
318 314
319 315 if _compressible:
320 316 return util.compengines['zlib'].decompressorreader(resp)
321 317
322 318 return resp
323 319
324 320 def _call(self, cmd, **args):
325 321 fp = self._callstream(cmd, **args)
326 322 try:
327 323 return fp.read()
328 324 finally:
329 325 # if using keepalive, allow connection to be reused
330 326 fp.close()
331 327
332 328 def _callpush(self, cmd, cg, **args):
333 329 # have to stream bundle to a temp file because we do not have
334 330 # http 1.1 chunked transfer.
335 331
336 332 types = self.capable('unbundle')
337 333 try:
338 334 types = types.split(',')
339 335 except AttributeError:
340 336 # servers older than d1b16a746db6 will send 'unbundle' as a
341 337 # boolean capability. They only support headerless/uncompressed
342 338 # bundles.
343 339 types = [""]
344 340 for x in types:
345 341 if x in bundle2.bundletypes:
346 342 type = x
347 343 break
348 344
349 345 tempname = bundle2.writebundle(self.ui, cg, None, type)
350 346 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
351 347 headers = {'Content-Type': 'application/mercurial-0.1'}
352 348
353 349 try:
354 350 r = self._call(cmd, data=fp, headers=headers, **args)
355 351 vals = r.split('\n', 1)
356 352 if len(vals) < 2:
357 353 raise error.ResponseError(_("unexpected response:"), r)
358 354 return vals
359 355 except socket.error as err:
360 356 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
361 357 raise error.Abort(_('push failed: %s') % err.args[1])
362 358 raise error.Abort(err.args[1])
363 359 finally:
364 360 fp.close()
365 361 os.unlink(tempname)
366 362
367 363 def _calltwowaystream(self, cmd, fp, **args):
368 364 fh = None
369 365 fp_ = None
370 366 filename = None
371 367 try:
372 368 # dump bundle to disk
373 369 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
374 370 fh = os.fdopen(fd, pycompat.sysstr("wb"))
375 371 d = fp.read(4096)
376 372 while d:
377 373 fh.write(d)
378 374 d = fp.read(4096)
379 375 fh.close()
380 376 # start http push
381 377 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
382 378 headers = {'Content-Type': 'application/mercurial-0.1'}
383 379 return self._callstream(cmd, data=fp_, headers=headers, **args)
384 380 finally:
385 381 if fp_ is not None:
386 382 fp_.close()
387 383 if fh is not None:
388 384 fh.close()
389 385 os.unlink(filename)
390 386
391 387 def _callcompressable(self, cmd, **args):
392 388 return self._callstream(cmd, _compressible=True, **args)
393 389
394 390 def _abort(self, exception):
395 391 raise exception
396 392
397 393 class httpspeer(httppeer):
398 394 def __init__(self, ui, path):
399 395 if not url.has_https:
400 396 raise error.Abort(_('Python support for SSL and HTTPS '
401 397 'is not installed'))
402 398 httppeer.__init__(self, ui, path)
403 399
404 400 def instance(ui, path, create):
405 401 if create:
406 402 raise error.Abort(_('cannot create new http repository'))
407 403 try:
408 404 if path.startswith('https:'):
409 405 inst = httpspeer(ui, path)
410 406 else:
411 407 inst = httppeer(ui, path)
412 408 try:
413 409 # Try to do useful work when checking compatibility.
414 410 # Usually saves a roundtrip since we want the caps anyway.
415 411 inst._fetchcaps()
416 412 except error.RepoError:
417 413 # No luck, try older compatibility check.
418 414 inst.between([(nullid, nullid)])
419 415 return inst
420 416 except error.RepoError as httpexception:
421 417 try:
422 418 r = statichttprepo.instance(ui, "static-" + path, create)
423 419 ui.note(_('(falling back to static-http)\n'))
424 420 return r
425 421 except error.RepoError:
426 422 raise httpexception # use the original http RepoError instead
@@ -1,140 +1,96 b''
1 1 # peer.py - repository base classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 from .i18n import _
12 11 from . import (
13 12 error,
14 13 util,
15 14 )
16 15
17 16 # abstract batching support
18 17
19 18 class future(object):
20 19 '''placeholder for a value to be set later'''
21 20 def set(self, value):
22 21 if util.safehasattr(self, 'value'):
23 22 raise error.RepoError("future is already set")
24 23 self.value = value
25 24
26 25 class batcher(object):
27 26 '''base class for batches of commands submittable in a single request
28 27
29 28 All methods invoked on instances of this class are simply queued and
30 29 return a a future for the result. Once you call submit(), all the queued
31 30 calls are performed and the results set in their respective futures.
32 31 '''
33 32 def __init__(self):
34 33 self.calls = []
35 34 def __getattr__(self, name):
36 35 def call(*args, **opts):
37 36 resref = future()
38 37 self.calls.append((name, args, opts, resref,))
39 38 return resref
40 39 return call
41 40 def submit(self):
42 41 raise NotImplementedError()
43 42
44 43 class iterbatcher(batcher):
45 44
46 45 def submit(self):
47 46 raise NotImplementedError()
48 47
49 48 def results(self):
50 49 raise NotImplementedError()
51 50
52 51 class localiterbatcher(iterbatcher):
53 52 def __init__(self, local):
54 53 super(iterbatcher, self).__init__()
55 54 self.local = local
56 55
57 56 def submit(self):
58 57 # submit for a local iter batcher is a noop
59 58 pass
60 59
61 60 def results(self):
62 61 for name, args, opts, resref in self.calls:
63 62 resref.set(getattr(self.local, name)(*args, **opts))
64 63 yield resref.value
65 64
66 65 def batchable(f):
67 66 '''annotation for batchable methods
68 67
69 68 Such methods must implement a coroutine as follows:
70 69
71 70 @batchable
72 71 def sample(self, one, two=None):
73 72 # Build list of encoded arguments suitable for your wire protocol:
74 73 encargs = [('one', encode(one),), ('two', encode(two),)]
75 74 # Create future for injection of encoded result:
76 75 encresref = future()
77 76 # Return encoded arguments and future:
78 77 yield encargs, encresref
79 78 # Assuming the future to be filled with the result from the batched
80 79 # request now. Decode it:
81 80 yield decode(encresref.value)
82 81
83 82 The decorator returns a function which wraps this coroutine as a plain
84 83 method, but adds the original method as an attribute called "batchable",
85 84 which is used by remotebatch to split the call into separate encoding and
86 85 decoding phases.
87 86 '''
88 87 def plain(*args, **opts):
89 88 batchable = f(*args, **opts)
90 89 encargsorres, encresref = next(batchable)
91 90 if not encresref:
92 91 return encargsorres # a local result in this case
93 92 self = args[0]
94 93 encresref.set(self._submitone(f.func_name, encargsorres))
95 94 return next(batchable)
96 95 setattr(plain, 'batchable', f)
97 96 return plain
98
99 class peerrepository(object):
100 def iterbatch(self):
101 """Batch requests but allow iterating over the results.
102
103 This is to allow interleaving responses with things like
104 progress updates for clients.
105 """
106 return localiterbatcher(self)
107
108 def capable(self, name):
109 '''tell whether repo supports named capability.
110 return False if not supported.
111 if boolean capability, return True.
112 if string capability, return string.'''
113 caps = self._capabilities()
114 if name in caps:
115 return True
116 name_eq = name + '='
117 for cap in caps:
118 if cap.startswith(name_eq):
119 return cap[len(name_eq):]
120 return False
121
122 def requirecap(self, name, purpose):
123 '''raise an exception if the given capability is not present'''
124 if not self.capable(name):
125 raise error.CapabilityError(
126 _('cannot %s; remote repository does not '
127 'support the %r capability') % (purpose, name))
128
129 def local(self):
130 '''return peer as a localrepo, or None'''
131 return None
132
133 def peer(self):
134 return self
135
136 def canpush(self):
137 return True
138
139 def close(self):
140 pass
@@ -1,353 +1,349 b''
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 repository,
17 16 util,
18 17 wireproto,
19 18 )
20 19
21 20 def _serverquote(s):
22 21 if not s:
23 22 return s
24 23 '''quote a string for the remote shell ... which we assume is sh'''
25 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
26 25 return s
27 26 return "'%s'" % s.replace("'", "'\\''")
28 27
29 28 def _forwardoutput(ui, pipe):
30 29 """display all data currently available on pipe as remote output.
31 30
32 31 This is non blocking."""
33 32 s = util.readpipe(pipe)
34 33 if s:
35 34 for l in s.splitlines():
36 35 ui.status(_("remote: "), l, '\n')
37 36
38 37 class doublepipe(object):
39 38 """Operate a side-channel pipe in addition of a main one
40 39
41 40 The side-channel pipe contains server output to be forwarded to the user
42 41 input. The double pipe will behave as the "main" pipe, but will ensure the
43 42 content of the "side" pipe is properly processed while we wait for blocking
44 43 call on the "main" pipe.
45 44
46 45 If large amounts of data are read from "main", the forward will cease after
47 46 the first bytes start to appear. This simplifies the implementation
48 47 without affecting actual output of sshpeer too much as we rarely issue
49 48 large read for data not yet emitted by the server.
50 49
51 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
52 51 that handle all the os specific bits. This class lives in this module
53 52 because it focus on behavior specific to the ssh protocol."""
54 53
55 54 def __init__(self, ui, main, side):
56 55 self._ui = ui
57 56 self._main = main
58 57 self._side = side
59 58
60 59 def _wait(self):
61 60 """wait until some data are available on main or side
62 61
63 62 return a pair of boolean (ismainready, issideready)
64 63
65 64 (This will only wait for data if the setup is supported by `util.poll`)
66 65 """
67 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
68 67 return (True, True) # main has data, assume side is worth poking at.
69 68 fds = [self._main.fileno(), self._side.fileno()]
70 69 try:
71 70 act = util.poll(fds)
72 71 except NotImplementedError:
73 72 # non supported yet case, assume all have data.
74 73 act = fds
75 74 return (self._main.fileno() in act, self._side.fileno() in act)
76 75
77 76 def write(self, data):
78 77 return self._call('write', data)
79 78
80 79 def read(self, size):
81 80 r = self._call('read', size)
82 81 if size != 0 and not r:
83 82 # We've observed a condition that indicates the
84 83 # stdout closed unexpectedly. Check stderr one
85 84 # more time and snag anything that's there before
86 85 # letting anyone know the main part of the pipe
87 86 # closed prematurely.
88 87 _forwardoutput(self._ui, self._side)
89 88 return r
90 89
91 90 def readline(self):
92 91 return self._call('readline')
93 92
94 93 def _call(self, methname, data=None):
95 94 """call <methname> on "main", forward output of "side" while blocking
96 95 """
97 96 # data can be '' or 0
98 97 if (data is not None and not data) or self._main.closed:
99 98 _forwardoutput(self._ui, self._side)
100 99 return ''
101 100 while True:
102 101 mainready, sideready = self._wait()
103 102 if sideready:
104 103 _forwardoutput(self._ui, self._side)
105 104 if mainready:
106 105 meth = getattr(self._main, methname)
107 106 if data is None:
108 107 return meth()
109 108 else:
110 109 return meth(data)
111 110
112 111 def close(self):
113 112 return self._main.close()
114 113
115 114 def flush(self):
116 115 return self._main.flush()
117 116
118 class sshpeer(wireproto.wirepeer, repository.legacypeer):
117 class sshpeer(wireproto.wirepeer):
119 118 def __init__(self, ui, path, create=False):
120 119 self._url = path
121 120 self._ui = ui
122 121 self._pipeo = self._pipei = self._pipee = None
123 122
124 123 u = util.url(path, parsequery=False, parsefragment=False)
125 124 if u.scheme != 'ssh' or not u.host or u.path is None:
126 125 self._abort(error.RepoError(_("couldn't parse location %s") % path))
127 126
128 127 util.checksafessh(path)
129 128
130 129 if u.passwd is not None:
131 130 self._abort(error.RepoError(_("password in URL not supported")))
132 131
133 132 self._user = u.user
134 133 self._host = u.host
135 134 self._port = u.port
136 135 self._path = u.path or '.'
137 136
138 137 sshcmd = self.ui.config("ui", "ssh")
139 138 remotecmd = self.ui.config("ui", "remotecmd")
140 139
141 140 args = util.sshargs(sshcmd, self._host, self._user, self._port)
142 141
143 142 if create:
144 143 cmd = '%s %s %s' % (sshcmd, args,
145 144 util.shellquote("%s init %s" %
146 145 (_serverquote(remotecmd), _serverquote(self._path))))
147 146 ui.debug('running %s\n' % cmd)
148 147 res = ui.system(cmd, blockedtag='sshpeer')
149 148 if res != 0:
150 149 self._abort(error.RepoError(_("could not create remote repo")))
151 150
152 151 self._validaterepo(sshcmd, args, remotecmd)
153 152
154 # TODO remove this alias once peerrepository inheritance is removed.
155 self._capabilities = self.capabilities
156
157 153 # Begin of _basepeer interface.
158 154
159 155 @util.propertycache
160 156 def ui(self):
161 157 return self._ui
162 158
163 159 def url(self):
164 160 return self._url
165 161
166 162 def local(self):
167 163 return None
168 164
169 165 def peer(self):
170 166 return self
171 167
172 168 def canpush(self):
173 169 return True
174 170
175 171 def close(self):
176 172 pass
177 173
178 174 # End of _basepeer interface.
179 175
180 176 # Begin of _basewirecommands interface.
181 177
182 178 def capabilities(self):
183 179 return self._caps
184 180
185 181 # End of _basewirecommands interface.
186 182
187 183 def _validaterepo(self, sshcmd, args, remotecmd):
188 184 # cleanup up previous run
189 185 self._cleanup()
190 186
191 187 cmd = '%s %s %s' % (sshcmd, args,
192 188 util.shellquote("%s -R %s serve --stdio" %
193 189 (_serverquote(remotecmd), _serverquote(self._path))))
194 190 self.ui.debug('running %s\n' % cmd)
195 191 cmd = util.quotecommand(cmd)
196 192
197 193 # while self._subprocess isn't used, having it allows the subprocess to
198 194 # to clean up correctly later
199 195 #
200 196 # no buffer allow the use of 'select'
201 197 # feel free to remove buffering and select usage when we ultimately
202 198 # move to threading.
203 199 sub = util.popen4(cmd, bufsize=0)
204 200 self._pipeo, self._pipei, self._pipee, self._subprocess = sub
205 201
206 202 self._pipei = util.bufferedinputpipe(self._pipei)
207 203 self._pipei = doublepipe(self.ui, self._pipei, self._pipee)
208 204 self._pipeo = doublepipe(self.ui, self._pipeo, self._pipee)
209 205
210 206 # skip any noise generated by remote shell
211 207 self._callstream("hello")
212 208 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
213 209 lines = ["", "dummy"]
214 210 max_noise = 500
215 211 while lines[-1] and max_noise:
216 212 l = r.readline()
217 213 self._readerr()
218 214 if lines[-1] == "1\n" and l == "\n":
219 215 break
220 216 if l:
221 217 self.ui.debug("remote: ", l)
222 218 lines.append(l)
223 219 max_noise -= 1
224 220 else:
225 221 self._abort(error.RepoError(_('no suitable response from '
226 222 'remote hg')))
227 223
228 224 self._caps = set()
229 225 for l in reversed(lines):
230 226 if l.startswith("capabilities:"):
231 227 self._caps.update(l[:-1].split(":")[1].split())
232 228 break
233 229
234 230 def _readerr(self):
235 231 _forwardoutput(self.ui, self._pipee)
236 232
237 233 def _abort(self, exception):
238 234 self._cleanup()
239 235 raise exception
240 236
241 237 def _cleanup(self):
242 238 if self._pipeo is None:
243 239 return
244 240 self._pipeo.close()
245 241 self._pipei.close()
246 242 try:
247 243 # read the error descriptor until EOF
248 244 for l in self._pipee:
249 245 self.ui.status(_("remote: "), l)
250 246 except (IOError, ValueError):
251 247 pass
252 248 self._pipee.close()
253 249
254 250 __del__ = _cleanup
255 251
256 252 def _submitbatch(self, req):
257 253 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
258 254 available = self._getamount()
259 255 # TODO this response parsing is probably suboptimal for large
260 256 # batches with large responses.
261 257 toread = min(available, 1024)
262 258 work = rsp.read(toread)
263 259 available -= toread
264 260 chunk = work
265 261 while chunk:
266 262 while ';' in work:
267 263 one, work = work.split(';', 1)
268 264 yield wireproto.unescapearg(one)
269 265 toread = min(available, 1024)
270 266 chunk = rsp.read(toread)
271 267 available -= toread
272 268 work += chunk
273 269 yield wireproto.unescapearg(work)
274 270
275 271 def _callstream(self, cmd, **args):
276 272 args = pycompat.byteskwargs(args)
277 273 self.ui.debug("sending %s command\n" % cmd)
278 274 self._pipeo.write("%s\n" % cmd)
279 275 _func, names = wireproto.commands[cmd]
280 276 keys = names.split()
281 277 wireargs = {}
282 278 for k in keys:
283 279 if k == '*':
284 280 wireargs['*'] = args
285 281 break
286 282 else:
287 283 wireargs[k] = args[k]
288 284 del args[k]
289 285 for k, v in sorted(wireargs.iteritems()):
290 286 self._pipeo.write("%s %d\n" % (k, len(v)))
291 287 if isinstance(v, dict):
292 288 for dk, dv in v.iteritems():
293 289 self._pipeo.write("%s %d\n" % (dk, len(dv)))
294 290 self._pipeo.write(dv)
295 291 else:
296 292 self._pipeo.write(v)
297 293 self._pipeo.flush()
298 294
299 295 return self._pipei
300 296
301 297 def _callcompressable(self, cmd, **args):
302 298 return self._callstream(cmd, **args)
303 299
304 300 def _call(self, cmd, **args):
305 301 self._callstream(cmd, **args)
306 302 return self._recv()
307 303
308 304 def _callpush(self, cmd, fp, **args):
309 305 r = self._call(cmd, **args)
310 306 if r:
311 307 return '', r
312 308 for d in iter(lambda: fp.read(4096), ''):
313 309 self._send(d)
314 310 self._send("", flush=True)
315 311 r = self._recv()
316 312 if r:
317 313 return '', r
318 314 return self._recv(), ''
319 315
320 316 def _calltwowaystream(self, cmd, fp, **args):
321 317 r = self._call(cmd, **args)
322 318 if r:
323 319 # XXX needs to be made better
324 320 raise error.Abort(_('unexpected remote reply: %s') % r)
325 321 for d in iter(lambda: fp.read(4096), ''):
326 322 self._send(d)
327 323 self._send("", flush=True)
328 324 return self._pipei
329 325
330 326 def _getamount(self):
331 327 l = self._pipei.readline()
332 328 if l == '\n':
333 329 self._readerr()
334 330 msg = _('check previous remote output')
335 331 self._abort(error.OutOfBandError(hint=msg))
336 332 self._readerr()
337 333 try:
338 334 return int(l)
339 335 except ValueError:
340 336 self._abort(error.ResponseError(_("unexpected response:"), l))
341 337
342 338 def _recv(self):
343 339 return self._pipei.read(self._getamount())
344 340
345 341 def _send(self, data, flush=False):
346 342 self._pipeo.write("%d\n" % len(data))
347 343 if data:
348 344 self._pipeo.write(data)
349 345 if flush:
350 346 self._pipeo.flush()
351 347 self._readerr()
352 348
353 349 instance = sshpeer
@@ -1,1050 +1,1059 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 pycompat,
30 repository,
30 31 streamclone,
31 32 util,
32 33 )
33 34
34 35 urlerr = util.urlerr
35 36 urlreq = util.urlreq
36 37
37 38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 40 'IncompatibleClient')
40 41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41 42
42 43 class abstractserverproto(object):
43 44 """abstract class that summarizes the protocol API
44 45
45 46 Used as reference and documentation.
46 47 """
47 48
48 49 def getargs(self, args):
49 50 """return the value for arguments in <args>
50 51
51 52 returns a list of values (same order as <args>)"""
52 53 raise NotImplementedError()
53 54
54 55 def getfile(self, fp):
55 56 """write the whole content of a file into a file like object
56 57
57 58 The file is in the form::
58 59
59 60 (<chunk-size>\n<chunk>)+0\n
60 61
61 62 chunk size is the ascii version of the int.
62 63 """
63 64 raise NotImplementedError()
64 65
65 66 def redirect(self):
66 67 """may setup interception for stdout and stderr
67 68
68 69 See also the `restore` method."""
69 70 raise NotImplementedError()
70 71
71 72 # If the `redirect` function does install interception, the `restore`
72 73 # function MUST be defined. If interception is not used, this function
73 74 # MUST NOT be defined.
74 75 #
75 76 # left commented here on purpose
76 77 #
77 78 #def restore(self):
78 79 # """reinstall previous stdout and stderr and return intercepted stdout
79 80 # """
80 81 # raise NotImplementedError()
81 82
82 83 class remoteiterbatcher(peer.iterbatcher):
83 84 def __init__(self, remote):
84 85 super(remoteiterbatcher, self).__init__()
85 86 self._remote = remote
86 87
87 88 def __getattr__(self, name):
88 89 # Validate this method is batchable, since submit() only supports
89 90 # batchable methods.
90 91 fn = getattr(self._remote, name)
91 92 if not getattr(fn, 'batchable', None):
92 93 raise error.ProgrammingError('Attempted to batch a non-batchable '
93 94 'call to %r' % name)
94 95
95 96 return super(remoteiterbatcher, self).__getattr__(name)
96 97
97 98 def submit(self):
98 99 """Break the batch request into many patch calls and pipeline them.
99 100
100 101 This is mostly valuable over http where request sizes can be
101 102 limited, but can be used in other places as well.
102 103 """
103 104 # 2-tuple of (command, arguments) that represents what will be
104 105 # sent over the wire.
105 106 requests = []
106 107
107 108 # 4-tuple of (command, final future, @batchable generator, remote
108 109 # future).
109 110 results = []
110 111
111 112 for command, args, opts, finalfuture in self.calls:
112 113 mtd = getattr(self._remote, command)
113 114 batchable = mtd.batchable(mtd.im_self, *args, **opts)
114 115
115 116 commandargs, fremote = next(batchable)
116 117 assert fremote
117 118 requests.append((command, commandargs))
118 119 results.append((command, finalfuture, batchable, fremote))
119 120
120 121 if requests:
121 122 self._resultiter = self._remote._submitbatch(requests)
122 123
123 124 self._results = results
124 125
125 126 def results(self):
126 127 for command, finalfuture, batchable, remotefuture in self._results:
127 128 # Get the raw result, set it in the remote future, feed it
128 129 # back into the @batchable generator so it can be decoded, and
129 130 # set the result on the final future to this value.
130 131 remoteresult = next(self._resultiter)
131 132 remotefuture.set(remoteresult)
132 133 finalfuture.set(next(batchable))
133 134
134 135 # Verify our @batchable generators only emit 2 values.
135 136 try:
136 137 next(batchable)
137 138 except StopIteration:
138 139 pass
139 140 else:
140 141 raise error.ProgrammingError('%s @batchable generator emitted '
141 142 'unexpected value count' % command)
142 143
143 144 yield finalfuture.value
144 145
145 146 # Forward a couple of names from peer to make wireproto interactions
146 147 # slightly more sensible.
147 148 batchable = peer.batchable
148 149 future = peer.future
149 150
150 151 # list of nodes encoding / decoding
151 152
152 153 def decodelist(l, sep=' '):
153 154 if l:
154 155 return map(bin, l.split(sep))
155 156 return []
156 157
157 158 def encodelist(l, sep=' '):
158 159 try:
159 160 return sep.join(map(hex, l))
160 161 except TypeError:
161 162 raise
162 163
163 164 # batched call argument encoding
164 165
165 166 def escapearg(plain):
166 167 return (plain
167 168 .replace(':', ':c')
168 169 .replace(',', ':o')
169 170 .replace(';', ':s')
170 171 .replace('=', ':e'))
171 172
172 173 def unescapearg(escaped):
173 174 return (escaped
174 175 .replace(':e', '=')
175 176 .replace(':s', ';')
176 177 .replace(':o', ',')
177 178 .replace(':c', ':'))
178 179
179 180 def encodebatchcmds(req):
180 181 """Return a ``cmds`` argument value for the ``batch`` command."""
181 182 cmds = []
182 183 for op, argsdict in req:
183 184 # Old servers didn't properly unescape argument names. So prevent
184 185 # the sending of argument names that may not be decoded properly by
185 186 # servers.
186 187 assert all(escapearg(k) == k for k in argsdict)
187 188
188 189 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
189 190 for k, v in argsdict.iteritems())
190 191 cmds.append('%s %s' % (op, args))
191 192
192 193 return ';'.join(cmds)
193 194
194 195 # mapping of options accepted by getbundle and their types
195 196 #
196 197 # Meant to be extended by extensions. It is extensions responsibility to ensure
197 198 # such options are properly processed in exchange.getbundle.
198 199 #
199 200 # supported types are:
200 201 #
201 202 # :nodes: list of binary nodes
202 203 # :csv: list of comma-separated values
203 204 # :scsv: list of comma-separated values return as set
204 205 # :plain: string with no transformation needed.
205 206 gboptsmap = {'heads': 'nodes',
206 207 'common': 'nodes',
207 208 'obsmarkers': 'boolean',
208 209 'bundlecaps': 'scsv',
209 210 'listkeys': 'csv',
210 211 'cg': 'boolean',
211 212 'cbattempted': 'boolean'}
212 213
213 214 # client side
214 215
215 class wirepeer(peer.peerrepository):
216 class wirepeer(repository.legacypeer):
216 217 """Client-side interface for communicating with a peer repository.
217 218
218 219 Methods commonly call wire protocol commands of the same name.
219 220
220 221 See also httppeer.py and sshpeer.py for protocol-specific
221 222 implementations of this interface.
222 223 """
223 def _submitbatch(self, req):
224 """run batch request <req> on the server
225
226 Returns an iterator of the raw responses from the server.
227 """
228 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
229 chunk = rsp.read(1024)
230 work = [chunk]
231 while chunk:
232 while ';' not in chunk and chunk:
233 chunk = rsp.read(1024)
234 work.append(chunk)
235 merged = ''.join(work)
236 while ';' in merged:
237 one, merged = merged.split(';', 1)
238 yield unescapearg(one)
239 chunk = rsp.read(1024)
240 work = [merged, chunk]
241 yield unescapearg(''.join(work))
242
243 def _submitone(self, op, args):
244 return self._call(op, **args)
224 # Begin of basewirepeer interface.
245 225
246 226 def iterbatch(self):
247 227 return remoteiterbatcher(self)
248 228
249 229 @batchable
250 230 def lookup(self, key):
251 231 self.requirecap('lookup', _('look up remote revision'))
252 232 f = future()
253 233 yield {'key': encoding.fromlocal(key)}, f
254 234 d = f.value
255 235 success, data = d[:-1].split(" ", 1)
256 236 if int(success):
257 237 yield bin(data)
258 238 self._abort(error.RepoError(data))
259 239
260 240 @batchable
261 241 def heads(self):
262 242 f = future()
263 243 yield {}, f
264 244 d = f.value
265 245 try:
266 246 yield decodelist(d[:-1])
267 247 except ValueError:
268 248 self._abort(error.ResponseError(_("unexpected response:"), d))
269 249
270 250 @batchable
271 251 def known(self, nodes):
272 252 f = future()
273 253 yield {'nodes': encodelist(nodes)}, f
274 254 d = f.value
275 255 try:
276 256 yield [bool(int(b)) for b in d]
277 257 except ValueError:
278 258 self._abort(error.ResponseError(_("unexpected response:"), d))
279 259
280 260 @batchable
281 261 def branchmap(self):
282 262 f = future()
283 263 yield {}, f
284 264 d = f.value
285 265 try:
286 266 branchmap = {}
287 267 for branchpart in d.splitlines():
288 268 branchname, branchheads = branchpart.split(' ', 1)
289 269 branchname = encoding.tolocal(urlreq.unquote(branchname))
290 270 branchheads = decodelist(branchheads)
291 271 branchmap[branchname] = branchheads
292 272 yield branchmap
293 273 except TypeError:
294 274 self._abort(error.ResponseError(_("unexpected response:"), d))
295 275
296 def branches(self, nodes):
297 n = encodelist(nodes)
298 d = self._call("branches", nodes=n)
299 try:
300 br = [tuple(decodelist(b)) for b in d.splitlines()]
301 return br
302 except ValueError:
303 self._abort(error.ResponseError(_("unexpected response:"), d))
304
305 def between(self, pairs):
306 batch = 8 # avoid giant requests
307 r = []
308 for i in xrange(0, len(pairs), batch):
309 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
310 d = self._call("between", pairs=n)
311 try:
312 r.extend(l and decodelist(l) or [] for l in d.splitlines())
313 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
315 return r
276 @batchable
277 def listkeys(self, namespace):
278 if not self.capable('pushkey'):
279 yield {}, None
280 f = future()
281 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
282 yield {'namespace': encoding.fromlocal(namespace)}, f
283 d = f.value
284 self.ui.debug('received listkey for "%s": %i bytes\n'
285 % (namespace, len(d)))
286 yield pushkeymod.decodekeys(d)
316 287
317 288 @batchable
318 289 def pushkey(self, namespace, key, old, new):
319 290 if not self.capable('pushkey'):
320 291 yield False, None
321 292 f = future()
322 293 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
323 294 yield {'namespace': encoding.fromlocal(namespace),
324 295 'key': encoding.fromlocal(key),
325 296 'old': encoding.fromlocal(old),
326 297 'new': encoding.fromlocal(new)}, f
327 298 d = f.value
328 299 d, output = d.split('\n', 1)
329 300 try:
330 301 d = bool(int(d))
331 302 except ValueError:
332 303 raise error.ResponseError(
333 304 _('push failed (unexpected response):'), d)
334 305 for l in output.splitlines(True):
335 306 self.ui.status(_('remote: '), l)
336 307 yield d
337 308
338 @batchable
339 def listkeys(self, namespace):
340 if not self.capable('pushkey'):
341 yield {}, None
342 f = future()
343 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
344 yield {'namespace': encoding.fromlocal(namespace)}, f
345 d = f.value
346 self.ui.debug('received listkey for "%s": %i bytes\n'
347 % (namespace, len(d)))
348 yield pushkeymod.decodekeys(d)
349
350 309 def stream_out(self):
351 310 return self._callstream('stream_out')
352 311
353 def changegroup(self, nodes, kind):
354 n = encodelist(nodes)
355 f = self._callcompressable("changegroup", roots=n)
356 return changegroupmod.cg1unpacker(f, 'UN')
357
358 def changegroupsubset(self, bases, heads, kind):
359 self.requirecap('changegroupsubset', _('look up remote changes'))
360 bases = encodelist(bases)
361 heads = encodelist(heads)
362 f = self._callcompressable("changegroupsubset",
363 bases=bases, heads=heads)
364 return changegroupmod.cg1unpacker(f, 'UN')
365
366 312 def getbundle(self, source, **kwargs):
367 313 self.requirecap('getbundle', _('look up remote changes'))
368 314 opts = {}
369 315 bundlecaps = kwargs.get('bundlecaps')
370 316 if bundlecaps is not None:
371 317 kwargs['bundlecaps'] = sorted(bundlecaps)
372 318 else:
373 319 bundlecaps = () # kwargs could have it to None
374 320 for key, value in kwargs.iteritems():
375 321 if value is None:
376 322 continue
377 323 keytype = gboptsmap.get(key)
378 324 if keytype is None:
379 325 assert False, 'unexpected'
380 326 elif keytype == 'nodes':
381 327 value = encodelist(value)
382 328 elif keytype in ('csv', 'scsv'):
383 329 value = ','.join(value)
384 330 elif keytype == 'boolean':
385 331 value = '%i' % bool(value)
386 332 elif keytype != 'plain':
387 333 raise KeyError('unknown getbundle option type %s'
388 334 % keytype)
389 335 opts[key] = value
390 336 f = self._callcompressable("getbundle", **opts)
391 337 if any((cap.startswith('HG2') for cap in bundlecaps)):
392 338 return bundle2.getunbundler(self.ui, f)
393 339 else:
394 340 return changegroupmod.cg1unpacker(f, 'UN')
395 341
396 342 def unbundle(self, cg, heads, url):
397 343 '''Send cg (a readable file-like object representing the
398 344 changegroup to push, typically a chunkbuffer object) to the
399 345 remote server as a bundle.
400 346
401 347 When pushing a bundle10 stream, return an integer indicating the
402 348 result of the push (see changegroup.apply()).
403 349
404 350 When pushing a bundle20 stream, return a bundle20 stream.
405 351
406 352 `url` is the url the client thinks it's pushing to, which is
407 353 visible to hooks.
408 354 '''
409 355
410 356 if heads != ['force'] and self.capable('unbundlehash'):
411 357 heads = encodelist(['hashed',
412 358 hashlib.sha1(''.join(sorted(heads))).digest()])
413 359 else:
414 360 heads = encodelist(heads)
415 361
416 362 if util.safehasattr(cg, 'deltaheader'):
417 363 # this a bundle10, do the old style call sequence
418 364 ret, output = self._callpush("unbundle", cg, heads=heads)
419 365 if ret == "":
420 366 raise error.ResponseError(
421 367 _('push failed:'), output)
422 368 try:
423 369 ret = int(ret)
424 370 except ValueError:
425 371 raise error.ResponseError(
426 372 _('push failed (unexpected response):'), ret)
427 373
428 374 for l in output.splitlines(True):
429 375 self.ui.status(_('remote: '), l)
430 376 else:
431 377 # bundle2 push. Send a stream, fetch a stream.
432 378 stream = self._calltwowaystream('unbundle', cg, heads=heads)
433 379 ret = bundle2.getunbundler(self.ui, stream)
434 380 return ret
435 381
382 # End of basewirepeer interface.
383
384 # Begin of baselegacywirepeer interface.
385
386 def branches(self, nodes):
387 n = encodelist(nodes)
388 d = self._call("branches", nodes=n)
389 try:
390 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 return br
392 except ValueError:
393 self._abort(error.ResponseError(_("unexpected response:"), d))
394
395 def between(self, pairs):
396 batch = 8 # avoid giant requests
397 r = []
398 for i in xrange(0, len(pairs), batch):
399 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 d = self._call("between", pairs=n)
401 try:
402 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 except ValueError:
404 self._abort(error.ResponseError(_("unexpected response:"), d))
405 return r
406
407 def changegroup(self, nodes, kind):
408 n = encodelist(nodes)
409 f = self._callcompressable("changegroup", roots=n)
410 return changegroupmod.cg1unpacker(f, 'UN')
411
412 def changegroupsubset(self, bases, heads, kind):
413 self.requirecap('changegroupsubset', _('look up remote changes'))
414 bases = encodelist(bases)
415 heads = encodelist(heads)
416 f = self._callcompressable("changegroupsubset",
417 bases=bases, heads=heads)
418 return changegroupmod.cg1unpacker(f, 'UN')
419
420 # End of baselegacywirepeer interface.
421
422 def _submitbatch(self, req):
423 """run batch request <req> on the server
424
425 Returns an iterator of the raw responses from the server.
426 """
427 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 chunk = rsp.read(1024)
429 work = [chunk]
430 while chunk:
431 while ';' not in chunk and chunk:
432 chunk = rsp.read(1024)
433 work.append(chunk)
434 merged = ''.join(work)
435 while ';' in merged:
436 one, merged = merged.split(';', 1)
437 yield unescapearg(one)
438 chunk = rsp.read(1024)
439 work = [merged, chunk]
440 yield unescapearg(''.join(work))
441
442 def _submitone(self, op, args):
443 return self._call(op, **args)
444
436 445 def debugwireargs(self, one, two, three=None, four=None, five=None):
437 446 # don't pass optional arguments left at their default value
438 447 opts = {}
439 448 if three is not None:
440 449 opts['three'] = three
441 450 if four is not None:
442 451 opts['four'] = four
443 452 return self._call('debugwireargs', one=one, two=two, **opts)
444 453
445 454 def _call(self, cmd, **args):
446 455 """execute <cmd> on the server
447 456
448 457 The command is expected to return a simple string.
449 458
450 459 returns the server reply as a string."""
451 460 raise NotImplementedError()
452 461
453 462 def _callstream(self, cmd, **args):
454 463 """execute <cmd> on the server
455 464
456 465 The command is expected to return a stream. Note that if the
457 466 command doesn't return a stream, _callstream behaves
458 467 differently for ssh and http peers.
459 468
460 469 returns the server reply as a file like object.
461 470 """
462 471 raise NotImplementedError()
463 472
464 473 def _callcompressable(self, cmd, **args):
465 474 """execute <cmd> on the server
466 475
467 476 The command is expected to return a stream.
468 477
469 478 The stream may have been compressed in some implementations. This
470 479 function takes care of the decompression. This is the only difference
471 480 with _callstream.
472 481
473 482 returns the server reply as a file like object.
474 483 """
475 484 raise NotImplementedError()
476 485
477 486 def _callpush(self, cmd, fp, **args):
478 487 """execute a <cmd> on server
479 488
480 489 The command is expected to be related to a push. Push has a special
481 490 return method.
482 491
483 492 returns the server reply as a (ret, output) tuple. ret is either
484 493 empty (error) or a stringified int.
485 494 """
486 495 raise NotImplementedError()
487 496
488 497 def _calltwowaystream(self, cmd, fp, **args):
489 498 """execute <cmd> on server
490 499
491 500 The command will send a stream to the server and get a stream in reply.
492 501 """
493 502 raise NotImplementedError()
494 503
495 504 def _abort(self, exception):
496 505 """clearly abort the wire protocol connection and raise the exception
497 506 """
498 507 raise NotImplementedError()
499 508
500 509 # server side
501 510
502 511 # wire protocol command can either return a string or one of these classes.
503 512 class streamres(object):
504 513 """wireproto reply: binary stream
505 514
506 515 The call was successful and the result is a stream.
507 516
508 517 Accepts either a generator or an object with a ``read(size)`` method.
509 518
510 519 ``v1compressible`` indicates whether this data can be compressed to
511 520 "version 1" clients (technically: HTTP peers using
512 521 application/mercurial-0.1 media type). This flag should NOT be used on
513 522 new commands because new clients should support a more modern compression
514 523 mechanism.
515 524 """
516 525 def __init__(self, gen=None, reader=None, v1compressible=False):
517 526 self.gen = gen
518 527 self.reader = reader
519 528 self.v1compressible = v1compressible
520 529
521 530 class pushres(object):
522 531 """wireproto reply: success with simple integer return
523 532
524 533 The call was successful and returned an integer contained in `self.res`.
525 534 """
526 535 def __init__(self, res):
527 536 self.res = res
528 537
529 538 class pusherr(object):
530 539 """wireproto reply: failure
531 540
532 541 The call failed. The `self.res` attribute contains the error message.
533 542 """
534 543 def __init__(self, res):
535 544 self.res = res
536 545
537 546 class ooberror(object):
538 547 """wireproto reply: failure of a batch of operation
539 548
540 549 Something failed during a batch call. The error message is stored in
541 550 `self.message`.
542 551 """
543 552 def __init__(self, message):
544 553 self.message = message
545 554
546 555 def getdispatchrepo(repo, proto, command):
547 556 """Obtain the repo used for processing wire protocol commands.
548 557
549 558 The intent of this function is to serve as a monkeypatch point for
550 559 extensions that need commands to operate on different repo views under
551 560 specialized circumstances.
552 561 """
553 562 return repo.filtered('served')
554 563
555 564 def dispatch(repo, proto, command):
556 565 repo = getdispatchrepo(repo, proto, command)
557 566 func, spec = commands[command]
558 567 args = proto.getargs(spec)
559 568 return func(repo, proto, *args)
560 569
561 570 def options(cmd, keys, others):
562 571 opts = {}
563 572 for k in keys:
564 573 if k in others:
565 574 opts[k] = others[k]
566 575 del others[k]
567 576 if others:
568 577 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
569 578 % (cmd, ",".join(others)))
570 579 return opts
571 580
572 581 def bundle1allowed(repo, action):
573 582 """Whether a bundle1 operation is allowed from the server.
574 583
575 584 Priority is:
576 585
577 586 1. server.bundle1gd.<action> (if generaldelta active)
578 587 2. server.bundle1.<action>
579 588 3. server.bundle1gd (if generaldelta active)
580 589 4. server.bundle1
581 590 """
582 591 ui = repo.ui
583 592 gd = 'generaldelta' in repo.requirements
584 593
585 594 if gd:
586 595 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
587 596 if v is not None:
588 597 return v
589 598
590 599 v = ui.configbool('server', 'bundle1.%s' % action, None)
591 600 if v is not None:
592 601 return v
593 602
594 603 if gd:
595 604 v = ui.configbool('server', 'bundle1gd')
596 605 if v is not None:
597 606 return v
598 607
599 608 return ui.configbool('server', 'bundle1')
600 609
601 610 def supportedcompengines(ui, proto, role):
602 611 """Obtain the list of supported compression engines for a request."""
603 612 assert role in (util.CLIENTROLE, util.SERVERROLE)
604 613
605 614 compengines = util.compengines.supportedwireengines(role)
606 615
607 616 # Allow config to override default list and ordering.
608 617 if role == util.SERVERROLE:
609 618 configengines = ui.configlist('server', 'compressionengines')
610 619 config = 'server.compressionengines'
611 620 else:
612 621 # This is currently implemented mainly to facilitate testing. In most
613 622 # cases, the server should be in charge of choosing a compression engine
614 623 # because a server has the most to lose from a sub-optimal choice. (e.g.
615 624 # CPU DoS due to an expensive engine or a network DoS due to poor
616 625 # compression ratio).
617 626 configengines = ui.configlist('experimental',
618 627 'clientcompressionengines')
619 628 config = 'experimental.clientcompressionengines'
620 629
621 630 # No explicit config. Filter out the ones that aren't supposed to be
622 631 # advertised and return default ordering.
623 632 if not configengines:
624 633 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
625 634 return [e for e in compengines
626 635 if getattr(e.wireprotosupport(), attr) > 0]
627 636
628 637 # If compression engines are listed in the config, assume there is a good
629 638 # reason for it (like server operators wanting to achieve specific
630 639 # performance characteristics). So fail fast if the config references
631 640 # unusable compression engines.
632 641 validnames = set(e.name() for e in compengines)
633 642 invalidnames = set(e for e in configengines if e not in validnames)
634 643 if invalidnames:
635 644 raise error.Abort(_('invalid compression engine defined in %s: %s') %
636 645 (config, ', '.join(sorted(invalidnames))))
637 646
638 647 compengines = [e for e in compengines if e.name() in configengines]
639 648 compengines = sorted(compengines,
640 649 key=lambda e: configengines.index(e.name()))
641 650
642 651 if not compengines:
643 652 raise error.Abort(_('%s config option does not specify any known '
644 653 'compression engines') % config,
645 654 hint=_('usable compression engines: %s') %
646 655 ', '.sorted(validnames))
647 656
648 657 return compengines
649 658
650 659 # list of commands
651 660 commands = {}
652 661
653 662 def wireprotocommand(name, args=''):
654 663 """decorator for wire protocol command"""
655 664 def register(func):
656 665 commands[name] = (func, args)
657 666 return func
658 667 return register
659 668
660 669 @wireprotocommand('batch', 'cmds *')
661 670 def batch(repo, proto, cmds, others):
662 671 repo = repo.filtered("served")
663 672 res = []
664 673 for pair in cmds.split(';'):
665 674 op, args = pair.split(' ', 1)
666 675 vals = {}
667 676 for a in args.split(','):
668 677 if a:
669 678 n, v = a.split('=')
670 679 vals[unescapearg(n)] = unescapearg(v)
671 680 func, spec = commands[op]
672 681 if spec:
673 682 keys = spec.split()
674 683 data = {}
675 684 for k in keys:
676 685 if k == '*':
677 686 star = {}
678 687 for key in vals.keys():
679 688 if key not in keys:
680 689 star[key] = vals[key]
681 690 data['*'] = star
682 691 else:
683 692 data[k] = vals[k]
684 693 result = func(repo, proto, *[data[k] for k in keys])
685 694 else:
686 695 result = func(repo, proto)
687 696 if isinstance(result, ooberror):
688 697 return result
689 698 res.append(escapearg(result))
690 699 return ';'.join(res)
691 700
692 701 @wireprotocommand('between', 'pairs')
693 702 def between(repo, proto, pairs):
694 703 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
695 704 r = []
696 705 for b in repo.between(pairs):
697 706 r.append(encodelist(b) + "\n")
698 707 return "".join(r)
699 708
700 709 @wireprotocommand('branchmap')
701 710 def branchmap(repo, proto):
702 711 branchmap = repo.branchmap()
703 712 heads = []
704 713 for branch, nodes in branchmap.iteritems():
705 714 branchname = urlreq.quote(encoding.fromlocal(branch))
706 715 branchnodes = encodelist(nodes)
707 716 heads.append('%s %s' % (branchname, branchnodes))
708 717 return '\n'.join(heads)
709 718
710 719 @wireprotocommand('branches', 'nodes')
711 720 def branches(repo, proto, nodes):
712 721 nodes = decodelist(nodes)
713 722 r = []
714 723 for b in repo.branches(nodes):
715 724 r.append(encodelist(b) + "\n")
716 725 return "".join(r)
717 726
718 727 @wireprotocommand('clonebundles', '')
719 728 def clonebundles(repo, proto):
720 729 """Server command for returning info for available bundles to seed clones.
721 730
722 731 Clients will parse this response and determine what bundle to fetch.
723 732
724 733 Extensions may wrap this command to filter or dynamically emit data
725 734 depending on the request. e.g. you could advertise URLs for the closest
726 735 data center given the client's IP address.
727 736 """
728 737 return repo.vfs.tryread('clonebundles.manifest')
729 738
730 739 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
731 740 'known', 'getbundle', 'unbundlehash', 'batch']
732 741
733 742 def _capabilities(repo, proto):
734 743 """return a list of capabilities for a repo
735 744
736 745 This function exists to allow extensions to easily wrap capabilities
737 746 computation
738 747
739 748 - returns a lists: easy to alter
740 749 - change done here will be propagated to both `capabilities` and `hello`
741 750 command without any other action needed.
742 751 """
743 752 # copy to prevent modification of the global list
744 753 caps = list(wireprotocaps)
745 754 if streamclone.allowservergeneration(repo):
746 755 if repo.ui.configbool('server', 'preferuncompressed'):
747 756 caps.append('stream-preferred')
748 757 requiredformats = repo.requirements & repo.supportedformats
749 758 # if our local revlogs are just revlogv1, add 'stream' cap
750 759 if not requiredformats - {'revlogv1'}:
751 760 caps.append('stream')
752 761 # otherwise, add 'streamreqs' detailing our local revlog format
753 762 else:
754 763 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
755 764 if repo.ui.configbool('experimental', 'bundle2-advertise'):
756 765 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
757 766 caps.append('bundle2=' + urlreq.quote(capsblob))
758 767 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
759 768
760 769 if proto.name == 'http':
761 770 caps.append('httpheader=%d' %
762 771 repo.ui.configint('server', 'maxhttpheaderlen'))
763 772 if repo.ui.configbool('experimental', 'httppostargs'):
764 773 caps.append('httppostargs')
765 774
766 775 # FUTURE advertise 0.2rx once support is implemented
767 776 # FUTURE advertise minrx and mintx after consulting config option
768 777 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
769 778
770 779 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
771 780 if compengines:
772 781 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
773 782 for e in compengines)
774 783 caps.append('compression=%s' % comptypes)
775 784
776 785 return caps
777 786
778 787 # If you are writing an extension and consider wrapping this function. Wrap
779 788 # `_capabilities` instead.
780 789 @wireprotocommand('capabilities')
781 790 def capabilities(repo, proto):
782 791 return ' '.join(_capabilities(repo, proto))
783 792
784 793 @wireprotocommand('changegroup', 'roots')
785 794 def changegroup(repo, proto, roots):
786 795 nodes = decodelist(roots)
787 796 cg = changegroupmod.changegroup(repo, nodes, 'serve')
788 797 return streamres(reader=cg, v1compressible=True)
789 798
790 799 @wireprotocommand('changegroupsubset', 'bases heads')
791 800 def changegroupsubset(repo, proto, bases, heads):
792 801 bases = decodelist(bases)
793 802 heads = decodelist(heads)
794 803 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
795 804 return streamres(reader=cg, v1compressible=True)
796 805
797 806 @wireprotocommand('debugwireargs', 'one two *')
798 807 def debugwireargs(repo, proto, one, two, others):
799 808 # only accept optional args from the known set
800 809 opts = options('debugwireargs', ['three', 'four'], others)
801 810 return repo.debugwireargs(one, two, **opts)
802 811
803 812 @wireprotocommand('getbundle', '*')
804 813 def getbundle(repo, proto, others):
805 814 opts = options('getbundle', gboptsmap.keys(), others)
806 815 for k, v in opts.iteritems():
807 816 keytype = gboptsmap[k]
808 817 if keytype == 'nodes':
809 818 opts[k] = decodelist(v)
810 819 elif keytype == 'csv':
811 820 opts[k] = list(v.split(','))
812 821 elif keytype == 'scsv':
813 822 opts[k] = set(v.split(','))
814 823 elif keytype == 'boolean':
815 824 # Client should serialize False as '0', which is a non-empty string
816 825 # so it evaluates as a True bool.
817 826 if v == '0':
818 827 opts[k] = False
819 828 else:
820 829 opts[k] = bool(v)
821 830 elif keytype != 'plain':
822 831 raise KeyError('unknown getbundle option type %s'
823 832 % keytype)
824 833
825 834 if not bundle1allowed(repo, 'pull'):
826 835 if not exchange.bundle2requested(opts.get('bundlecaps')):
827 836 if proto.name == 'http':
828 837 return ooberror(bundle2required)
829 838 raise error.Abort(bundle2requiredmain,
830 839 hint=bundle2requiredhint)
831 840
832 841 try:
833 842 if repo.ui.configbool('server', 'disablefullbundle'):
834 843 # Check to see if this is a full clone.
835 844 clheads = set(repo.changelog.heads())
836 845 heads = set(opts.get('heads', set()))
837 846 common = set(opts.get('common', set()))
838 847 common.discard(nullid)
839 848 if not common and clheads == heads:
840 849 raise error.Abort(
841 850 _('server has pull-based clones disabled'),
842 851 hint=_('remove --pull if specified or upgrade Mercurial'))
843 852
844 853 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
845 854 except error.Abort as exc:
846 855 # cleanly forward Abort error to the client
847 856 if not exchange.bundle2requested(opts.get('bundlecaps')):
848 857 if proto.name == 'http':
849 858 return ooberror(str(exc) + '\n')
850 859 raise # cannot do better for bundle1 + ssh
851 860 # bundle2 request expect a bundle2 reply
852 861 bundler = bundle2.bundle20(repo.ui)
853 862 manargs = [('message', str(exc))]
854 863 advargs = []
855 864 if exc.hint is not None:
856 865 advargs.append(('hint', exc.hint))
857 866 bundler.addpart(bundle2.bundlepart('error:abort',
858 867 manargs, advargs))
859 868 return streamres(gen=bundler.getchunks(), v1compressible=True)
860 869 return streamres(gen=chunks, v1compressible=True)
861 870
862 871 @wireprotocommand('heads')
863 872 def heads(repo, proto):
864 873 h = repo.heads()
865 874 return encodelist(h) + "\n"
866 875
867 876 @wireprotocommand('hello')
868 877 def hello(repo, proto):
869 878 '''the hello command returns a set of lines describing various
870 879 interesting things about the server, in an RFC822-like format.
871 880 Currently the only one defined is "capabilities", which
872 881 consists of a line in the form:
873 882
874 883 capabilities: space separated list of tokens
875 884 '''
876 885 return "capabilities: %s\n" % (capabilities(repo, proto))
877 886
878 887 @wireprotocommand('listkeys', 'namespace')
879 888 def listkeys(repo, proto, namespace):
880 889 d = repo.listkeys(encoding.tolocal(namespace)).items()
881 890 return pushkeymod.encodekeys(d)
882 891
883 892 @wireprotocommand('lookup', 'key')
884 893 def lookup(repo, proto, key):
885 894 try:
886 895 k = encoding.tolocal(key)
887 896 c = repo[k]
888 897 r = c.hex()
889 898 success = 1
890 899 except Exception as inst:
891 900 r = str(inst)
892 901 success = 0
893 902 return "%s %s\n" % (success, r)
894 903
895 904 @wireprotocommand('known', 'nodes *')
896 905 def known(repo, proto, nodes, others):
897 906 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
898 907
899 908 @wireprotocommand('pushkey', 'namespace key old new')
900 909 def pushkey(repo, proto, namespace, key, old, new):
901 910 # compatibility with pre-1.8 clients which were accidentally
902 911 # sending raw binary nodes rather than utf-8-encoded hex
903 912 if len(new) == 20 and util.escapestr(new) != new:
904 913 # looks like it could be a binary node
905 914 try:
906 915 new.decode('utf-8')
907 916 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
908 917 except UnicodeDecodeError:
909 918 pass # binary, leave unmodified
910 919 else:
911 920 new = encoding.tolocal(new) # normal path
912 921
913 922 if util.safehasattr(proto, 'restore'):
914 923
915 924 proto.redirect()
916 925
917 926 try:
918 927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
919 928 encoding.tolocal(old), new) or False
920 929 except error.Abort:
921 930 r = False
922 931
923 932 output = proto.restore()
924 933
925 934 return '%s\n%s' % (int(r), output)
926 935
927 936 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
928 937 encoding.tolocal(old), new)
929 938 return '%s\n' % int(r)
930 939
931 940 @wireprotocommand('stream_out')
932 941 def stream(repo, proto):
933 942 '''If the server supports streaming clone, it advertises the "stream"
934 943 capability with a value representing the version and flags of the repo
935 944 it is serving. Client checks to see if it understands the format.
936 945 '''
937 946 if not streamclone.allowservergeneration(repo):
938 947 return '1\n'
939 948
940 949 def getstream(it):
941 950 yield '0\n'
942 951 for chunk in it:
943 952 yield chunk
944 953
945 954 try:
946 955 # LockError may be raised before the first result is yielded. Don't
947 956 # emit output until we're sure we got the lock successfully.
948 957 it = streamclone.generatev1wireproto(repo)
949 958 return streamres(gen=getstream(it))
950 959 except error.LockError:
951 960 return '2\n'
952 961
953 962 @wireprotocommand('unbundle', 'heads')
954 963 def unbundle(repo, proto, heads):
955 964 their_heads = decodelist(heads)
956 965
957 966 try:
958 967 proto.redirect()
959 968
960 969 exchange.check_heads(repo, their_heads, 'preparing changes')
961 970
962 971 # write bundle data to temporary file because it can be big
963 972 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
964 973 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
965 974 r = 0
966 975 try:
967 976 proto.getfile(fp)
968 977 fp.seek(0)
969 978 gen = exchange.readbundle(repo.ui, fp, None)
970 979 if (isinstance(gen, changegroupmod.cg1unpacker)
971 980 and not bundle1allowed(repo, 'push')):
972 981 if proto.name == 'http':
973 982 # need to special case http because stderr do not get to
974 983 # the http client on failed push so we need to abuse some
975 984 # other error type to make sure the message get to the
976 985 # user.
977 986 return ooberror(bundle2required)
978 987 raise error.Abort(bundle2requiredmain,
979 988 hint=bundle2requiredhint)
980 989
981 990 r = exchange.unbundle(repo, gen, their_heads, 'serve',
982 991 proto._client())
983 992 if util.safehasattr(r, 'addpart'):
984 993 # The return looks streamable, we are in the bundle2 case and
985 994 # should return a stream.
986 995 return streamres(gen=r.getchunks())
987 996 return pushres(r)
988 997
989 998 finally:
990 999 fp.close()
991 1000 os.unlink(tempname)
992 1001
993 1002 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
994 1003 # handle non-bundle2 case first
995 1004 if not getattr(exc, 'duringunbundle2', False):
996 1005 try:
997 1006 raise
998 1007 except error.Abort:
999 1008 # The old code we moved used util.stderr directly.
1000 1009 # We did not change it to minimise code change.
1001 1010 # This need to be moved to something proper.
1002 1011 # Feel free to do it.
1003 1012 util.stderr.write("abort: %s\n" % exc)
1004 1013 if exc.hint is not None:
1005 1014 util.stderr.write("(%s)\n" % exc.hint)
1006 1015 return pushres(0)
1007 1016 except error.PushRaced:
1008 1017 return pusherr(str(exc))
1009 1018
1010 1019 bundler = bundle2.bundle20(repo.ui)
1011 1020 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1012 1021 bundler.addpart(out)
1013 1022 try:
1014 1023 try:
1015 1024 raise
1016 1025 except error.PushkeyFailed as exc:
1017 1026 # check client caps
1018 1027 remotecaps = getattr(exc, '_replycaps', None)
1019 1028 if (remotecaps is not None
1020 1029 and 'pushkey' not in remotecaps.get('error', ())):
1021 1030 # no support remote side, fallback to Abort handler.
1022 1031 raise
1023 1032 part = bundler.newpart('error:pushkey')
1024 1033 part.addparam('in-reply-to', exc.partid)
1025 1034 if exc.namespace is not None:
1026 1035 part.addparam('namespace', exc.namespace, mandatory=False)
1027 1036 if exc.key is not None:
1028 1037 part.addparam('key', exc.key, mandatory=False)
1029 1038 if exc.new is not None:
1030 1039 part.addparam('new', exc.new, mandatory=False)
1031 1040 if exc.old is not None:
1032 1041 part.addparam('old', exc.old, mandatory=False)
1033 1042 if exc.ret is not None:
1034 1043 part.addparam('ret', exc.ret, mandatory=False)
1035 1044 except error.BundleValueError as exc:
1036 1045 errpart = bundler.newpart('error:unsupportedcontent')
1037 1046 if exc.parttype is not None:
1038 1047 errpart.addparam('parttype', exc.parttype)
1039 1048 if exc.params:
1040 1049 errpart.addparam('params', '\0'.join(exc.params))
1041 1050 except error.Abort as exc:
1042 1051 manargs = [('message', str(exc))]
1043 1052 advargs = []
1044 1053 if exc.hint is not None:
1045 1054 advargs.append(('hint', exc.hint))
1046 1055 bundler.addpart(bundle2.bundlepart('error:abort',
1047 1056 manargs, advargs))
1048 1057 except error.PushRaced as exc:
1049 1058 bundler.newpart('error:pushraced', [('message', str(exc))])
1050 1059 return streamres(gen=bundler.getchunks())
@@ -1,24 +1,24 b''
1 1 # Disable the $CAP wire protocol capability.
2 2
3 3 if test -z "$CAP"
4 4 then
5 5 echo "CAP environment variable not set."
6 6 fi
7 7
8 8 cat > notcapable-$CAP.py << EOF
9 from mercurial import extensions, peer, localrepo
9 from mercurial import extensions, localrepo, repository
10 10 def extsetup():
11 extensions.wrapfunction(peer.peerrepository, 'capable', wrapcapable)
11 extensions.wrapfunction(repository.peer, 'capable', wrapcapable)
12 12 extensions.wrapfunction(localrepo.localrepository, 'peer', wrappeer)
13 13 def wrapcapable(orig, self, name, *args, **kwargs):
14 14 if name in '$CAP'.split(' '):
15 15 return False
16 16 return orig(self, name, *args, **kwargs)
17 17 def wrappeer(orig, self):
18 18 # Since we're disabling some newer features, we need to make sure local
19 19 # repos add in the legacy features again.
20 20 return localrepo.locallegacypeer(self)
21 21 EOF
22 22
23 23 echo '[extensions]' >> $HGRCPATH
24 24 echo "notcapable-$CAP = `pwd`/notcapable-$CAP.py" >> $HGRCPATH
@@ -1,61 +1,80 b''
1 1 from __future__ import absolute_import, print_function
2 2
3 3 from mercurial import (
4 4 util,
5 5 wireproto,
6 6 )
7 7 stringio = util.stringio
8 8
9 9 class proto(object):
10 10 def __init__(self, args):
11 11 self.args = args
12 12 def getargs(self, spec):
13 13 args = self.args
14 14 args.setdefault('*', {})
15 15 names = spec.split()
16 16 return [args[n] for n in names]
17 17
18 18 class clientpeer(wireproto.wirepeer):
19 19 def __init__(self, serverrepo):
20 20 self.serverrepo = serverrepo
21 21
22 def _capabilities(self):
22 @property
23 def ui(self):
24 return self.serverrepo.ui
25
26 def url(self):
27 return 'test'
28
29 def local(self):
30 return None
31
32 def peer(self):
33 return self
34
35 def canpush(self):
36 return True
37
38 def close(self):
39 pass
40
41 def capabilities(self):
23 42 return ['batch']
24 43
25 44 def _call(self, cmd, **args):
26 45 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
27 46
28 47 def _callstream(self, cmd, **args):
29 48 return stringio(self._call(cmd, **args))
30 49
31 50 @wireproto.batchable
32 51 def greet(self, name):
33 52 f = wireproto.future()
34 53 yield {'name': mangle(name)}, f
35 54 yield unmangle(f.value)
36 55
37 56 class serverrepo(object):
38 57 def greet(self, name):
39 58 return "Hello, " + name
40 59
41 60 def filtered(self, name):
42 61 return self
43 62
44 63 def mangle(s):
45 64 return ''.join(chr(ord(c) + 1) for c in s)
46 65 def unmangle(s):
47 66 return ''.join(chr(ord(c) - 1) for c in s)
48 67
49 68 def greet(repo, proto, name):
50 69 return mangle(repo.greet(unmangle(name)))
51 70
52 71 wireproto.commands['greet'] = (greet, 'name',)
53 72
54 73 srv = serverrepo()
55 74 clt = clientpeer(srv)
56 75
57 76 print(clt.greet("Foobar"))
58 77 b = clt.iterbatch()
59 78 map(b.greet, ('Fo, =;:<o', 'Bar'))
60 79 b.submit()
61 80 print([r for r in b.results()])
General Comments 0
You need to be logged in to leave comments. Login now