##// END OF EJS Templates
wireproto: rename argument to groupchunks()...
Gregory Szorc -
r30014:d34cf260 default
parent child Browse files
Show More
@@ -1,124 +1,124 b''
1 #
1 #
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import cgi
10 import cgi
11 import zlib
11 import zlib
12
12
13 from .common import (
13 from .common import (
14 HTTP_OK,
14 HTTP_OK,
15 )
15 )
16
16
17 from .. import (
17 from .. import (
18 util,
18 util,
19 wireproto,
19 wireproto,
20 )
20 )
21 stringio = util.stringio
21 stringio = util.stringio
22
22
23 urlerr = util.urlerr
23 urlerr = util.urlerr
24 urlreq = util.urlreq
24 urlreq = util.urlreq
25
25
26 HGTYPE = 'application/mercurial-0.1'
26 HGTYPE = 'application/mercurial-0.1'
27 HGERRTYPE = 'application/hg-error'
27 HGERRTYPE = 'application/hg-error'
28
28
29 class webproto(wireproto.abstractserverproto):
29 class webproto(wireproto.abstractserverproto):
30 def __init__(self, req, ui):
30 def __init__(self, req, ui):
31 self.req = req
31 self.req = req
32 self.response = ''
32 self.response = ''
33 self.ui = ui
33 self.ui = ui
34 def getargs(self, args):
34 def getargs(self, args):
35 knownargs = self._args()
35 knownargs = self._args()
36 data = {}
36 data = {}
37 keys = args.split()
37 keys = args.split()
38 for k in keys:
38 for k in keys:
39 if k == '*':
39 if k == '*':
40 star = {}
40 star = {}
41 for key in knownargs.keys():
41 for key in knownargs.keys():
42 if key != 'cmd' and key not in keys:
42 if key != 'cmd' and key not in keys:
43 star[key] = knownargs[key][0]
43 star[key] = knownargs[key][0]
44 data['*'] = star
44 data['*'] = star
45 else:
45 else:
46 data[k] = knownargs[k][0]
46 data[k] = knownargs[k][0]
47 return [data[k] for k in keys]
47 return [data[k] for k in keys]
48 def _args(self):
48 def _args(self):
49 args = self.req.form.copy()
49 args = self.req.form.copy()
50 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
50 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
51 if postlen:
51 if postlen:
52 args.update(cgi.parse_qs(
52 args.update(cgi.parse_qs(
53 self.req.read(postlen), keep_blank_values=True))
53 self.req.read(postlen), keep_blank_values=True))
54 return args
54 return args
55 chunks = []
55 chunks = []
56 i = 1
56 i = 1
57 while True:
57 while True:
58 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
58 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
59 if h is None:
59 if h is None:
60 break
60 break
61 chunks += [h]
61 chunks += [h]
62 i += 1
62 i += 1
63 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
63 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
64 return args
64 return args
65 def getfile(self, fp):
65 def getfile(self, fp):
66 length = int(self.req.env['CONTENT_LENGTH'])
66 length = int(self.req.env['CONTENT_LENGTH'])
67 for s in util.filechunkiter(self.req, limit=length):
67 for s in util.filechunkiter(self.req, limit=length):
68 fp.write(s)
68 fp.write(s)
69 def redirect(self):
69 def redirect(self):
70 self.oldio = self.ui.fout, self.ui.ferr
70 self.oldio = self.ui.fout, self.ui.ferr
71 self.ui.ferr = self.ui.fout = stringio()
71 self.ui.ferr = self.ui.fout = stringio()
72 def restore(self):
72 def restore(self):
73 val = self.ui.fout.getvalue()
73 val = self.ui.fout.getvalue()
74 self.ui.ferr, self.ui.fout = self.oldio
74 self.ui.ferr, self.ui.fout = self.oldio
75 return val
75 return val
76 def groupchunks(self, cg):
76 def groupchunks(self, fh):
77 # Don't allow untrusted settings because disabling compression or
77 # Don't allow untrusted settings because disabling compression or
78 # setting a very high compression level could lead to flooding
78 # setting a very high compression level could lead to flooding
79 # the server's network or CPU.
79 # the server's network or CPU.
80 z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
80 z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
81 while True:
81 while True:
82 chunk = cg.read(32768)
82 chunk = fh.read(32768)
83 if not chunk:
83 if not chunk:
84 break
84 break
85 data = z.compress(chunk)
85 data = z.compress(chunk)
86 # Not all calls to compress() emit data. It is cheaper to inspect
86 # Not all calls to compress() emit data. It is cheaper to inspect
87 # that here than to send it via the generator.
87 # that here than to send it via the generator.
88 if data:
88 if data:
89 yield data
89 yield data
90 yield z.flush()
90 yield z.flush()
91 def _client(self):
91 def _client(self):
92 return 'remote:%s:%s:%s' % (
92 return 'remote:%s:%s:%s' % (
93 self.req.env.get('wsgi.url_scheme') or 'http',
93 self.req.env.get('wsgi.url_scheme') or 'http',
94 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
94 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
95 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
95 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
96
96
97 def iscmd(cmd):
97 def iscmd(cmd):
98 return cmd in wireproto.commands
98 return cmd in wireproto.commands
99
99
100 def call(repo, req, cmd):
100 def call(repo, req, cmd):
101 p = webproto(req, repo.ui)
101 p = webproto(req, repo.ui)
102 rsp = wireproto.dispatch(repo, p, cmd)
102 rsp = wireproto.dispatch(repo, p, cmd)
103 if isinstance(rsp, str):
103 if isinstance(rsp, str):
104 req.respond(HTTP_OK, HGTYPE, body=rsp)
104 req.respond(HTTP_OK, HGTYPE, body=rsp)
105 return []
105 return []
106 elif isinstance(rsp, wireproto.streamres):
106 elif isinstance(rsp, wireproto.streamres):
107 req.respond(HTTP_OK, HGTYPE)
107 req.respond(HTTP_OK, HGTYPE)
108 return rsp.gen
108 return rsp.gen
109 elif isinstance(rsp, wireproto.pushres):
109 elif isinstance(rsp, wireproto.pushres):
110 val = p.restore()
110 val = p.restore()
111 rsp = '%d\n%s' % (rsp.res, val)
111 rsp = '%d\n%s' % (rsp.res, val)
112 req.respond(HTTP_OK, HGTYPE, body=rsp)
112 req.respond(HTTP_OK, HGTYPE, body=rsp)
113 return []
113 return []
114 elif isinstance(rsp, wireproto.pusherr):
114 elif isinstance(rsp, wireproto.pusherr):
115 # drain the incoming bundle
115 # drain the incoming bundle
116 req.drain()
116 req.drain()
117 p.restore()
117 p.restore()
118 rsp = '0\n%s\n' % rsp.res
118 rsp = '0\n%s\n' % rsp.res
119 req.respond(HTTP_OK, HGTYPE, body=rsp)
119 req.respond(HTTP_OK, HGTYPE, body=rsp)
120 return []
120 return []
121 elif isinstance(rsp, wireproto.ooberror):
121 elif isinstance(rsp, wireproto.ooberror):
122 rsp = rsp.message
122 rsp = rsp.message
123 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
123 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
124 return []
124 return []
@@ -1,131 +1,131 b''
1 # sshserver.py - ssh protocol server support for mercurial
1 # sshserver.py - ssh protocol server support for mercurial
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import os
11 import os
12 import sys
12 import sys
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 error,
16 error,
17 hook,
17 hook,
18 util,
18 util,
19 wireproto,
19 wireproto,
20 )
20 )
21
21
22 class sshserver(wireproto.abstractserverproto):
22 class sshserver(wireproto.abstractserverproto):
23 def __init__(self, ui, repo):
23 def __init__(self, ui, repo):
24 self.ui = ui
24 self.ui = ui
25 self.repo = repo
25 self.repo = repo
26 self.lock = None
26 self.lock = None
27 self.fin = ui.fin
27 self.fin = ui.fin
28 self.fout = ui.fout
28 self.fout = ui.fout
29
29
30 hook.redirect(True)
30 hook.redirect(True)
31 ui.fout = repo.ui.fout = ui.ferr
31 ui.fout = repo.ui.fout = ui.ferr
32
32
33 # Prevent insertion/deletion of CRs
33 # Prevent insertion/deletion of CRs
34 util.setbinary(self.fin)
34 util.setbinary(self.fin)
35 util.setbinary(self.fout)
35 util.setbinary(self.fout)
36
36
37 def getargs(self, args):
37 def getargs(self, args):
38 data = {}
38 data = {}
39 keys = args.split()
39 keys = args.split()
40 for n in xrange(len(keys)):
40 for n in xrange(len(keys)):
41 argline = self.fin.readline()[:-1]
41 argline = self.fin.readline()[:-1]
42 arg, l = argline.split()
42 arg, l = argline.split()
43 if arg not in keys:
43 if arg not in keys:
44 raise error.Abort(_("unexpected parameter %r") % arg)
44 raise error.Abort(_("unexpected parameter %r") % arg)
45 if arg == '*':
45 if arg == '*':
46 star = {}
46 star = {}
47 for k in xrange(int(l)):
47 for k in xrange(int(l)):
48 argline = self.fin.readline()[:-1]
48 argline = self.fin.readline()[:-1]
49 arg, l = argline.split()
49 arg, l = argline.split()
50 val = self.fin.read(int(l))
50 val = self.fin.read(int(l))
51 star[arg] = val
51 star[arg] = val
52 data['*'] = star
52 data['*'] = star
53 else:
53 else:
54 val = self.fin.read(int(l))
54 val = self.fin.read(int(l))
55 data[arg] = val
55 data[arg] = val
56 return [data[k] for k in keys]
56 return [data[k] for k in keys]
57
57
58 def getarg(self, name):
58 def getarg(self, name):
59 return self.getargs(name)[0]
59 return self.getargs(name)[0]
60
60
61 def getfile(self, fpout):
61 def getfile(self, fpout):
62 self.sendresponse('')
62 self.sendresponse('')
63 count = int(self.fin.readline())
63 count = int(self.fin.readline())
64 while count:
64 while count:
65 fpout.write(self.fin.read(count))
65 fpout.write(self.fin.read(count))
66 count = int(self.fin.readline())
66 count = int(self.fin.readline())
67
67
68 def redirect(self):
68 def redirect(self):
69 pass
69 pass
70
70
71 def groupchunks(self, changegroup):
71 def groupchunks(self, fh):
72 return iter(lambda: changegroup.read(4096), '')
72 return iter(lambda: fh.read(4096), '')
73
73
74 def sendresponse(self, v):
74 def sendresponse(self, v):
75 self.fout.write("%d\n" % len(v))
75 self.fout.write("%d\n" % len(v))
76 self.fout.write(v)
76 self.fout.write(v)
77 self.fout.flush()
77 self.fout.flush()
78
78
79 def sendstream(self, source):
79 def sendstream(self, source):
80 write = self.fout.write
80 write = self.fout.write
81 for chunk in source.gen:
81 for chunk in source.gen:
82 write(chunk)
82 write(chunk)
83 self.fout.flush()
83 self.fout.flush()
84
84
85 def sendpushresponse(self, rsp):
85 def sendpushresponse(self, rsp):
86 self.sendresponse('')
86 self.sendresponse('')
87 self.sendresponse(str(rsp.res))
87 self.sendresponse(str(rsp.res))
88
88
89 def sendpusherror(self, rsp):
89 def sendpusherror(self, rsp):
90 self.sendresponse(rsp.res)
90 self.sendresponse(rsp.res)
91
91
92 def sendooberror(self, rsp):
92 def sendooberror(self, rsp):
93 self.ui.ferr.write('%s\n-\n' % rsp.message)
93 self.ui.ferr.write('%s\n-\n' % rsp.message)
94 self.ui.ferr.flush()
94 self.ui.ferr.flush()
95 self.fout.write('\n')
95 self.fout.write('\n')
96 self.fout.flush()
96 self.fout.flush()
97
97
98 def serve_forever(self):
98 def serve_forever(self):
99 try:
99 try:
100 while self.serve_one():
100 while self.serve_one():
101 pass
101 pass
102 finally:
102 finally:
103 if self.lock is not None:
103 if self.lock is not None:
104 self.lock.release()
104 self.lock.release()
105 sys.exit(0)
105 sys.exit(0)
106
106
107 handlers = {
107 handlers = {
108 str: sendresponse,
108 str: sendresponse,
109 wireproto.streamres: sendstream,
109 wireproto.streamres: sendstream,
110 wireproto.pushres: sendpushresponse,
110 wireproto.pushres: sendpushresponse,
111 wireproto.pusherr: sendpusherror,
111 wireproto.pusherr: sendpusherror,
112 wireproto.ooberror: sendooberror,
112 wireproto.ooberror: sendooberror,
113 }
113 }
114
114
115 def serve_one(self):
115 def serve_one(self):
116 cmd = self.fin.readline()[:-1]
116 cmd = self.fin.readline()[:-1]
117 if cmd and cmd in wireproto.commands:
117 if cmd and cmd in wireproto.commands:
118 rsp = wireproto.dispatch(self.repo, self, cmd)
118 rsp = wireproto.dispatch(self.repo, self, cmd)
119 self.handlers[rsp.__class__](self, rsp)
119 self.handlers[rsp.__class__](self, rsp)
120 elif cmd:
120 elif cmd:
121 impl = getattr(self, 'do_' + cmd, None)
121 impl = getattr(self, 'do_' + cmd, None)
122 if impl:
122 if impl:
123 r = impl()
123 r = impl()
124 if r is not None:
124 if r is not None:
125 self.sendresponse(r)
125 self.sendresponse(r)
126 else: self.sendresponse("")
126 else: self.sendresponse("")
127 return cmd != ''
127 return cmd != ''
128
128
129 def _client(self):
129 def _client(self):
130 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
130 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
131 return 'remote:ssh:' + client
131 return 'remote:ssh:' + client
@@ -1,956 +1,957 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import sys
13 import sys
14 import tempfile
14 import tempfile
15
15
16 from .i18n import _
16 from .i18n import _
17 from .node import (
17 from .node import (
18 bin,
18 bin,
19 hex,
19 hex,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2required = _(
37 bundle2required = _(
38 'incompatible Mercurial client; bundle2 required\n'
38 'incompatible Mercurial client; bundle2 required\n'
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40
40
41 class abstractserverproto(object):
41 class abstractserverproto(object):
42 """abstract class that summarizes the protocol API
42 """abstract class that summarizes the protocol API
43
43
44 Used as reference and documentation.
44 Used as reference and documentation.
45 """
45 """
46
46
47 def getargs(self, args):
47 def getargs(self, args):
48 """return the value for arguments in <args>
48 """return the value for arguments in <args>
49
49
50 returns a list of values (same order as <args>)"""
50 returns a list of values (same order as <args>)"""
51 raise NotImplementedError()
51 raise NotImplementedError()
52
52
53 def getfile(self, fp):
53 def getfile(self, fp):
54 """write the whole content of a file into a file like object
54 """write the whole content of a file into a file like object
55
55
56 The file is in the form::
56 The file is in the form::
57
57
58 (<chunk-size>\n<chunk>)+0\n
58 (<chunk-size>\n<chunk>)+0\n
59
59
60 chunk size is the ascii version of the int.
60 chunk size is the ascii version of the int.
61 """
61 """
62 raise NotImplementedError()
62 raise NotImplementedError()
63
63
64 def redirect(self):
64 def redirect(self):
65 """may setup interception for stdout and stderr
65 """may setup interception for stdout and stderr
66
66
67 See also the `restore` method."""
67 See also the `restore` method."""
68 raise NotImplementedError()
68 raise NotImplementedError()
69
69
70 # If the `redirect` function does install interception, the `restore`
70 # If the `redirect` function does install interception, the `restore`
71 # function MUST be defined. If interception is not used, this function
71 # function MUST be defined. If interception is not used, this function
72 # MUST NOT be defined.
72 # MUST NOT be defined.
73 #
73 #
74 # left commented here on purpose
74 # left commented here on purpose
75 #
75 #
76 #def restore(self):
76 #def restore(self):
77 # """reinstall previous stdout and stderr and return intercepted stdout
77 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, cg):
81 def groupchunks(self, fh):
82 """return 4096 chunks from a changegroup object
82 """Generator of chunks to send to the client.
83
83
84 Some protocols may have compressed the contents."""
84 Some protocols may have compressed the contents.
85 """
85 raise NotImplementedError()
86 raise NotImplementedError()
86
87
87 class remotebatch(peer.batcher):
88 class remotebatch(peer.batcher):
88 '''batches the queued calls; uses as few roundtrips as possible'''
89 '''batches the queued calls; uses as few roundtrips as possible'''
89 def __init__(self, remote):
90 def __init__(self, remote):
90 '''remote must support _submitbatch(encbatch) and
91 '''remote must support _submitbatch(encbatch) and
91 _submitone(op, encargs)'''
92 _submitone(op, encargs)'''
92 peer.batcher.__init__(self)
93 peer.batcher.__init__(self)
93 self.remote = remote
94 self.remote = remote
94 def submit(self):
95 def submit(self):
95 req, rsp = [], []
96 req, rsp = [], []
96 for name, args, opts, resref in self.calls:
97 for name, args, opts, resref in self.calls:
97 mtd = getattr(self.remote, name)
98 mtd = getattr(self.remote, name)
98 batchablefn = getattr(mtd, 'batchable', None)
99 batchablefn = getattr(mtd, 'batchable', None)
99 if batchablefn is not None:
100 if batchablefn is not None:
100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 batchable = batchablefn(mtd.im_self, *args, **opts)
101 encargsorres, encresref = next(batchable)
102 encargsorres, encresref = next(batchable)
102 if encresref:
103 if encresref:
103 req.append((name, encargsorres,))
104 req.append((name, encargsorres,))
104 rsp.append((batchable, encresref, resref,))
105 rsp.append((batchable, encresref, resref,))
105 else:
106 else:
106 resref.set(encargsorres)
107 resref.set(encargsorres)
107 else:
108 else:
108 if req:
109 if req:
109 self._submitreq(req, rsp)
110 self._submitreq(req, rsp)
110 req, rsp = [], []
111 req, rsp = [], []
111 resref.set(mtd(*args, **opts))
112 resref.set(mtd(*args, **opts))
112 if req:
113 if req:
113 self._submitreq(req, rsp)
114 self._submitreq(req, rsp)
114 def _submitreq(self, req, rsp):
115 def _submitreq(self, req, rsp):
115 encresults = self.remote._submitbatch(req)
116 encresults = self.remote._submitbatch(req)
116 for encres, r in zip(encresults, rsp):
117 for encres, r in zip(encresults, rsp):
117 batchable, encresref, resref = r
118 batchable, encresref, resref = r
118 encresref.set(encres)
119 encresref.set(encres)
119 resref.set(next(batchable))
120 resref.set(next(batchable))
120
121
121 class remoteiterbatcher(peer.iterbatcher):
122 class remoteiterbatcher(peer.iterbatcher):
122 def __init__(self, remote):
123 def __init__(self, remote):
123 super(remoteiterbatcher, self).__init__()
124 super(remoteiterbatcher, self).__init__()
124 self._remote = remote
125 self._remote = remote
125
126
126 def __getattr__(self, name):
127 def __getattr__(self, name):
127 if not getattr(self._remote, name, False):
128 if not getattr(self._remote, name, False):
128 raise AttributeError(
129 raise AttributeError(
129 'Attempted to iterbatch non-batchable call to %r' % name)
130 'Attempted to iterbatch non-batchable call to %r' % name)
130 return super(remoteiterbatcher, self).__getattr__(name)
131 return super(remoteiterbatcher, self).__getattr__(name)
131
132
132 def submit(self):
133 def submit(self):
133 """Break the batch request into many patch calls and pipeline them.
134 """Break the batch request into many patch calls and pipeline them.
134
135
135 This is mostly valuable over http where request sizes can be
136 This is mostly valuable over http where request sizes can be
136 limited, but can be used in other places as well.
137 limited, but can be used in other places as well.
137 """
138 """
138 req, rsp = [], []
139 req, rsp = [], []
139 for name, args, opts, resref in self.calls:
140 for name, args, opts, resref in self.calls:
140 mtd = getattr(self._remote, name)
141 mtd = getattr(self._remote, name)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 encargsorres, encresref = next(batchable)
143 encargsorres, encresref = next(batchable)
143 assert encresref
144 assert encresref
144 req.append((name, encargsorres))
145 req.append((name, encargsorres))
145 rsp.append((batchable, encresref))
146 rsp.append((batchable, encresref))
146 if req:
147 if req:
147 self._resultiter = self._remote._submitbatch(req)
148 self._resultiter = self._remote._submitbatch(req)
148 self._rsp = rsp
149 self._rsp = rsp
149
150
150 def results(self):
151 def results(self):
151 for (batchable, encresref), encres in itertools.izip(
152 for (batchable, encresref), encres in itertools.izip(
152 self._rsp, self._resultiter):
153 self._rsp, self._resultiter):
153 encresref.set(encres)
154 encresref.set(encres)
154 yield next(batchable)
155 yield next(batchable)
155
156
156 # Forward a couple of names from peer to make wireproto interactions
157 # Forward a couple of names from peer to make wireproto interactions
157 # slightly more sensible.
158 # slightly more sensible.
158 batchable = peer.batchable
159 batchable = peer.batchable
159 future = peer.future
160 future = peer.future
160
161
161 # list of nodes encoding / decoding
162 # list of nodes encoding / decoding
162
163
163 def decodelist(l, sep=' '):
164 def decodelist(l, sep=' '):
164 if l:
165 if l:
165 return map(bin, l.split(sep))
166 return map(bin, l.split(sep))
166 return []
167 return []
167
168
168 def encodelist(l, sep=' '):
169 def encodelist(l, sep=' '):
169 try:
170 try:
170 return sep.join(map(hex, l))
171 return sep.join(map(hex, l))
171 except TypeError:
172 except TypeError:
172 raise
173 raise
173
174
174 # batched call argument encoding
175 # batched call argument encoding
175
176
176 def escapearg(plain):
177 def escapearg(plain):
177 return (plain
178 return (plain
178 .replace(':', ':c')
179 .replace(':', ':c')
179 .replace(',', ':o')
180 .replace(',', ':o')
180 .replace(';', ':s')
181 .replace(';', ':s')
181 .replace('=', ':e'))
182 .replace('=', ':e'))
182
183
183 def unescapearg(escaped):
184 def unescapearg(escaped):
184 return (escaped
185 return (escaped
185 .replace(':e', '=')
186 .replace(':e', '=')
186 .replace(':s', ';')
187 .replace(':s', ';')
187 .replace(':o', ',')
188 .replace(':o', ',')
188 .replace(':c', ':'))
189 .replace(':c', ':'))
189
190
190 def encodebatchcmds(req):
191 def encodebatchcmds(req):
191 """Return a ``cmds`` argument value for the ``batch`` command."""
192 """Return a ``cmds`` argument value for the ``batch`` command."""
192 cmds = []
193 cmds = []
193 for op, argsdict in req:
194 for op, argsdict in req:
194 # Old servers didn't properly unescape argument names. So prevent
195 # Old servers didn't properly unescape argument names. So prevent
195 # the sending of argument names that may not be decoded properly by
196 # the sending of argument names that may not be decoded properly by
196 # servers.
197 # servers.
197 assert all(escapearg(k) == k for k in argsdict)
198 assert all(escapearg(k) == k for k in argsdict)
198
199
199 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
200 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
200 for k, v in argsdict.iteritems())
201 for k, v in argsdict.iteritems())
201 cmds.append('%s %s' % (op, args))
202 cmds.append('%s %s' % (op, args))
202
203
203 return ';'.join(cmds)
204 return ';'.join(cmds)
204
205
205 # mapping of options accepted by getbundle and their types
206 # mapping of options accepted by getbundle and their types
206 #
207 #
207 # Meant to be extended by extensions. It is extensions responsibility to ensure
208 # Meant to be extended by extensions. It is extensions responsibility to ensure
208 # such options are properly processed in exchange.getbundle.
209 # such options are properly processed in exchange.getbundle.
209 #
210 #
210 # supported types are:
211 # supported types are:
211 #
212 #
212 # :nodes: list of binary nodes
213 # :nodes: list of binary nodes
213 # :csv: list of comma-separated values
214 # :csv: list of comma-separated values
214 # :scsv: list of comma-separated values return as set
215 # :scsv: list of comma-separated values return as set
215 # :plain: string with no transformation needed.
216 # :plain: string with no transformation needed.
216 gboptsmap = {'heads': 'nodes',
217 gboptsmap = {'heads': 'nodes',
217 'common': 'nodes',
218 'common': 'nodes',
218 'obsmarkers': 'boolean',
219 'obsmarkers': 'boolean',
219 'bundlecaps': 'scsv',
220 'bundlecaps': 'scsv',
220 'listkeys': 'csv',
221 'listkeys': 'csv',
221 'cg': 'boolean',
222 'cg': 'boolean',
222 'cbattempted': 'boolean'}
223 'cbattempted': 'boolean'}
223
224
224 # client side
225 # client side
225
226
226 class wirepeer(peer.peerrepository):
227 class wirepeer(peer.peerrepository):
227 """Client-side interface for communicating with a peer repository.
228 """Client-side interface for communicating with a peer repository.
228
229
229 Methods commonly call wire protocol commands of the same name.
230 Methods commonly call wire protocol commands of the same name.
230
231
231 See also httppeer.py and sshpeer.py for protocol-specific
232 See also httppeer.py and sshpeer.py for protocol-specific
232 implementations of this interface.
233 implementations of this interface.
233 """
234 """
234 def batch(self):
235 def batch(self):
235 if self.capable('batch'):
236 if self.capable('batch'):
236 return remotebatch(self)
237 return remotebatch(self)
237 else:
238 else:
238 return peer.localbatch(self)
239 return peer.localbatch(self)
239 def _submitbatch(self, req):
240 def _submitbatch(self, req):
240 """run batch request <req> on the server
241 """run batch request <req> on the server
241
242
242 Returns an iterator of the raw responses from the server.
243 Returns an iterator of the raw responses from the server.
243 """
244 """
244 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
245 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
245 chunk = rsp.read(1024)
246 chunk = rsp.read(1024)
246 work = [chunk]
247 work = [chunk]
247 while chunk:
248 while chunk:
248 while ';' not in chunk and chunk:
249 while ';' not in chunk and chunk:
249 chunk = rsp.read(1024)
250 chunk = rsp.read(1024)
250 work.append(chunk)
251 work.append(chunk)
251 merged = ''.join(work)
252 merged = ''.join(work)
252 while ';' in merged:
253 while ';' in merged:
253 one, merged = merged.split(';', 1)
254 one, merged = merged.split(';', 1)
254 yield unescapearg(one)
255 yield unescapearg(one)
255 chunk = rsp.read(1024)
256 chunk = rsp.read(1024)
256 work = [merged, chunk]
257 work = [merged, chunk]
257 yield unescapearg(''.join(work))
258 yield unescapearg(''.join(work))
258
259
259 def _submitone(self, op, args):
260 def _submitone(self, op, args):
260 return self._call(op, **args)
261 return self._call(op, **args)
261
262
262 def iterbatch(self):
263 def iterbatch(self):
263 return remoteiterbatcher(self)
264 return remoteiterbatcher(self)
264
265
265 @batchable
266 @batchable
266 def lookup(self, key):
267 def lookup(self, key):
267 self.requirecap('lookup', _('look up remote revision'))
268 self.requirecap('lookup', _('look up remote revision'))
268 f = future()
269 f = future()
269 yield {'key': encoding.fromlocal(key)}, f
270 yield {'key': encoding.fromlocal(key)}, f
270 d = f.value
271 d = f.value
271 success, data = d[:-1].split(" ", 1)
272 success, data = d[:-1].split(" ", 1)
272 if int(success):
273 if int(success):
273 yield bin(data)
274 yield bin(data)
274 self._abort(error.RepoError(data))
275 self._abort(error.RepoError(data))
275
276
276 @batchable
277 @batchable
277 def heads(self):
278 def heads(self):
278 f = future()
279 f = future()
279 yield {}, f
280 yield {}, f
280 d = f.value
281 d = f.value
281 try:
282 try:
282 yield decodelist(d[:-1])
283 yield decodelist(d[:-1])
283 except ValueError:
284 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), d))
285 self._abort(error.ResponseError(_("unexpected response:"), d))
285
286
286 @batchable
287 @batchable
287 def known(self, nodes):
288 def known(self, nodes):
288 f = future()
289 f = future()
289 yield {'nodes': encodelist(nodes)}, f
290 yield {'nodes': encodelist(nodes)}, f
290 d = f.value
291 d = f.value
291 try:
292 try:
292 yield [bool(int(b)) for b in d]
293 yield [bool(int(b)) for b in d]
293 except ValueError:
294 except ValueError:
294 self._abort(error.ResponseError(_("unexpected response:"), d))
295 self._abort(error.ResponseError(_("unexpected response:"), d))
295
296
296 @batchable
297 @batchable
297 def branchmap(self):
298 def branchmap(self):
298 f = future()
299 f = future()
299 yield {}, f
300 yield {}, f
300 d = f.value
301 d = f.value
301 try:
302 try:
302 branchmap = {}
303 branchmap = {}
303 for branchpart in d.splitlines():
304 for branchpart in d.splitlines():
304 branchname, branchheads = branchpart.split(' ', 1)
305 branchname, branchheads = branchpart.split(' ', 1)
305 branchname = encoding.tolocal(urlreq.unquote(branchname))
306 branchname = encoding.tolocal(urlreq.unquote(branchname))
306 branchheads = decodelist(branchheads)
307 branchheads = decodelist(branchheads)
307 branchmap[branchname] = branchheads
308 branchmap[branchname] = branchheads
308 yield branchmap
309 yield branchmap
309 except TypeError:
310 except TypeError:
310 self._abort(error.ResponseError(_("unexpected response:"), d))
311 self._abort(error.ResponseError(_("unexpected response:"), d))
311
312
312 def branches(self, nodes):
313 def branches(self, nodes):
313 n = encodelist(nodes)
314 n = encodelist(nodes)
314 d = self._call("branches", nodes=n)
315 d = self._call("branches", nodes=n)
315 try:
316 try:
316 br = [tuple(decodelist(b)) for b in d.splitlines()]
317 br = [tuple(decodelist(b)) for b in d.splitlines()]
317 return br
318 return br
318 except ValueError:
319 except ValueError:
319 self._abort(error.ResponseError(_("unexpected response:"), d))
320 self._abort(error.ResponseError(_("unexpected response:"), d))
320
321
321 def between(self, pairs):
322 def between(self, pairs):
322 batch = 8 # avoid giant requests
323 batch = 8 # avoid giant requests
323 r = []
324 r = []
324 for i in xrange(0, len(pairs), batch):
325 for i in xrange(0, len(pairs), batch):
325 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
326 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
326 d = self._call("between", pairs=n)
327 d = self._call("between", pairs=n)
327 try:
328 try:
328 r.extend(l and decodelist(l) or [] for l in d.splitlines())
329 r.extend(l and decodelist(l) or [] for l in d.splitlines())
329 except ValueError:
330 except ValueError:
330 self._abort(error.ResponseError(_("unexpected response:"), d))
331 self._abort(error.ResponseError(_("unexpected response:"), d))
331 return r
332 return r
332
333
333 @batchable
334 @batchable
334 def pushkey(self, namespace, key, old, new):
335 def pushkey(self, namespace, key, old, new):
335 if not self.capable('pushkey'):
336 if not self.capable('pushkey'):
336 yield False, None
337 yield False, None
337 f = future()
338 f = future()
338 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
339 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
339 yield {'namespace': encoding.fromlocal(namespace),
340 yield {'namespace': encoding.fromlocal(namespace),
340 'key': encoding.fromlocal(key),
341 'key': encoding.fromlocal(key),
341 'old': encoding.fromlocal(old),
342 'old': encoding.fromlocal(old),
342 'new': encoding.fromlocal(new)}, f
343 'new': encoding.fromlocal(new)}, f
343 d = f.value
344 d = f.value
344 d, output = d.split('\n', 1)
345 d, output = d.split('\n', 1)
345 try:
346 try:
346 d = bool(int(d))
347 d = bool(int(d))
347 except ValueError:
348 except ValueError:
348 raise error.ResponseError(
349 raise error.ResponseError(
349 _('push failed (unexpected response):'), d)
350 _('push failed (unexpected response):'), d)
350 for l in output.splitlines(True):
351 for l in output.splitlines(True):
351 self.ui.status(_('remote: '), l)
352 self.ui.status(_('remote: '), l)
352 yield d
353 yield d
353
354
354 @batchable
355 @batchable
355 def listkeys(self, namespace):
356 def listkeys(self, namespace):
356 if not self.capable('pushkey'):
357 if not self.capable('pushkey'):
357 yield {}, None
358 yield {}, None
358 f = future()
359 f = future()
359 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
360 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
360 yield {'namespace': encoding.fromlocal(namespace)}, f
361 yield {'namespace': encoding.fromlocal(namespace)}, f
361 d = f.value
362 d = f.value
362 self.ui.debug('received listkey for "%s": %i bytes\n'
363 self.ui.debug('received listkey for "%s": %i bytes\n'
363 % (namespace, len(d)))
364 % (namespace, len(d)))
364 yield pushkeymod.decodekeys(d)
365 yield pushkeymod.decodekeys(d)
365
366
366 def stream_out(self):
367 def stream_out(self):
367 return self._callstream('stream_out')
368 return self._callstream('stream_out')
368
369
369 def changegroup(self, nodes, kind):
370 def changegroup(self, nodes, kind):
370 n = encodelist(nodes)
371 n = encodelist(nodes)
371 f = self._callcompressable("changegroup", roots=n)
372 f = self._callcompressable("changegroup", roots=n)
372 return changegroupmod.cg1unpacker(f, 'UN')
373 return changegroupmod.cg1unpacker(f, 'UN')
373
374
374 def changegroupsubset(self, bases, heads, kind):
375 def changegroupsubset(self, bases, heads, kind):
375 self.requirecap('changegroupsubset', _('look up remote changes'))
376 self.requirecap('changegroupsubset', _('look up remote changes'))
376 bases = encodelist(bases)
377 bases = encodelist(bases)
377 heads = encodelist(heads)
378 heads = encodelist(heads)
378 f = self._callcompressable("changegroupsubset",
379 f = self._callcompressable("changegroupsubset",
379 bases=bases, heads=heads)
380 bases=bases, heads=heads)
380 return changegroupmod.cg1unpacker(f, 'UN')
381 return changegroupmod.cg1unpacker(f, 'UN')
381
382
382 def getbundle(self, source, **kwargs):
383 def getbundle(self, source, **kwargs):
383 self.requirecap('getbundle', _('look up remote changes'))
384 self.requirecap('getbundle', _('look up remote changes'))
384 opts = {}
385 opts = {}
385 bundlecaps = kwargs.get('bundlecaps')
386 bundlecaps = kwargs.get('bundlecaps')
386 if bundlecaps is not None:
387 if bundlecaps is not None:
387 kwargs['bundlecaps'] = sorted(bundlecaps)
388 kwargs['bundlecaps'] = sorted(bundlecaps)
388 else:
389 else:
389 bundlecaps = () # kwargs could have it to None
390 bundlecaps = () # kwargs could have it to None
390 for key, value in kwargs.iteritems():
391 for key, value in kwargs.iteritems():
391 if value is None:
392 if value is None:
392 continue
393 continue
393 keytype = gboptsmap.get(key)
394 keytype = gboptsmap.get(key)
394 if keytype is None:
395 if keytype is None:
395 assert False, 'unexpected'
396 assert False, 'unexpected'
396 elif keytype == 'nodes':
397 elif keytype == 'nodes':
397 value = encodelist(value)
398 value = encodelist(value)
398 elif keytype in ('csv', 'scsv'):
399 elif keytype in ('csv', 'scsv'):
399 value = ','.join(value)
400 value = ','.join(value)
400 elif keytype == 'boolean':
401 elif keytype == 'boolean':
401 value = '%i' % bool(value)
402 value = '%i' % bool(value)
402 elif keytype != 'plain':
403 elif keytype != 'plain':
403 raise KeyError('unknown getbundle option type %s'
404 raise KeyError('unknown getbundle option type %s'
404 % keytype)
405 % keytype)
405 opts[key] = value
406 opts[key] = value
406 f = self._callcompressable("getbundle", **opts)
407 f = self._callcompressable("getbundle", **opts)
407 if any((cap.startswith('HG2') for cap in bundlecaps)):
408 if any((cap.startswith('HG2') for cap in bundlecaps)):
408 return bundle2.getunbundler(self.ui, f)
409 return bundle2.getunbundler(self.ui, f)
409 else:
410 else:
410 return changegroupmod.cg1unpacker(f, 'UN')
411 return changegroupmod.cg1unpacker(f, 'UN')
411
412
412 def unbundle(self, cg, heads, url):
413 def unbundle(self, cg, heads, url):
413 '''Send cg (a readable file-like object representing the
414 '''Send cg (a readable file-like object representing the
414 changegroup to push, typically a chunkbuffer object) to the
415 changegroup to push, typically a chunkbuffer object) to the
415 remote server as a bundle.
416 remote server as a bundle.
416
417
417 When pushing a bundle10 stream, return an integer indicating the
418 When pushing a bundle10 stream, return an integer indicating the
418 result of the push (see localrepository.addchangegroup()).
419 result of the push (see localrepository.addchangegroup()).
419
420
420 When pushing a bundle20 stream, return a bundle20 stream.
421 When pushing a bundle20 stream, return a bundle20 stream.
421
422
422 `url` is the url the client thinks it's pushing to, which is
423 `url` is the url the client thinks it's pushing to, which is
423 visible to hooks.
424 visible to hooks.
424 '''
425 '''
425
426
426 if heads != ['force'] and self.capable('unbundlehash'):
427 if heads != ['force'] and self.capable('unbundlehash'):
427 heads = encodelist(['hashed',
428 heads = encodelist(['hashed',
428 hashlib.sha1(''.join(sorted(heads))).digest()])
429 hashlib.sha1(''.join(sorted(heads))).digest()])
429 else:
430 else:
430 heads = encodelist(heads)
431 heads = encodelist(heads)
431
432
432 if util.safehasattr(cg, 'deltaheader'):
433 if util.safehasattr(cg, 'deltaheader'):
433 # this a bundle10, do the old style call sequence
434 # this a bundle10, do the old style call sequence
434 ret, output = self._callpush("unbundle", cg, heads=heads)
435 ret, output = self._callpush("unbundle", cg, heads=heads)
435 if ret == "":
436 if ret == "":
436 raise error.ResponseError(
437 raise error.ResponseError(
437 _('push failed:'), output)
438 _('push failed:'), output)
438 try:
439 try:
439 ret = int(ret)
440 ret = int(ret)
440 except ValueError:
441 except ValueError:
441 raise error.ResponseError(
442 raise error.ResponseError(
442 _('push failed (unexpected response):'), ret)
443 _('push failed (unexpected response):'), ret)
443
444
444 for l in output.splitlines(True):
445 for l in output.splitlines(True):
445 self.ui.status(_('remote: '), l)
446 self.ui.status(_('remote: '), l)
446 else:
447 else:
447 # bundle2 push. Send a stream, fetch a stream.
448 # bundle2 push. Send a stream, fetch a stream.
448 stream = self._calltwowaystream('unbundle', cg, heads=heads)
449 stream = self._calltwowaystream('unbundle', cg, heads=heads)
449 ret = bundle2.getunbundler(self.ui, stream)
450 ret = bundle2.getunbundler(self.ui, stream)
450 return ret
451 return ret
451
452
452 def debugwireargs(self, one, two, three=None, four=None, five=None):
453 def debugwireargs(self, one, two, three=None, four=None, five=None):
453 # don't pass optional arguments left at their default value
454 # don't pass optional arguments left at their default value
454 opts = {}
455 opts = {}
455 if three is not None:
456 if three is not None:
456 opts['three'] = three
457 opts['three'] = three
457 if four is not None:
458 if four is not None:
458 opts['four'] = four
459 opts['four'] = four
459 return self._call('debugwireargs', one=one, two=two, **opts)
460 return self._call('debugwireargs', one=one, two=two, **opts)
460
461
461 def _call(self, cmd, **args):
462 def _call(self, cmd, **args):
462 """execute <cmd> on the server
463 """execute <cmd> on the server
463
464
464 The command is expected to return a simple string.
465 The command is expected to return a simple string.
465
466
466 returns the server reply as a string."""
467 returns the server reply as a string."""
467 raise NotImplementedError()
468 raise NotImplementedError()
468
469
469 def _callstream(self, cmd, **args):
470 def _callstream(self, cmd, **args):
470 """execute <cmd> on the server
471 """execute <cmd> on the server
471
472
472 The command is expected to return a stream. Note that if the
473 The command is expected to return a stream. Note that if the
473 command doesn't return a stream, _callstream behaves
474 command doesn't return a stream, _callstream behaves
474 differently for ssh and http peers.
475 differently for ssh and http peers.
475
476
476 returns the server reply as a file like object.
477 returns the server reply as a file like object.
477 """
478 """
478 raise NotImplementedError()
479 raise NotImplementedError()
479
480
480 def _callcompressable(self, cmd, **args):
481 def _callcompressable(self, cmd, **args):
481 """execute <cmd> on the server
482 """execute <cmd> on the server
482
483
483 The command is expected to return a stream.
484 The command is expected to return a stream.
484
485
485 The stream may have been compressed in some implementations. This
486 The stream may have been compressed in some implementations. This
486 function takes care of the decompression. This is the only difference
487 function takes care of the decompression. This is the only difference
487 with _callstream.
488 with _callstream.
488
489
489 returns the server reply as a file like object.
490 returns the server reply as a file like object.
490 """
491 """
491 raise NotImplementedError()
492 raise NotImplementedError()
492
493
493 def _callpush(self, cmd, fp, **args):
494 def _callpush(self, cmd, fp, **args):
494 """execute a <cmd> on server
495 """execute a <cmd> on server
495
496
496 The command is expected to be related to a push. Push has a special
497 The command is expected to be related to a push. Push has a special
497 return method.
498 return method.
498
499
499 returns the server reply as a (ret, output) tuple. ret is either
500 returns the server reply as a (ret, output) tuple. ret is either
500 empty (error) or a stringified int.
501 empty (error) or a stringified int.
501 """
502 """
502 raise NotImplementedError()
503 raise NotImplementedError()
503
504
504 def _calltwowaystream(self, cmd, fp, **args):
505 def _calltwowaystream(self, cmd, fp, **args):
505 """execute <cmd> on server
506 """execute <cmd> on server
506
507
507 The command will send a stream to the server and get a stream in reply.
508 The command will send a stream to the server and get a stream in reply.
508 """
509 """
509 raise NotImplementedError()
510 raise NotImplementedError()
510
511
511 def _abort(self, exception):
512 def _abort(self, exception):
512 """clearly abort the wire protocol connection and raise the exception
513 """clearly abort the wire protocol connection and raise the exception
513 """
514 """
514 raise NotImplementedError()
515 raise NotImplementedError()
515
516
516 # server side
517 # server side
517
518
518 # wire protocol command can either return a string or one of these classes.
519 # wire protocol command can either return a string or one of these classes.
519 class streamres(object):
520 class streamres(object):
520 """wireproto reply: binary stream
521 """wireproto reply: binary stream
521
522
522 The call was successful and the result is a stream.
523 The call was successful and the result is a stream.
523 Iterate on the `self.gen` attribute to retrieve chunks.
524 Iterate on the `self.gen` attribute to retrieve chunks.
524 """
525 """
525 def __init__(self, gen):
526 def __init__(self, gen):
526 self.gen = gen
527 self.gen = gen
527
528
528 class pushres(object):
529 class pushres(object):
529 """wireproto reply: success with simple integer return
530 """wireproto reply: success with simple integer return
530
531
531 The call was successful and returned an integer contained in `self.res`.
532 The call was successful and returned an integer contained in `self.res`.
532 """
533 """
533 def __init__(self, res):
534 def __init__(self, res):
534 self.res = res
535 self.res = res
535
536
536 class pusherr(object):
537 class pusherr(object):
537 """wireproto reply: failure
538 """wireproto reply: failure
538
539
539 The call failed. The `self.res` attribute contains the error message.
540 The call failed. The `self.res` attribute contains the error message.
540 """
541 """
541 def __init__(self, res):
542 def __init__(self, res):
542 self.res = res
543 self.res = res
543
544
544 class ooberror(object):
545 class ooberror(object):
545 """wireproto reply: failure of a batch of operation
546 """wireproto reply: failure of a batch of operation
546
547
547 Something failed during a batch call. The error message is stored in
548 Something failed during a batch call. The error message is stored in
548 `self.message`.
549 `self.message`.
549 """
550 """
550 def __init__(self, message):
551 def __init__(self, message):
551 self.message = message
552 self.message = message
552
553
553 def getdispatchrepo(repo, proto, command):
554 def getdispatchrepo(repo, proto, command):
554 """Obtain the repo used for processing wire protocol commands.
555 """Obtain the repo used for processing wire protocol commands.
555
556
556 The intent of this function is to serve as a monkeypatch point for
557 The intent of this function is to serve as a monkeypatch point for
557 extensions that need commands to operate on different repo views under
558 extensions that need commands to operate on different repo views under
558 specialized circumstances.
559 specialized circumstances.
559 """
560 """
560 return repo.filtered('served')
561 return repo.filtered('served')
561
562
562 def dispatch(repo, proto, command):
563 def dispatch(repo, proto, command):
563 repo = getdispatchrepo(repo, proto, command)
564 repo = getdispatchrepo(repo, proto, command)
564 func, spec = commands[command]
565 func, spec = commands[command]
565 args = proto.getargs(spec)
566 args = proto.getargs(spec)
566 return func(repo, proto, *args)
567 return func(repo, proto, *args)
567
568
568 def options(cmd, keys, others):
569 def options(cmd, keys, others):
569 opts = {}
570 opts = {}
570 for k in keys:
571 for k in keys:
571 if k in others:
572 if k in others:
572 opts[k] = others[k]
573 opts[k] = others[k]
573 del others[k]
574 del others[k]
574 if others:
575 if others:
575 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
576 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
576 % (cmd, ",".join(others)))
577 % (cmd, ",".join(others)))
577 return opts
578 return opts
578
579
579 def bundle1allowed(repo, action):
580 def bundle1allowed(repo, action):
580 """Whether a bundle1 operation is allowed from the server.
581 """Whether a bundle1 operation is allowed from the server.
581
582
582 Priority is:
583 Priority is:
583
584
584 1. server.bundle1gd.<action> (if generaldelta active)
585 1. server.bundle1gd.<action> (if generaldelta active)
585 2. server.bundle1.<action>
586 2. server.bundle1.<action>
586 3. server.bundle1gd (if generaldelta active)
587 3. server.bundle1gd (if generaldelta active)
587 4. server.bundle1
588 4. server.bundle1
588 """
589 """
589 ui = repo.ui
590 ui = repo.ui
590 gd = 'generaldelta' in repo.requirements
591 gd = 'generaldelta' in repo.requirements
591
592
592 if gd:
593 if gd:
593 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
594 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
594 if v is not None:
595 if v is not None:
595 return v
596 return v
596
597
597 v = ui.configbool('server', 'bundle1.%s' % action, None)
598 v = ui.configbool('server', 'bundle1.%s' % action, None)
598 if v is not None:
599 if v is not None:
599 return v
600 return v
600
601
601 if gd:
602 if gd:
602 v = ui.configbool('server', 'bundle1gd', None)
603 v = ui.configbool('server', 'bundle1gd', None)
603 if v is not None:
604 if v is not None:
604 return v
605 return v
605
606
606 return ui.configbool('server', 'bundle1', True)
607 return ui.configbool('server', 'bundle1', True)
607
608
608 # list of commands
609 # list of commands
609 commands = {}
610 commands = {}
610
611
611 def wireprotocommand(name, args=''):
612 def wireprotocommand(name, args=''):
612 """decorator for wire protocol command"""
613 """decorator for wire protocol command"""
613 def register(func):
614 def register(func):
614 commands[name] = (func, args)
615 commands[name] = (func, args)
615 return func
616 return func
616 return register
617 return register
617
618
618 @wireprotocommand('batch', 'cmds *')
619 @wireprotocommand('batch', 'cmds *')
619 def batch(repo, proto, cmds, others):
620 def batch(repo, proto, cmds, others):
620 repo = repo.filtered("served")
621 repo = repo.filtered("served")
621 res = []
622 res = []
622 for pair in cmds.split(';'):
623 for pair in cmds.split(';'):
623 op, args = pair.split(' ', 1)
624 op, args = pair.split(' ', 1)
624 vals = {}
625 vals = {}
625 for a in args.split(','):
626 for a in args.split(','):
626 if a:
627 if a:
627 n, v = a.split('=')
628 n, v = a.split('=')
628 vals[unescapearg(n)] = unescapearg(v)
629 vals[unescapearg(n)] = unescapearg(v)
629 func, spec = commands[op]
630 func, spec = commands[op]
630 if spec:
631 if spec:
631 keys = spec.split()
632 keys = spec.split()
632 data = {}
633 data = {}
633 for k in keys:
634 for k in keys:
634 if k == '*':
635 if k == '*':
635 star = {}
636 star = {}
636 for key in vals.keys():
637 for key in vals.keys():
637 if key not in keys:
638 if key not in keys:
638 star[key] = vals[key]
639 star[key] = vals[key]
639 data['*'] = star
640 data['*'] = star
640 else:
641 else:
641 data[k] = vals[k]
642 data[k] = vals[k]
642 result = func(repo, proto, *[data[k] for k in keys])
643 result = func(repo, proto, *[data[k] for k in keys])
643 else:
644 else:
644 result = func(repo, proto)
645 result = func(repo, proto)
645 if isinstance(result, ooberror):
646 if isinstance(result, ooberror):
646 return result
647 return result
647 res.append(escapearg(result))
648 res.append(escapearg(result))
648 return ';'.join(res)
649 return ';'.join(res)
649
650
650 @wireprotocommand('between', 'pairs')
651 @wireprotocommand('between', 'pairs')
651 def between(repo, proto, pairs):
652 def between(repo, proto, pairs):
652 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
653 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
653 r = []
654 r = []
654 for b in repo.between(pairs):
655 for b in repo.between(pairs):
655 r.append(encodelist(b) + "\n")
656 r.append(encodelist(b) + "\n")
656 return "".join(r)
657 return "".join(r)
657
658
658 @wireprotocommand('branchmap')
659 @wireprotocommand('branchmap')
659 def branchmap(repo, proto):
660 def branchmap(repo, proto):
660 branchmap = repo.branchmap()
661 branchmap = repo.branchmap()
661 heads = []
662 heads = []
662 for branch, nodes in branchmap.iteritems():
663 for branch, nodes in branchmap.iteritems():
663 branchname = urlreq.quote(encoding.fromlocal(branch))
664 branchname = urlreq.quote(encoding.fromlocal(branch))
664 branchnodes = encodelist(nodes)
665 branchnodes = encodelist(nodes)
665 heads.append('%s %s' % (branchname, branchnodes))
666 heads.append('%s %s' % (branchname, branchnodes))
666 return '\n'.join(heads)
667 return '\n'.join(heads)
667
668
668 @wireprotocommand('branches', 'nodes')
669 @wireprotocommand('branches', 'nodes')
669 def branches(repo, proto, nodes):
670 def branches(repo, proto, nodes):
670 nodes = decodelist(nodes)
671 nodes = decodelist(nodes)
671 r = []
672 r = []
672 for b in repo.branches(nodes):
673 for b in repo.branches(nodes):
673 r.append(encodelist(b) + "\n")
674 r.append(encodelist(b) + "\n")
674 return "".join(r)
675 return "".join(r)
675
676
676 @wireprotocommand('clonebundles', '')
677 @wireprotocommand('clonebundles', '')
677 def clonebundles(repo, proto):
678 def clonebundles(repo, proto):
678 """Server command for returning info for available bundles to seed clones.
679 """Server command for returning info for available bundles to seed clones.
679
680
680 Clients will parse this response and determine what bundle to fetch.
681 Clients will parse this response and determine what bundle to fetch.
681
682
682 Extensions may wrap this command to filter or dynamically emit data
683 Extensions may wrap this command to filter or dynamically emit data
683 depending on the request. e.g. you could advertise URLs for the closest
684 depending on the request. e.g. you could advertise URLs for the closest
684 data center given the client's IP address.
685 data center given the client's IP address.
685 """
686 """
686 return repo.opener.tryread('clonebundles.manifest')
687 return repo.opener.tryread('clonebundles.manifest')
687
688
688 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
689 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
689 'known', 'getbundle', 'unbundlehash', 'batch']
690 'known', 'getbundle', 'unbundlehash', 'batch']
690
691
691 def _capabilities(repo, proto):
692 def _capabilities(repo, proto):
692 """return a list of capabilities for a repo
693 """return a list of capabilities for a repo
693
694
694 This function exists to allow extensions to easily wrap capabilities
695 This function exists to allow extensions to easily wrap capabilities
695 computation
696 computation
696
697
697 - returns a lists: easy to alter
698 - returns a lists: easy to alter
698 - change done here will be propagated to both `capabilities` and `hello`
699 - change done here will be propagated to both `capabilities` and `hello`
699 command without any other action needed.
700 command without any other action needed.
700 """
701 """
701 # copy to prevent modification of the global list
702 # copy to prevent modification of the global list
702 caps = list(wireprotocaps)
703 caps = list(wireprotocaps)
703 if streamclone.allowservergeneration(repo.ui):
704 if streamclone.allowservergeneration(repo.ui):
704 if repo.ui.configbool('server', 'preferuncompressed', False):
705 if repo.ui.configbool('server', 'preferuncompressed', False):
705 caps.append('stream-preferred')
706 caps.append('stream-preferred')
706 requiredformats = repo.requirements & repo.supportedformats
707 requiredformats = repo.requirements & repo.supportedformats
707 # if our local revlogs are just revlogv1, add 'stream' cap
708 # if our local revlogs are just revlogv1, add 'stream' cap
708 if not requiredformats - set(('revlogv1',)):
709 if not requiredformats - set(('revlogv1',)):
709 caps.append('stream')
710 caps.append('stream')
710 # otherwise, add 'streamreqs' detailing our local revlog format
711 # otherwise, add 'streamreqs' detailing our local revlog format
711 else:
712 else:
712 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
713 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
713 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
714 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
714 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
715 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
715 caps.append('bundle2=' + urlreq.quote(capsblob))
716 caps.append('bundle2=' + urlreq.quote(capsblob))
716 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
717 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
717 caps.append(
718 caps.append(
718 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
719 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
719 if repo.ui.configbool('experimental', 'httppostargs', False):
720 if repo.ui.configbool('experimental', 'httppostargs', False):
720 caps.append('httppostargs')
721 caps.append('httppostargs')
721 return caps
722 return caps
722
723
723 # If you are writing an extension and consider wrapping this function. Wrap
724 # If you are writing an extension and consider wrapping this function. Wrap
724 # `_capabilities` instead.
725 # `_capabilities` instead.
725 @wireprotocommand('capabilities')
726 @wireprotocommand('capabilities')
726 def capabilities(repo, proto):
727 def capabilities(repo, proto):
727 return ' '.join(_capabilities(repo, proto))
728 return ' '.join(_capabilities(repo, proto))
728
729
729 @wireprotocommand('changegroup', 'roots')
730 @wireprotocommand('changegroup', 'roots')
730 def changegroup(repo, proto, roots):
731 def changegroup(repo, proto, roots):
731 nodes = decodelist(roots)
732 nodes = decodelist(roots)
732 cg = changegroupmod.changegroup(repo, nodes, 'serve')
733 cg = changegroupmod.changegroup(repo, nodes, 'serve')
733 return streamres(proto.groupchunks(cg))
734 return streamres(proto.groupchunks(cg))
734
735
735 @wireprotocommand('changegroupsubset', 'bases heads')
736 @wireprotocommand('changegroupsubset', 'bases heads')
736 def changegroupsubset(repo, proto, bases, heads):
737 def changegroupsubset(repo, proto, bases, heads):
737 bases = decodelist(bases)
738 bases = decodelist(bases)
738 heads = decodelist(heads)
739 heads = decodelist(heads)
739 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
740 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
740 return streamres(proto.groupchunks(cg))
741 return streamres(proto.groupchunks(cg))
741
742
742 @wireprotocommand('debugwireargs', 'one two *')
743 @wireprotocommand('debugwireargs', 'one two *')
743 def debugwireargs(repo, proto, one, two, others):
744 def debugwireargs(repo, proto, one, two, others):
744 # only accept optional args from the known set
745 # only accept optional args from the known set
745 opts = options('debugwireargs', ['three', 'four'], others)
746 opts = options('debugwireargs', ['three', 'four'], others)
746 return repo.debugwireargs(one, two, **opts)
747 return repo.debugwireargs(one, two, **opts)
747
748
748 @wireprotocommand('getbundle', '*')
749 @wireprotocommand('getbundle', '*')
749 def getbundle(repo, proto, others):
750 def getbundle(repo, proto, others):
750 opts = options('getbundle', gboptsmap.keys(), others)
751 opts = options('getbundle', gboptsmap.keys(), others)
751 for k, v in opts.iteritems():
752 for k, v in opts.iteritems():
752 keytype = gboptsmap[k]
753 keytype = gboptsmap[k]
753 if keytype == 'nodes':
754 if keytype == 'nodes':
754 opts[k] = decodelist(v)
755 opts[k] = decodelist(v)
755 elif keytype == 'csv':
756 elif keytype == 'csv':
756 opts[k] = list(v.split(','))
757 opts[k] = list(v.split(','))
757 elif keytype == 'scsv':
758 elif keytype == 'scsv':
758 opts[k] = set(v.split(','))
759 opts[k] = set(v.split(','))
759 elif keytype == 'boolean':
760 elif keytype == 'boolean':
760 # Client should serialize False as '0', which is a non-empty string
761 # Client should serialize False as '0', which is a non-empty string
761 # so it evaluates as a True bool.
762 # so it evaluates as a True bool.
762 if v == '0':
763 if v == '0':
763 opts[k] = False
764 opts[k] = False
764 else:
765 else:
765 opts[k] = bool(v)
766 opts[k] = bool(v)
766 elif keytype != 'plain':
767 elif keytype != 'plain':
767 raise KeyError('unknown getbundle option type %s'
768 raise KeyError('unknown getbundle option type %s'
768 % keytype)
769 % keytype)
769
770
770 if not bundle1allowed(repo, 'pull'):
771 if not bundle1allowed(repo, 'pull'):
771 if not exchange.bundle2requested(opts.get('bundlecaps')):
772 if not exchange.bundle2requested(opts.get('bundlecaps')):
772 return ooberror(bundle2required)
773 return ooberror(bundle2required)
773
774
774 cg = exchange.getbundle(repo, 'serve', **opts)
775 cg = exchange.getbundle(repo, 'serve', **opts)
775 return streamres(proto.groupchunks(cg))
776 return streamres(proto.groupchunks(cg))
776
777
777 @wireprotocommand('heads')
778 @wireprotocommand('heads')
778 def heads(repo, proto):
779 def heads(repo, proto):
779 h = repo.heads()
780 h = repo.heads()
780 return encodelist(h) + "\n"
781 return encodelist(h) + "\n"
781
782
782 @wireprotocommand('hello')
783 @wireprotocommand('hello')
783 def hello(repo, proto):
784 def hello(repo, proto):
784 '''the hello command returns a set of lines describing various
785 '''the hello command returns a set of lines describing various
785 interesting things about the server, in an RFC822-like format.
786 interesting things about the server, in an RFC822-like format.
786 Currently the only one defined is "capabilities", which
787 Currently the only one defined is "capabilities", which
787 consists of a line in the form:
788 consists of a line in the form:
788
789
789 capabilities: space separated list of tokens
790 capabilities: space separated list of tokens
790 '''
791 '''
791 return "capabilities: %s\n" % (capabilities(repo, proto))
792 return "capabilities: %s\n" % (capabilities(repo, proto))
792
793
793 @wireprotocommand('listkeys', 'namespace')
794 @wireprotocommand('listkeys', 'namespace')
794 def listkeys(repo, proto, namespace):
795 def listkeys(repo, proto, namespace):
795 d = repo.listkeys(encoding.tolocal(namespace)).items()
796 d = repo.listkeys(encoding.tolocal(namespace)).items()
796 return pushkeymod.encodekeys(d)
797 return pushkeymod.encodekeys(d)
797
798
798 @wireprotocommand('lookup', 'key')
799 @wireprotocommand('lookup', 'key')
799 def lookup(repo, proto, key):
800 def lookup(repo, proto, key):
800 try:
801 try:
801 k = encoding.tolocal(key)
802 k = encoding.tolocal(key)
802 c = repo[k]
803 c = repo[k]
803 r = c.hex()
804 r = c.hex()
804 success = 1
805 success = 1
805 except Exception as inst:
806 except Exception as inst:
806 r = str(inst)
807 r = str(inst)
807 success = 0
808 success = 0
808 return "%s %s\n" % (success, r)
809 return "%s %s\n" % (success, r)
809
810
810 @wireprotocommand('known', 'nodes *')
811 @wireprotocommand('known', 'nodes *')
811 def known(repo, proto, nodes, others):
812 def known(repo, proto, nodes, others):
812 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
813 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
813
814
814 @wireprotocommand('pushkey', 'namespace key old new')
815 @wireprotocommand('pushkey', 'namespace key old new')
815 def pushkey(repo, proto, namespace, key, old, new):
816 def pushkey(repo, proto, namespace, key, old, new):
816 # compatibility with pre-1.8 clients which were accidentally
817 # compatibility with pre-1.8 clients which were accidentally
817 # sending raw binary nodes rather than utf-8-encoded hex
818 # sending raw binary nodes rather than utf-8-encoded hex
818 if len(new) == 20 and new.encode('string-escape') != new:
819 if len(new) == 20 and new.encode('string-escape') != new:
819 # looks like it could be a binary node
820 # looks like it could be a binary node
820 try:
821 try:
821 new.decode('utf-8')
822 new.decode('utf-8')
822 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
823 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
823 except UnicodeDecodeError:
824 except UnicodeDecodeError:
824 pass # binary, leave unmodified
825 pass # binary, leave unmodified
825 else:
826 else:
826 new = encoding.tolocal(new) # normal path
827 new = encoding.tolocal(new) # normal path
827
828
828 if util.safehasattr(proto, 'restore'):
829 if util.safehasattr(proto, 'restore'):
829
830
830 proto.redirect()
831 proto.redirect()
831
832
832 try:
833 try:
833 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
834 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
834 encoding.tolocal(old), new) or False
835 encoding.tolocal(old), new) or False
835 except error.Abort:
836 except error.Abort:
836 r = False
837 r = False
837
838
838 output = proto.restore()
839 output = proto.restore()
839
840
840 return '%s\n%s' % (int(r), output)
841 return '%s\n%s' % (int(r), output)
841
842
842 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
843 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
843 encoding.tolocal(old), new)
844 encoding.tolocal(old), new)
844 return '%s\n' % int(r)
845 return '%s\n' % int(r)
845
846
846 @wireprotocommand('stream_out')
847 @wireprotocommand('stream_out')
847 def stream(repo, proto):
848 def stream(repo, proto):
848 '''If the server supports streaming clone, it advertises the "stream"
849 '''If the server supports streaming clone, it advertises the "stream"
849 capability with a value representing the version and flags of the repo
850 capability with a value representing the version and flags of the repo
850 it is serving. Client checks to see if it understands the format.
851 it is serving. Client checks to see if it understands the format.
851 '''
852 '''
852 if not streamclone.allowservergeneration(repo.ui):
853 if not streamclone.allowservergeneration(repo.ui):
853 return '1\n'
854 return '1\n'
854
855
855 def getstream(it):
856 def getstream(it):
856 yield '0\n'
857 yield '0\n'
857 for chunk in it:
858 for chunk in it:
858 yield chunk
859 yield chunk
859
860
860 try:
861 try:
861 # LockError may be raised before the first result is yielded. Don't
862 # LockError may be raised before the first result is yielded. Don't
862 # emit output until we're sure we got the lock successfully.
863 # emit output until we're sure we got the lock successfully.
863 it = streamclone.generatev1wireproto(repo)
864 it = streamclone.generatev1wireproto(repo)
864 return streamres(getstream(it))
865 return streamres(getstream(it))
865 except error.LockError:
866 except error.LockError:
866 return '2\n'
867 return '2\n'
867
868
868 @wireprotocommand('unbundle', 'heads')
869 @wireprotocommand('unbundle', 'heads')
869 def unbundle(repo, proto, heads):
870 def unbundle(repo, proto, heads):
870 their_heads = decodelist(heads)
871 their_heads = decodelist(heads)
871
872
872 try:
873 try:
873 proto.redirect()
874 proto.redirect()
874
875
875 exchange.check_heads(repo, their_heads, 'preparing changes')
876 exchange.check_heads(repo, their_heads, 'preparing changes')
876
877
877 # write bundle data to temporary file because it can be big
878 # write bundle data to temporary file because it can be big
878 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
879 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
879 fp = os.fdopen(fd, 'wb+')
880 fp = os.fdopen(fd, 'wb+')
880 r = 0
881 r = 0
881 try:
882 try:
882 proto.getfile(fp)
883 proto.getfile(fp)
883 fp.seek(0)
884 fp.seek(0)
884 gen = exchange.readbundle(repo.ui, fp, None)
885 gen = exchange.readbundle(repo.ui, fp, None)
885 if (isinstance(gen, changegroupmod.cg1unpacker)
886 if (isinstance(gen, changegroupmod.cg1unpacker)
886 and not bundle1allowed(repo, 'push')):
887 and not bundle1allowed(repo, 'push')):
887 return ooberror(bundle2required)
888 return ooberror(bundle2required)
888
889
889 r = exchange.unbundle(repo, gen, their_heads, 'serve',
890 r = exchange.unbundle(repo, gen, their_heads, 'serve',
890 proto._client())
891 proto._client())
891 if util.safehasattr(r, 'addpart'):
892 if util.safehasattr(r, 'addpart'):
892 # The return looks streamable, we are in the bundle2 case and
893 # The return looks streamable, we are in the bundle2 case and
893 # should return a stream.
894 # should return a stream.
894 return streamres(r.getchunks())
895 return streamres(r.getchunks())
895 return pushres(r)
896 return pushres(r)
896
897
897 finally:
898 finally:
898 fp.close()
899 fp.close()
899 os.unlink(tempname)
900 os.unlink(tempname)
900
901
901 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
902 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
902 # handle non-bundle2 case first
903 # handle non-bundle2 case first
903 if not getattr(exc, 'duringunbundle2', False):
904 if not getattr(exc, 'duringunbundle2', False):
904 try:
905 try:
905 raise
906 raise
906 except error.Abort:
907 except error.Abort:
907 # The old code we moved used sys.stderr directly.
908 # The old code we moved used sys.stderr directly.
908 # We did not change it to minimise code change.
909 # We did not change it to minimise code change.
909 # This need to be moved to something proper.
910 # This need to be moved to something proper.
910 # Feel free to do it.
911 # Feel free to do it.
911 sys.stderr.write("abort: %s\n" % exc)
912 sys.stderr.write("abort: %s\n" % exc)
912 return pushres(0)
913 return pushres(0)
913 except error.PushRaced:
914 except error.PushRaced:
914 return pusherr(str(exc))
915 return pusherr(str(exc))
915
916
916 bundler = bundle2.bundle20(repo.ui)
917 bundler = bundle2.bundle20(repo.ui)
917 for out in getattr(exc, '_bundle2salvagedoutput', ()):
918 for out in getattr(exc, '_bundle2salvagedoutput', ()):
918 bundler.addpart(out)
919 bundler.addpart(out)
919 try:
920 try:
920 try:
921 try:
921 raise
922 raise
922 except error.PushkeyFailed as exc:
923 except error.PushkeyFailed as exc:
923 # check client caps
924 # check client caps
924 remotecaps = getattr(exc, '_replycaps', None)
925 remotecaps = getattr(exc, '_replycaps', None)
925 if (remotecaps is not None
926 if (remotecaps is not None
926 and 'pushkey' not in remotecaps.get('error', ())):
927 and 'pushkey' not in remotecaps.get('error', ())):
927 # no support remote side, fallback to Abort handler.
928 # no support remote side, fallback to Abort handler.
928 raise
929 raise
929 part = bundler.newpart('error:pushkey')
930 part = bundler.newpart('error:pushkey')
930 part.addparam('in-reply-to', exc.partid)
931 part.addparam('in-reply-to', exc.partid)
931 if exc.namespace is not None:
932 if exc.namespace is not None:
932 part.addparam('namespace', exc.namespace, mandatory=False)
933 part.addparam('namespace', exc.namespace, mandatory=False)
933 if exc.key is not None:
934 if exc.key is not None:
934 part.addparam('key', exc.key, mandatory=False)
935 part.addparam('key', exc.key, mandatory=False)
935 if exc.new is not None:
936 if exc.new is not None:
936 part.addparam('new', exc.new, mandatory=False)
937 part.addparam('new', exc.new, mandatory=False)
937 if exc.old is not None:
938 if exc.old is not None:
938 part.addparam('old', exc.old, mandatory=False)
939 part.addparam('old', exc.old, mandatory=False)
939 if exc.ret is not None:
940 if exc.ret is not None:
940 part.addparam('ret', exc.ret, mandatory=False)
941 part.addparam('ret', exc.ret, mandatory=False)
941 except error.BundleValueError as exc:
942 except error.BundleValueError as exc:
942 errpart = bundler.newpart('error:unsupportedcontent')
943 errpart = bundler.newpart('error:unsupportedcontent')
943 if exc.parttype is not None:
944 if exc.parttype is not None:
944 errpart.addparam('parttype', exc.parttype)
945 errpart.addparam('parttype', exc.parttype)
945 if exc.params:
946 if exc.params:
946 errpart.addparam('params', '\0'.join(exc.params))
947 errpart.addparam('params', '\0'.join(exc.params))
947 except error.Abort as exc:
948 except error.Abort as exc:
948 manargs = [('message', str(exc))]
949 manargs = [('message', str(exc))]
949 advargs = []
950 advargs = []
950 if exc.hint is not None:
951 if exc.hint is not None:
951 advargs.append(('hint', exc.hint))
952 advargs.append(('hint', exc.hint))
952 bundler.addpart(bundle2.bundlepart('error:abort',
953 bundler.addpart(bundle2.bundlepart('error:abort',
953 manargs, advargs))
954 manargs, advargs))
954 except error.PushRaced as exc:
955 except error.PushRaced as exc:
955 bundler.newpart('error:pushraced', [('message', str(exc))])
956 bundler.newpart('error:pushraced', [('message', str(exc))])
956 return streamres(bundler.getchunks())
957 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now