##// END OF EJS Templates
wireproto: compress data from a generator...
Gregory Szorc -
r30206:d1051954 default
parent child Browse files
Show More
@@ -1,124 +1,133 b''
1 #
1 #
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import cgi
10 import cgi
11 import zlib
11 import zlib
12
12
13 from .common import (
13 from .common import (
14 HTTP_OK,
14 HTTP_OK,
15 )
15 )
16
16
17 from .. import (
17 from .. import (
18 util,
18 util,
19 wireproto,
19 wireproto,
20 )
20 )
21 stringio = util.stringio
21 stringio = util.stringio
22
22
23 urlerr = util.urlerr
23 urlerr = util.urlerr
24 urlreq = util.urlreq
24 urlreq = util.urlreq
25
25
26 HGTYPE = 'application/mercurial-0.1'
26 HGTYPE = 'application/mercurial-0.1'
27 HGERRTYPE = 'application/hg-error'
27 HGERRTYPE = 'application/hg-error'
28
28
29 class webproto(wireproto.abstractserverproto):
29 class webproto(wireproto.abstractserverproto):
30 def __init__(self, req, ui):
30 def __init__(self, req, ui):
31 self.req = req
31 self.req = req
32 self.response = ''
32 self.response = ''
33 self.ui = ui
33 self.ui = ui
34 def getargs(self, args):
34 def getargs(self, args):
35 knownargs = self._args()
35 knownargs = self._args()
36 data = {}
36 data = {}
37 keys = args.split()
37 keys = args.split()
38 for k in keys:
38 for k in keys:
39 if k == '*':
39 if k == '*':
40 star = {}
40 star = {}
41 for key in knownargs.keys():
41 for key in knownargs.keys():
42 if key != 'cmd' and key not in keys:
42 if key != 'cmd' and key not in keys:
43 star[key] = knownargs[key][0]
43 star[key] = knownargs[key][0]
44 data['*'] = star
44 data['*'] = star
45 else:
45 else:
46 data[k] = knownargs[k][0]
46 data[k] = knownargs[k][0]
47 return [data[k] for k in keys]
47 return [data[k] for k in keys]
48 def _args(self):
48 def _args(self):
49 args = self.req.form.copy()
49 args = self.req.form.copy()
50 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
50 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
51 if postlen:
51 if postlen:
52 args.update(cgi.parse_qs(
52 args.update(cgi.parse_qs(
53 self.req.read(postlen), keep_blank_values=True))
53 self.req.read(postlen), keep_blank_values=True))
54 return args
54 return args
55 chunks = []
55 chunks = []
56 i = 1
56 i = 1
57 while True:
57 while True:
58 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
58 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
59 if h is None:
59 if h is None:
60 break
60 break
61 chunks += [h]
61 chunks += [h]
62 i += 1
62 i += 1
63 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
63 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
64 return args
64 return args
65 def getfile(self, fp):
65 def getfile(self, fp):
66 length = int(self.req.env['CONTENT_LENGTH'])
66 length = int(self.req.env['CONTENT_LENGTH'])
67 for s in util.filechunkiter(self.req, limit=length):
67 for s in util.filechunkiter(self.req, limit=length):
68 fp.write(s)
68 fp.write(s)
69 def redirect(self):
69 def redirect(self):
70 self.oldio = self.ui.fout, self.ui.ferr
70 self.oldio = self.ui.fout, self.ui.ferr
71 self.ui.ferr = self.ui.fout = stringio()
71 self.ui.ferr = self.ui.fout = stringio()
72 def restore(self):
72 def restore(self):
73 val = self.ui.fout.getvalue()
73 val = self.ui.fout.getvalue()
74 self.ui.ferr, self.ui.fout = self.oldio
74 self.ui.ferr, self.ui.fout = self.oldio
75 return val
75 return val
76
76 def groupchunks(self, fh):
77 def groupchunks(self, fh):
78 def getchunks():
79 while True:
80 chunk = fh.read(32768)
81 if not chunk:
82 break
83 yield chunk
84
85 return self.compresschunks(getchunks())
86
87 def compresschunks(self, chunks):
77 # Don't allow untrusted settings because disabling compression or
88 # Don't allow untrusted settings because disabling compression or
78 # setting a very high compression level could lead to flooding
89 # setting a very high compression level could lead to flooding
79 # the server's network or CPU.
90 # the server's network or CPU.
80 z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
91 z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
81 while True:
92 for chunk in chunks:
82 chunk = fh.read(32768)
83 if not chunk:
84 break
85 data = z.compress(chunk)
93 data = z.compress(chunk)
86 # Not all calls to compress() emit data. It is cheaper to inspect
94 # Not all calls to compress() emit data. It is cheaper to inspect
87 # that here than to send it via the generator.
95 # that here than to send it via the generator.
88 if data:
96 if data:
89 yield data
97 yield data
90 yield z.flush()
98 yield z.flush()
99
91 def _client(self):
100 def _client(self):
92 return 'remote:%s:%s:%s' % (
101 return 'remote:%s:%s:%s' % (
93 self.req.env.get('wsgi.url_scheme') or 'http',
102 self.req.env.get('wsgi.url_scheme') or 'http',
94 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
103 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
95 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
104 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
96
105
97 def iscmd(cmd):
106 def iscmd(cmd):
98 return cmd in wireproto.commands
107 return cmd in wireproto.commands
99
108
100 def call(repo, req, cmd):
109 def call(repo, req, cmd):
101 p = webproto(req, repo.ui)
110 p = webproto(req, repo.ui)
102 rsp = wireproto.dispatch(repo, p, cmd)
111 rsp = wireproto.dispatch(repo, p, cmd)
103 if isinstance(rsp, str):
112 if isinstance(rsp, str):
104 req.respond(HTTP_OK, HGTYPE, body=rsp)
113 req.respond(HTTP_OK, HGTYPE, body=rsp)
105 return []
114 return []
106 elif isinstance(rsp, wireproto.streamres):
115 elif isinstance(rsp, wireproto.streamres):
107 req.respond(HTTP_OK, HGTYPE)
116 req.respond(HTTP_OK, HGTYPE)
108 return rsp.gen
117 return rsp.gen
109 elif isinstance(rsp, wireproto.pushres):
118 elif isinstance(rsp, wireproto.pushres):
110 val = p.restore()
119 val = p.restore()
111 rsp = '%d\n%s' % (rsp.res, val)
120 rsp = '%d\n%s' % (rsp.res, val)
112 req.respond(HTTP_OK, HGTYPE, body=rsp)
121 req.respond(HTTP_OK, HGTYPE, body=rsp)
113 return []
122 return []
114 elif isinstance(rsp, wireproto.pusherr):
123 elif isinstance(rsp, wireproto.pusherr):
115 # drain the incoming bundle
124 # drain the incoming bundle
116 req.drain()
125 req.drain()
117 p.restore()
126 p.restore()
118 rsp = '0\n%s\n' % rsp.res
127 rsp = '0\n%s\n' % rsp.res
119 req.respond(HTTP_OK, HGTYPE, body=rsp)
128 req.respond(HTTP_OK, HGTYPE, body=rsp)
120 return []
129 return []
121 elif isinstance(rsp, wireproto.ooberror):
130 elif isinstance(rsp, wireproto.ooberror):
122 rsp = rsp.message
131 rsp = rsp.message
123 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
132 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
124 return []
133 return []
@@ -1,131 +1,135 b''
1 # sshserver.py - ssh protocol server support for mercurial
1 # sshserver.py - ssh protocol server support for mercurial
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import os
11 import os
12 import sys
12 import sys
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 error,
16 error,
17 hook,
17 hook,
18 util,
18 util,
19 wireproto,
19 wireproto,
20 )
20 )
21
21
22 class sshserver(wireproto.abstractserverproto):
22 class sshserver(wireproto.abstractserverproto):
23 def __init__(self, ui, repo):
23 def __init__(self, ui, repo):
24 self.ui = ui
24 self.ui = ui
25 self.repo = repo
25 self.repo = repo
26 self.lock = None
26 self.lock = None
27 self.fin = ui.fin
27 self.fin = ui.fin
28 self.fout = ui.fout
28 self.fout = ui.fout
29
29
30 hook.redirect(True)
30 hook.redirect(True)
31 ui.fout = repo.ui.fout = ui.ferr
31 ui.fout = repo.ui.fout = ui.ferr
32
32
33 # Prevent insertion/deletion of CRs
33 # Prevent insertion/deletion of CRs
34 util.setbinary(self.fin)
34 util.setbinary(self.fin)
35 util.setbinary(self.fout)
35 util.setbinary(self.fout)
36
36
37 def getargs(self, args):
37 def getargs(self, args):
38 data = {}
38 data = {}
39 keys = args.split()
39 keys = args.split()
40 for n in xrange(len(keys)):
40 for n in xrange(len(keys)):
41 argline = self.fin.readline()[:-1]
41 argline = self.fin.readline()[:-1]
42 arg, l = argline.split()
42 arg, l = argline.split()
43 if arg not in keys:
43 if arg not in keys:
44 raise error.Abort(_("unexpected parameter %r") % arg)
44 raise error.Abort(_("unexpected parameter %r") % arg)
45 if arg == '*':
45 if arg == '*':
46 star = {}
46 star = {}
47 for k in xrange(int(l)):
47 for k in xrange(int(l)):
48 argline = self.fin.readline()[:-1]
48 argline = self.fin.readline()[:-1]
49 arg, l = argline.split()
49 arg, l = argline.split()
50 val = self.fin.read(int(l))
50 val = self.fin.read(int(l))
51 star[arg] = val
51 star[arg] = val
52 data['*'] = star
52 data['*'] = star
53 else:
53 else:
54 val = self.fin.read(int(l))
54 val = self.fin.read(int(l))
55 data[arg] = val
55 data[arg] = val
56 return [data[k] for k in keys]
56 return [data[k] for k in keys]
57
57
58 def getarg(self, name):
58 def getarg(self, name):
59 return self.getargs(name)[0]
59 return self.getargs(name)[0]
60
60
61 def getfile(self, fpout):
61 def getfile(self, fpout):
62 self.sendresponse('')
62 self.sendresponse('')
63 count = int(self.fin.readline())
63 count = int(self.fin.readline())
64 while count:
64 while count:
65 fpout.write(self.fin.read(count))
65 fpout.write(self.fin.read(count))
66 count = int(self.fin.readline())
66 count = int(self.fin.readline())
67
67
68 def redirect(self):
68 def redirect(self):
69 pass
69 pass
70
70
71 def groupchunks(self, fh):
71 def groupchunks(self, fh):
72 return iter(lambda: fh.read(4096), '')
72 return iter(lambda: fh.read(4096), '')
73
73
74 def compresschunks(self, chunks):
75 for chunk in chunks:
76 yield chunk
77
74 def sendresponse(self, v):
78 def sendresponse(self, v):
75 self.fout.write("%d\n" % len(v))
79 self.fout.write("%d\n" % len(v))
76 self.fout.write(v)
80 self.fout.write(v)
77 self.fout.flush()
81 self.fout.flush()
78
82
79 def sendstream(self, source):
83 def sendstream(self, source):
80 write = self.fout.write
84 write = self.fout.write
81 for chunk in source.gen:
85 for chunk in source.gen:
82 write(chunk)
86 write(chunk)
83 self.fout.flush()
87 self.fout.flush()
84
88
85 def sendpushresponse(self, rsp):
89 def sendpushresponse(self, rsp):
86 self.sendresponse('')
90 self.sendresponse('')
87 self.sendresponse(str(rsp.res))
91 self.sendresponse(str(rsp.res))
88
92
89 def sendpusherror(self, rsp):
93 def sendpusherror(self, rsp):
90 self.sendresponse(rsp.res)
94 self.sendresponse(rsp.res)
91
95
92 def sendooberror(self, rsp):
96 def sendooberror(self, rsp):
93 self.ui.ferr.write('%s\n-\n' % rsp.message)
97 self.ui.ferr.write('%s\n-\n' % rsp.message)
94 self.ui.ferr.flush()
98 self.ui.ferr.flush()
95 self.fout.write('\n')
99 self.fout.write('\n')
96 self.fout.flush()
100 self.fout.flush()
97
101
98 def serve_forever(self):
102 def serve_forever(self):
99 try:
103 try:
100 while self.serve_one():
104 while self.serve_one():
101 pass
105 pass
102 finally:
106 finally:
103 if self.lock is not None:
107 if self.lock is not None:
104 self.lock.release()
108 self.lock.release()
105 sys.exit(0)
109 sys.exit(0)
106
110
107 handlers = {
111 handlers = {
108 str: sendresponse,
112 str: sendresponse,
109 wireproto.streamres: sendstream,
113 wireproto.streamres: sendstream,
110 wireproto.pushres: sendpushresponse,
114 wireproto.pushres: sendpushresponse,
111 wireproto.pusherr: sendpusherror,
115 wireproto.pusherr: sendpusherror,
112 wireproto.ooberror: sendooberror,
116 wireproto.ooberror: sendooberror,
113 }
117 }
114
118
115 def serve_one(self):
119 def serve_one(self):
116 cmd = self.fin.readline()[:-1]
120 cmd = self.fin.readline()[:-1]
117 if cmd and cmd in wireproto.commands:
121 if cmd and cmd in wireproto.commands:
118 rsp = wireproto.dispatch(self.repo, self, cmd)
122 rsp = wireproto.dispatch(self.repo, self, cmd)
119 self.handlers[rsp.__class__](self, rsp)
123 self.handlers[rsp.__class__](self, rsp)
120 elif cmd:
124 elif cmd:
121 impl = getattr(self, 'do_' + cmd, None)
125 impl = getattr(self, 'do_' + cmd, None)
122 if impl:
126 if impl:
123 r = impl()
127 r = impl()
124 if r is not None:
128 if r is not None:
125 self.sendresponse(r)
129 self.sendresponse(r)
126 else: self.sendresponse("")
130 else: self.sendresponse("")
127 return cmd != ''
131 return cmd != ''
128
132
129 def _client(self):
133 def _client(self):
130 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
134 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
131 return 'remote:ssh:' + client
135 return 'remote:ssh:' + client
@@ -1,959 +1,965 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import sys
13 import sys
14 import tempfile
14 import tempfile
15
15
16 from .i18n import _
16 from .i18n import _
17 from .node import (
17 from .node import (
18 bin,
18 bin,
19 hex,
19 hex,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2required = _(
37 bundle2required = _(
38 'incompatible Mercurial client; bundle2 required\n'
38 'incompatible Mercurial client; bundle2 required\n'
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40
40
41 class abstractserverproto(object):
41 class abstractserverproto(object):
42 """abstract class that summarizes the protocol API
42 """abstract class that summarizes the protocol API
43
43
44 Used as reference and documentation.
44 Used as reference and documentation.
45 """
45 """
46
46
47 def getargs(self, args):
47 def getargs(self, args):
48 """return the value for arguments in <args>
48 """return the value for arguments in <args>
49
49
50 returns a list of values (same order as <args>)"""
50 returns a list of values (same order as <args>)"""
51 raise NotImplementedError()
51 raise NotImplementedError()
52
52
53 def getfile(self, fp):
53 def getfile(self, fp):
54 """write the whole content of a file into a file like object
54 """write the whole content of a file into a file like object
55
55
56 The file is in the form::
56 The file is in the form::
57
57
58 (<chunk-size>\n<chunk>)+0\n
58 (<chunk-size>\n<chunk>)+0\n
59
59
60 chunk size is the ascii version of the int.
60 chunk size is the ascii version of the int.
61 """
61 """
62 raise NotImplementedError()
62 raise NotImplementedError()
63
63
64 def redirect(self):
64 def redirect(self):
65 """may setup interception for stdout and stderr
65 """may setup interception for stdout and stderr
66
66
67 See also the `restore` method."""
67 See also the `restore` method."""
68 raise NotImplementedError()
68 raise NotImplementedError()
69
69
70 # If the `redirect` function does install interception, the `restore`
70 # If the `redirect` function does install interception, the `restore`
71 # function MUST be defined. If interception is not used, this function
71 # function MUST be defined. If interception is not used, this function
72 # MUST NOT be defined.
72 # MUST NOT be defined.
73 #
73 #
74 # left commented here on purpose
74 # left commented here on purpose
75 #
75 #
76 #def restore(self):
76 #def restore(self):
77 # """reinstall previous stdout and stderr and return intercepted stdout
77 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, fh):
81 def groupchunks(self, fh):
82 """Generator of chunks to send to the client.
82 """Generator of chunks to send to the client.
83
83
84 Some protocols may have compressed the contents.
84 Some protocols may have compressed the contents.
85 """
85 """
86 raise NotImplementedError()
86 raise NotImplementedError()
87
87
88 def compresschunks(self, chunks):
89 """Generator of possible compressed chunks to send to the client.
90
91 This is like ``groupchunks()`` except it accepts a generator as
92 its argument.
93 """
94 raise NotImplementedError()
95
88 class remotebatch(peer.batcher):
96 class remotebatch(peer.batcher):
89 '''batches the queued calls; uses as few roundtrips as possible'''
97 '''batches the queued calls; uses as few roundtrips as possible'''
90 def __init__(self, remote):
98 def __init__(self, remote):
91 '''remote must support _submitbatch(encbatch) and
99 '''remote must support _submitbatch(encbatch) and
92 _submitone(op, encargs)'''
100 _submitone(op, encargs)'''
93 peer.batcher.__init__(self)
101 peer.batcher.__init__(self)
94 self.remote = remote
102 self.remote = remote
95 def submit(self):
103 def submit(self):
96 req, rsp = [], []
104 req, rsp = [], []
97 for name, args, opts, resref in self.calls:
105 for name, args, opts, resref in self.calls:
98 mtd = getattr(self.remote, name)
106 mtd = getattr(self.remote, name)
99 batchablefn = getattr(mtd, 'batchable', None)
107 batchablefn = getattr(mtd, 'batchable', None)
100 if batchablefn is not None:
108 if batchablefn is not None:
101 batchable = batchablefn(mtd.im_self, *args, **opts)
109 batchable = batchablefn(mtd.im_self, *args, **opts)
102 encargsorres, encresref = next(batchable)
110 encargsorres, encresref = next(batchable)
103 if encresref:
111 if encresref:
104 req.append((name, encargsorres,))
112 req.append((name, encargsorres,))
105 rsp.append((batchable, encresref, resref,))
113 rsp.append((batchable, encresref, resref,))
106 else:
114 else:
107 resref.set(encargsorres)
115 resref.set(encargsorres)
108 else:
116 else:
109 if req:
117 if req:
110 self._submitreq(req, rsp)
118 self._submitreq(req, rsp)
111 req, rsp = [], []
119 req, rsp = [], []
112 resref.set(mtd(*args, **opts))
120 resref.set(mtd(*args, **opts))
113 if req:
121 if req:
114 self._submitreq(req, rsp)
122 self._submitreq(req, rsp)
115 def _submitreq(self, req, rsp):
123 def _submitreq(self, req, rsp):
116 encresults = self.remote._submitbatch(req)
124 encresults = self.remote._submitbatch(req)
117 for encres, r in zip(encresults, rsp):
125 for encres, r in zip(encresults, rsp):
118 batchable, encresref, resref = r
126 batchable, encresref, resref = r
119 encresref.set(encres)
127 encresref.set(encres)
120 resref.set(next(batchable))
128 resref.set(next(batchable))
121
129
122 class remoteiterbatcher(peer.iterbatcher):
130 class remoteiterbatcher(peer.iterbatcher):
123 def __init__(self, remote):
131 def __init__(self, remote):
124 super(remoteiterbatcher, self).__init__()
132 super(remoteiterbatcher, self).__init__()
125 self._remote = remote
133 self._remote = remote
126
134
127 def __getattr__(self, name):
135 def __getattr__(self, name):
128 if not getattr(self._remote, name, False):
136 if not getattr(self._remote, name, False):
129 raise AttributeError(
137 raise AttributeError(
130 'Attempted to iterbatch non-batchable call to %r' % name)
138 'Attempted to iterbatch non-batchable call to %r' % name)
131 return super(remoteiterbatcher, self).__getattr__(name)
139 return super(remoteiterbatcher, self).__getattr__(name)
132
140
133 def submit(self):
141 def submit(self):
134 """Break the batch request into many patch calls and pipeline them.
142 """Break the batch request into many patch calls and pipeline them.
135
143
136 This is mostly valuable over http where request sizes can be
144 This is mostly valuable over http where request sizes can be
137 limited, but can be used in other places as well.
145 limited, but can be used in other places as well.
138 """
146 """
139 req, rsp = [], []
147 req, rsp = [], []
140 for name, args, opts, resref in self.calls:
148 for name, args, opts, resref in self.calls:
141 mtd = getattr(self._remote, name)
149 mtd = getattr(self._remote, name)
142 batchable = mtd.batchable(mtd.im_self, *args, **opts)
150 batchable = mtd.batchable(mtd.im_self, *args, **opts)
143 encargsorres, encresref = next(batchable)
151 encargsorres, encresref = next(batchable)
144 assert encresref
152 assert encresref
145 req.append((name, encargsorres))
153 req.append((name, encargsorres))
146 rsp.append((batchable, encresref))
154 rsp.append((batchable, encresref))
147 if req:
155 if req:
148 self._resultiter = self._remote._submitbatch(req)
156 self._resultiter = self._remote._submitbatch(req)
149 self._rsp = rsp
157 self._rsp = rsp
150
158
151 def results(self):
159 def results(self):
152 for (batchable, encresref), encres in itertools.izip(
160 for (batchable, encresref), encres in itertools.izip(
153 self._rsp, self._resultiter):
161 self._rsp, self._resultiter):
154 encresref.set(encres)
162 encresref.set(encres)
155 yield next(batchable)
163 yield next(batchable)
156
164
157 # Forward a couple of names from peer to make wireproto interactions
165 # Forward a couple of names from peer to make wireproto interactions
158 # slightly more sensible.
166 # slightly more sensible.
159 batchable = peer.batchable
167 batchable = peer.batchable
160 future = peer.future
168 future = peer.future
161
169
162 # list of nodes encoding / decoding
170 # list of nodes encoding / decoding
163
171
164 def decodelist(l, sep=' '):
172 def decodelist(l, sep=' '):
165 if l:
173 if l:
166 return map(bin, l.split(sep))
174 return map(bin, l.split(sep))
167 return []
175 return []
168
176
169 def encodelist(l, sep=' '):
177 def encodelist(l, sep=' '):
170 try:
178 try:
171 return sep.join(map(hex, l))
179 return sep.join(map(hex, l))
172 except TypeError:
180 except TypeError:
173 raise
181 raise
174
182
175 # batched call argument encoding
183 # batched call argument encoding
176
184
177 def escapearg(plain):
185 def escapearg(plain):
178 return (plain
186 return (plain
179 .replace(':', ':c')
187 .replace(':', ':c')
180 .replace(',', ':o')
188 .replace(',', ':o')
181 .replace(';', ':s')
189 .replace(';', ':s')
182 .replace('=', ':e'))
190 .replace('=', ':e'))
183
191
184 def unescapearg(escaped):
192 def unescapearg(escaped):
185 return (escaped
193 return (escaped
186 .replace(':e', '=')
194 .replace(':e', '=')
187 .replace(':s', ';')
195 .replace(':s', ';')
188 .replace(':o', ',')
196 .replace(':o', ',')
189 .replace(':c', ':'))
197 .replace(':c', ':'))
190
198
191 def encodebatchcmds(req):
199 def encodebatchcmds(req):
192 """Return a ``cmds`` argument value for the ``batch`` command."""
200 """Return a ``cmds`` argument value for the ``batch`` command."""
193 cmds = []
201 cmds = []
194 for op, argsdict in req:
202 for op, argsdict in req:
195 # Old servers didn't properly unescape argument names. So prevent
203 # Old servers didn't properly unescape argument names. So prevent
196 # the sending of argument names that may not be decoded properly by
204 # the sending of argument names that may not be decoded properly by
197 # servers.
205 # servers.
198 assert all(escapearg(k) == k for k in argsdict)
206 assert all(escapearg(k) == k for k in argsdict)
199
207
200 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
208 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
201 for k, v in argsdict.iteritems())
209 for k, v in argsdict.iteritems())
202 cmds.append('%s %s' % (op, args))
210 cmds.append('%s %s' % (op, args))
203
211
204 return ';'.join(cmds)
212 return ';'.join(cmds)
205
213
206 # mapping of options accepted by getbundle and their types
214 # mapping of options accepted by getbundle and their types
207 #
215 #
208 # Meant to be extended by extensions. It is extensions responsibility to ensure
216 # Meant to be extended by extensions. It is extensions responsibility to ensure
209 # such options are properly processed in exchange.getbundle.
217 # such options are properly processed in exchange.getbundle.
210 #
218 #
211 # supported types are:
219 # supported types are:
212 #
220 #
213 # :nodes: list of binary nodes
221 # :nodes: list of binary nodes
214 # :csv: list of comma-separated values
222 # :csv: list of comma-separated values
215 # :scsv: list of comma-separated values return as set
223 # :scsv: list of comma-separated values return as set
216 # :plain: string with no transformation needed.
224 # :plain: string with no transformation needed.
217 gboptsmap = {'heads': 'nodes',
225 gboptsmap = {'heads': 'nodes',
218 'common': 'nodes',
226 'common': 'nodes',
219 'obsmarkers': 'boolean',
227 'obsmarkers': 'boolean',
220 'bundlecaps': 'scsv',
228 'bundlecaps': 'scsv',
221 'listkeys': 'csv',
229 'listkeys': 'csv',
222 'cg': 'boolean',
230 'cg': 'boolean',
223 'cbattempted': 'boolean'}
231 'cbattempted': 'boolean'}
224
232
225 # client side
233 # client side
226
234
227 class wirepeer(peer.peerrepository):
235 class wirepeer(peer.peerrepository):
228 """Client-side interface for communicating with a peer repository.
236 """Client-side interface for communicating with a peer repository.
229
237
230 Methods commonly call wire protocol commands of the same name.
238 Methods commonly call wire protocol commands of the same name.
231
239
232 See also httppeer.py and sshpeer.py for protocol-specific
240 See also httppeer.py and sshpeer.py for protocol-specific
233 implementations of this interface.
241 implementations of this interface.
234 """
242 """
235 def batch(self):
243 def batch(self):
236 if self.capable('batch'):
244 if self.capable('batch'):
237 return remotebatch(self)
245 return remotebatch(self)
238 else:
246 else:
239 return peer.localbatch(self)
247 return peer.localbatch(self)
240 def _submitbatch(self, req):
248 def _submitbatch(self, req):
241 """run batch request <req> on the server
249 """run batch request <req> on the server
242
250
243 Returns an iterator of the raw responses from the server.
251 Returns an iterator of the raw responses from the server.
244 """
252 """
245 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
253 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
246 chunk = rsp.read(1024)
254 chunk = rsp.read(1024)
247 work = [chunk]
255 work = [chunk]
248 while chunk:
256 while chunk:
249 while ';' not in chunk and chunk:
257 while ';' not in chunk and chunk:
250 chunk = rsp.read(1024)
258 chunk = rsp.read(1024)
251 work.append(chunk)
259 work.append(chunk)
252 merged = ''.join(work)
260 merged = ''.join(work)
253 while ';' in merged:
261 while ';' in merged:
254 one, merged = merged.split(';', 1)
262 one, merged = merged.split(';', 1)
255 yield unescapearg(one)
263 yield unescapearg(one)
256 chunk = rsp.read(1024)
264 chunk = rsp.read(1024)
257 work = [merged, chunk]
265 work = [merged, chunk]
258 yield unescapearg(''.join(work))
266 yield unescapearg(''.join(work))
259
267
260 def _submitone(self, op, args):
268 def _submitone(self, op, args):
261 return self._call(op, **args)
269 return self._call(op, **args)
262
270
263 def iterbatch(self):
271 def iterbatch(self):
264 return remoteiterbatcher(self)
272 return remoteiterbatcher(self)
265
273
266 @batchable
274 @batchable
267 def lookup(self, key):
275 def lookup(self, key):
268 self.requirecap('lookup', _('look up remote revision'))
276 self.requirecap('lookup', _('look up remote revision'))
269 f = future()
277 f = future()
270 yield {'key': encoding.fromlocal(key)}, f
278 yield {'key': encoding.fromlocal(key)}, f
271 d = f.value
279 d = f.value
272 success, data = d[:-1].split(" ", 1)
280 success, data = d[:-1].split(" ", 1)
273 if int(success):
281 if int(success):
274 yield bin(data)
282 yield bin(data)
275 self._abort(error.RepoError(data))
283 self._abort(error.RepoError(data))
276
284
277 @batchable
285 @batchable
278 def heads(self):
286 def heads(self):
279 f = future()
287 f = future()
280 yield {}, f
288 yield {}, f
281 d = f.value
289 d = f.value
282 try:
290 try:
283 yield decodelist(d[:-1])
291 yield decodelist(d[:-1])
284 except ValueError:
292 except ValueError:
285 self._abort(error.ResponseError(_("unexpected response:"), d))
293 self._abort(error.ResponseError(_("unexpected response:"), d))
286
294
287 @batchable
295 @batchable
288 def known(self, nodes):
296 def known(self, nodes):
289 f = future()
297 f = future()
290 yield {'nodes': encodelist(nodes)}, f
298 yield {'nodes': encodelist(nodes)}, f
291 d = f.value
299 d = f.value
292 try:
300 try:
293 yield [bool(int(b)) for b in d]
301 yield [bool(int(b)) for b in d]
294 except ValueError:
302 except ValueError:
295 self._abort(error.ResponseError(_("unexpected response:"), d))
303 self._abort(error.ResponseError(_("unexpected response:"), d))
296
304
297 @batchable
305 @batchable
298 def branchmap(self):
306 def branchmap(self):
299 f = future()
307 f = future()
300 yield {}, f
308 yield {}, f
301 d = f.value
309 d = f.value
302 try:
310 try:
303 branchmap = {}
311 branchmap = {}
304 for branchpart in d.splitlines():
312 for branchpart in d.splitlines():
305 branchname, branchheads = branchpart.split(' ', 1)
313 branchname, branchheads = branchpart.split(' ', 1)
306 branchname = encoding.tolocal(urlreq.unquote(branchname))
314 branchname = encoding.tolocal(urlreq.unquote(branchname))
307 branchheads = decodelist(branchheads)
315 branchheads = decodelist(branchheads)
308 branchmap[branchname] = branchheads
316 branchmap[branchname] = branchheads
309 yield branchmap
317 yield branchmap
310 except TypeError:
318 except TypeError:
311 self._abort(error.ResponseError(_("unexpected response:"), d))
319 self._abort(error.ResponseError(_("unexpected response:"), d))
312
320
313 def branches(self, nodes):
321 def branches(self, nodes):
314 n = encodelist(nodes)
322 n = encodelist(nodes)
315 d = self._call("branches", nodes=n)
323 d = self._call("branches", nodes=n)
316 try:
324 try:
317 br = [tuple(decodelist(b)) for b in d.splitlines()]
325 br = [tuple(decodelist(b)) for b in d.splitlines()]
318 return br
326 return br
319 except ValueError:
327 except ValueError:
320 self._abort(error.ResponseError(_("unexpected response:"), d))
328 self._abort(error.ResponseError(_("unexpected response:"), d))
321
329
322 def between(self, pairs):
330 def between(self, pairs):
323 batch = 8 # avoid giant requests
331 batch = 8 # avoid giant requests
324 r = []
332 r = []
325 for i in xrange(0, len(pairs), batch):
333 for i in xrange(0, len(pairs), batch):
326 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
334 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
327 d = self._call("between", pairs=n)
335 d = self._call("between", pairs=n)
328 try:
336 try:
329 r.extend(l and decodelist(l) or [] for l in d.splitlines())
337 r.extend(l and decodelist(l) or [] for l in d.splitlines())
330 except ValueError:
338 except ValueError:
331 self._abort(error.ResponseError(_("unexpected response:"), d))
339 self._abort(error.ResponseError(_("unexpected response:"), d))
332 return r
340 return r
333
341
334 @batchable
342 @batchable
335 def pushkey(self, namespace, key, old, new):
343 def pushkey(self, namespace, key, old, new):
336 if not self.capable('pushkey'):
344 if not self.capable('pushkey'):
337 yield False, None
345 yield False, None
338 f = future()
346 f = future()
339 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
347 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
340 yield {'namespace': encoding.fromlocal(namespace),
348 yield {'namespace': encoding.fromlocal(namespace),
341 'key': encoding.fromlocal(key),
349 'key': encoding.fromlocal(key),
342 'old': encoding.fromlocal(old),
350 'old': encoding.fromlocal(old),
343 'new': encoding.fromlocal(new)}, f
351 'new': encoding.fromlocal(new)}, f
344 d = f.value
352 d = f.value
345 d, output = d.split('\n', 1)
353 d, output = d.split('\n', 1)
346 try:
354 try:
347 d = bool(int(d))
355 d = bool(int(d))
348 except ValueError:
356 except ValueError:
349 raise error.ResponseError(
357 raise error.ResponseError(
350 _('push failed (unexpected response):'), d)
358 _('push failed (unexpected response):'), d)
351 for l in output.splitlines(True):
359 for l in output.splitlines(True):
352 self.ui.status(_('remote: '), l)
360 self.ui.status(_('remote: '), l)
353 yield d
361 yield d
354
362
355 @batchable
363 @batchable
356 def listkeys(self, namespace):
364 def listkeys(self, namespace):
357 if not self.capable('pushkey'):
365 if not self.capable('pushkey'):
358 yield {}, None
366 yield {}, None
359 f = future()
367 f = future()
360 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
368 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
361 yield {'namespace': encoding.fromlocal(namespace)}, f
369 yield {'namespace': encoding.fromlocal(namespace)}, f
362 d = f.value
370 d = f.value
363 self.ui.debug('received listkey for "%s": %i bytes\n'
371 self.ui.debug('received listkey for "%s": %i bytes\n'
364 % (namespace, len(d)))
372 % (namespace, len(d)))
365 yield pushkeymod.decodekeys(d)
373 yield pushkeymod.decodekeys(d)
366
374
367 def stream_out(self):
375 def stream_out(self):
368 return self._callstream('stream_out')
376 return self._callstream('stream_out')
369
377
370 def changegroup(self, nodes, kind):
378 def changegroup(self, nodes, kind):
371 n = encodelist(nodes)
379 n = encodelist(nodes)
372 f = self._callcompressable("changegroup", roots=n)
380 f = self._callcompressable("changegroup", roots=n)
373 return changegroupmod.cg1unpacker(f, 'UN')
381 return changegroupmod.cg1unpacker(f, 'UN')
374
382
375 def changegroupsubset(self, bases, heads, kind):
383 def changegroupsubset(self, bases, heads, kind):
376 self.requirecap('changegroupsubset', _('look up remote changes'))
384 self.requirecap('changegroupsubset', _('look up remote changes'))
377 bases = encodelist(bases)
385 bases = encodelist(bases)
378 heads = encodelist(heads)
386 heads = encodelist(heads)
379 f = self._callcompressable("changegroupsubset",
387 f = self._callcompressable("changegroupsubset",
380 bases=bases, heads=heads)
388 bases=bases, heads=heads)
381 return changegroupmod.cg1unpacker(f, 'UN')
389 return changegroupmod.cg1unpacker(f, 'UN')
382
390
383 def getbundle(self, source, **kwargs):
391 def getbundle(self, source, **kwargs):
384 self.requirecap('getbundle', _('look up remote changes'))
392 self.requirecap('getbundle', _('look up remote changes'))
385 opts = {}
393 opts = {}
386 bundlecaps = kwargs.get('bundlecaps')
394 bundlecaps = kwargs.get('bundlecaps')
387 if bundlecaps is not None:
395 if bundlecaps is not None:
388 kwargs['bundlecaps'] = sorted(bundlecaps)
396 kwargs['bundlecaps'] = sorted(bundlecaps)
389 else:
397 else:
390 bundlecaps = () # kwargs could have it to None
398 bundlecaps = () # kwargs could have it to None
391 for key, value in kwargs.iteritems():
399 for key, value in kwargs.iteritems():
392 if value is None:
400 if value is None:
393 continue
401 continue
394 keytype = gboptsmap.get(key)
402 keytype = gboptsmap.get(key)
395 if keytype is None:
403 if keytype is None:
396 assert False, 'unexpected'
404 assert False, 'unexpected'
397 elif keytype == 'nodes':
405 elif keytype == 'nodes':
398 value = encodelist(value)
406 value = encodelist(value)
399 elif keytype in ('csv', 'scsv'):
407 elif keytype in ('csv', 'scsv'):
400 value = ','.join(value)
408 value = ','.join(value)
401 elif keytype == 'boolean':
409 elif keytype == 'boolean':
402 value = '%i' % bool(value)
410 value = '%i' % bool(value)
403 elif keytype != 'plain':
411 elif keytype != 'plain':
404 raise KeyError('unknown getbundle option type %s'
412 raise KeyError('unknown getbundle option type %s'
405 % keytype)
413 % keytype)
406 opts[key] = value
414 opts[key] = value
407 f = self._callcompressable("getbundle", **opts)
415 f = self._callcompressable("getbundle", **opts)
408 if any((cap.startswith('HG2') for cap in bundlecaps)):
416 if any((cap.startswith('HG2') for cap in bundlecaps)):
409 return bundle2.getunbundler(self.ui, f)
417 return bundle2.getunbundler(self.ui, f)
410 else:
418 else:
411 return changegroupmod.cg1unpacker(f, 'UN')
419 return changegroupmod.cg1unpacker(f, 'UN')
412
420
413 def unbundle(self, cg, heads, url):
421 def unbundle(self, cg, heads, url):
414 '''Send cg (a readable file-like object representing the
422 '''Send cg (a readable file-like object representing the
415 changegroup to push, typically a chunkbuffer object) to the
423 changegroup to push, typically a chunkbuffer object) to the
416 remote server as a bundle.
424 remote server as a bundle.
417
425
418 When pushing a bundle10 stream, return an integer indicating the
426 When pushing a bundle10 stream, return an integer indicating the
419 result of the push (see localrepository.addchangegroup()).
427 result of the push (see localrepository.addchangegroup()).
420
428
421 When pushing a bundle20 stream, return a bundle20 stream.
429 When pushing a bundle20 stream, return a bundle20 stream.
422
430
423 `url` is the url the client thinks it's pushing to, which is
431 `url` is the url the client thinks it's pushing to, which is
424 visible to hooks.
432 visible to hooks.
425 '''
433 '''
426
434
427 if heads != ['force'] and self.capable('unbundlehash'):
435 if heads != ['force'] and self.capable('unbundlehash'):
428 heads = encodelist(['hashed',
436 heads = encodelist(['hashed',
429 hashlib.sha1(''.join(sorted(heads))).digest()])
437 hashlib.sha1(''.join(sorted(heads))).digest()])
430 else:
438 else:
431 heads = encodelist(heads)
439 heads = encodelist(heads)
432
440
433 if util.safehasattr(cg, 'deltaheader'):
441 if util.safehasattr(cg, 'deltaheader'):
434 # this a bundle10, do the old style call sequence
442 # this a bundle10, do the old style call sequence
435 ret, output = self._callpush("unbundle", cg, heads=heads)
443 ret, output = self._callpush("unbundle", cg, heads=heads)
436 if ret == "":
444 if ret == "":
437 raise error.ResponseError(
445 raise error.ResponseError(
438 _('push failed:'), output)
446 _('push failed:'), output)
439 try:
447 try:
440 ret = int(ret)
448 ret = int(ret)
441 except ValueError:
449 except ValueError:
442 raise error.ResponseError(
450 raise error.ResponseError(
443 _('push failed (unexpected response):'), ret)
451 _('push failed (unexpected response):'), ret)
444
452
445 for l in output.splitlines(True):
453 for l in output.splitlines(True):
446 self.ui.status(_('remote: '), l)
454 self.ui.status(_('remote: '), l)
447 else:
455 else:
448 # bundle2 push. Send a stream, fetch a stream.
456 # bundle2 push. Send a stream, fetch a stream.
449 stream = self._calltwowaystream('unbundle', cg, heads=heads)
457 stream = self._calltwowaystream('unbundle', cg, heads=heads)
450 ret = bundle2.getunbundler(self.ui, stream)
458 ret = bundle2.getunbundler(self.ui, stream)
451 return ret
459 return ret
452
460
453 def debugwireargs(self, one, two, three=None, four=None, five=None):
461 def debugwireargs(self, one, two, three=None, four=None, five=None):
454 # don't pass optional arguments left at their default value
462 # don't pass optional arguments left at their default value
455 opts = {}
463 opts = {}
456 if three is not None:
464 if three is not None:
457 opts['three'] = three
465 opts['three'] = three
458 if four is not None:
466 if four is not None:
459 opts['four'] = four
467 opts['four'] = four
460 return self._call('debugwireargs', one=one, two=two, **opts)
468 return self._call('debugwireargs', one=one, two=two, **opts)
461
469
462 def _call(self, cmd, **args):
470 def _call(self, cmd, **args):
463 """execute <cmd> on the server
471 """execute <cmd> on the server
464
472
465 The command is expected to return a simple string.
473 The command is expected to return a simple string.
466
474
467 returns the server reply as a string."""
475 returns the server reply as a string."""
468 raise NotImplementedError()
476 raise NotImplementedError()
469
477
470 def _callstream(self, cmd, **args):
478 def _callstream(self, cmd, **args):
471 """execute <cmd> on the server
479 """execute <cmd> on the server
472
480
473 The command is expected to return a stream. Note that if the
481 The command is expected to return a stream. Note that if the
474 command doesn't return a stream, _callstream behaves
482 command doesn't return a stream, _callstream behaves
475 differently for ssh and http peers.
483 differently for ssh and http peers.
476
484
477 returns the server reply as a file like object.
485 returns the server reply as a file like object.
478 """
486 """
479 raise NotImplementedError()
487 raise NotImplementedError()
480
488
481 def _callcompressable(self, cmd, **args):
489 def _callcompressable(self, cmd, **args):
482 """execute <cmd> on the server
490 """execute <cmd> on the server
483
491
484 The command is expected to return a stream.
492 The command is expected to return a stream.
485
493
486 The stream may have been compressed in some implementations. This
494 The stream may have been compressed in some implementations. This
487 function takes care of the decompression. This is the only difference
495 function takes care of the decompression. This is the only difference
488 with _callstream.
496 with _callstream.
489
497
490 returns the server reply as a file like object.
498 returns the server reply as a file like object.
491 """
499 """
492 raise NotImplementedError()
500 raise NotImplementedError()
493
501
494 def _callpush(self, cmd, fp, **args):
502 def _callpush(self, cmd, fp, **args):
495 """execute a <cmd> on server
503 """execute a <cmd> on server
496
504
497 The command is expected to be related to a push. Push has a special
505 The command is expected to be related to a push. Push has a special
498 return method.
506 return method.
499
507
500 returns the server reply as a (ret, output) tuple. ret is either
508 returns the server reply as a (ret, output) tuple. ret is either
501 empty (error) or a stringified int.
509 empty (error) or a stringified int.
502 """
510 """
503 raise NotImplementedError()
511 raise NotImplementedError()
504
512
505 def _calltwowaystream(self, cmd, fp, **args):
513 def _calltwowaystream(self, cmd, fp, **args):
506 """execute <cmd> on server
514 """execute <cmd> on server
507
515
508 The command will send a stream to the server and get a stream in reply.
516 The command will send a stream to the server and get a stream in reply.
509 """
517 """
510 raise NotImplementedError()
518 raise NotImplementedError()
511
519
512 def _abort(self, exception):
520 def _abort(self, exception):
513 """clearly abort the wire protocol connection and raise the exception
521 """clearly abort the wire protocol connection and raise the exception
514 """
522 """
515 raise NotImplementedError()
523 raise NotImplementedError()
516
524
517 # server side
525 # server side
518
526
519 # wire protocol command can either return a string or one of these classes.
527 # wire protocol command can either return a string or one of these classes.
520 class streamres(object):
528 class streamres(object):
521 """wireproto reply: binary stream
529 """wireproto reply: binary stream
522
530
523 The call was successful and the result is a stream.
531 The call was successful and the result is a stream.
524 Iterate on the `self.gen` attribute to retrieve chunks.
532 Iterate on the `self.gen` attribute to retrieve chunks.
525 """
533 """
526 def __init__(self, gen):
534 def __init__(self, gen):
527 self.gen = gen
535 self.gen = gen
528
536
529 class pushres(object):
537 class pushres(object):
530 """wireproto reply: success with simple integer return
538 """wireproto reply: success with simple integer return
531
539
532 The call was successful and returned an integer contained in `self.res`.
540 The call was successful and returned an integer contained in `self.res`.
533 """
541 """
534 def __init__(self, res):
542 def __init__(self, res):
535 self.res = res
543 self.res = res
536
544
537 class pusherr(object):
545 class pusherr(object):
538 """wireproto reply: failure
546 """wireproto reply: failure
539
547
540 The call failed. The `self.res` attribute contains the error message.
548 The call failed. The `self.res` attribute contains the error message.
541 """
549 """
542 def __init__(self, res):
550 def __init__(self, res):
543 self.res = res
551 self.res = res
544
552
545 class ooberror(object):
553 class ooberror(object):
546 """wireproto reply: failure of a batch of operation
554 """wireproto reply: failure of a batch of operation
547
555
548 Something failed during a batch call. The error message is stored in
556 Something failed during a batch call. The error message is stored in
549 `self.message`.
557 `self.message`.
550 """
558 """
551 def __init__(self, message):
559 def __init__(self, message):
552 self.message = message
560 self.message = message
553
561
554 def getdispatchrepo(repo, proto, command):
562 def getdispatchrepo(repo, proto, command):
555 """Obtain the repo used for processing wire protocol commands.
563 """Obtain the repo used for processing wire protocol commands.
556
564
557 The intent of this function is to serve as a monkeypatch point for
565 The intent of this function is to serve as a monkeypatch point for
558 extensions that need commands to operate on different repo views under
566 extensions that need commands to operate on different repo views under
559 specialized circumstances.
567 specialized circumstances.
560 """
568 """
561 return repo.filtered('served')
569 return repo.filtered('served')
562
570
563 def dispatch(repo, proto, command):
571 def dispatch(repo, proto, command):
564 repo = getdispatchrepo(repo, proto, command)
572 repo = getdispatchrepo(repo, proto, command)
565 func, spec = commands[command]
573 func, spec = commands[command]
566 args = proto.getargs(spec)
574 args = proto.getargs(spec)
567 return func(repo, proto, *args)
575 return func(repo, proto, *args)
568
576
569 def options(cmd, keys, others):
577 def options(cmd, keys, others):
570 opts = {}
578 opts = {}
571 for k in keys:
579 for k in keys:
572 if k in others:
580 if k in others:
573 opts[k] = others[k]
581 opts[k] = others[k]
574 del others[k]
582 del others[k]
575 if others:
583 if others:
576 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
584 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
577 % (cmd, ",".join(others)))
585 % (cmd, ",".join(others)))
578 return opts
586 return opts
579
587
580 def bundle1allowed(repo, action):
588 def bundle1allowed(repo, action):
581 """Whether a bundle1 operation is allowed from the server.
589 """Whether a bundle1 operation is allowed from the server.
582
590
583 Priority is:
591 Priority is:
584
592
585 1. server.bundle1gd.<action> (if generaldelta active)
593 1. server.bundle1gd.<action> (if generaldelta active)
586 2. server.bundle1.<action>
594 2. server.bundle1.<action>
587 3. server.bundle1gd (if generaldelta active)
595 3. server.bundle1gd (if generaldelta active)
588 4. server.bundle1
596 4. server.bundle1
589 """
597 """
590 ui = repo.ui
598 ui = repo.ui
591 gd = 'generaldelta' in repo.requirements
599 gd = 'generaldelta' in repo.requirements
592
600
593 if gd:
601 if gd:
594 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
602 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
595 if v is not None:
603 if v is not None:
596 return v
604 return v
597
605
598 v = ui.configbool('server', 'bundle1.%s' % action, None)
606 v = ui.configbool('server', 'bundle1.%s' % action, None)
599 if v is not None:
607 if v is not None:
600 return v
608 return v
601
609
602 if gd:
610 if gd:
603 v = ui.configbool('server', 'bundle1gd', None)
611 v = ui.configbool('server', 'bundle1gd', None)
604 if v is not None:
612 if v is not None:
605 return v
613 return v
606
614
607 return ui.configbool('server', 'bundle1', True)
615 return ui.configbool('server', 'bundle1', True)
608
616
609 # list of commands
617 # list of commands
610 commands = {}
618 commands = {}
611
619
612 def wireprotocommand(name, args=''):
620 def wireprotocommand(name, args=''):
613 """decorator for wire protocol command"""
621 """decorator for wire protocol command"""
614 def register(func):
622 def register(func):
615 commands[name] = (func, args)
623 commands[name] = (func, args)
616 return func
624 return func
617 return register
625 return register
618
626
619 @wireprotocommand('batch', 'cmds *')
627 @wireprotocommand('batch', 'cmds *')
620 def batch(repo, proto, cmds, others):
628 def batch(repo, proto, cmds, others):
621 repo = repo.filtered("served")
629 repo = repo.filtered("served")
622 res = []
630 res = []
623 for pair in cmds.split(';'):
631 for pair in cmds.split(';'):
624 op, args = pair.split(' ', 1)
632 op, args = pair.split(' ', 1)
625 vals = {}
633 vals = {}
626 for a in args.split(','):
634 for a in args.split(','):
627 if a:
635 if a:
628 n, v = a.split('=')
636 n, v = a.split('=')
629 vals[unescapearg(n)] = unescapearg(v)
637 vals[unescapearg(n)] = unescapearg(v)
630 func, spec = commands[op]
638 func, spec = commands[op]
631 if spec:
639 if spec:
632 keys = spec.split()
640 keys = spec.split()
633 data = {}
641 data = {}
634 for k in keys:
642 for k in keys:
635 if k == '*':
643 if k == '*':
636 star = {}
644 star = {}
637 for key in vals.keys():
645 for key in vals.keys():
638 if key not in keys:
646 if key not in keys:
639 star[key] = vals[key]
647 star[key] = vals[key]
640 data['*'] = star
648 data['*'] = star
641 else:
649 else:
642 data[k] = vals[k]
650 data[k] = vals[k]
643 result = func(repo, proto, *[data[k] for k in keys])
651 result = func(repo, proto, *[data[k] for k in keys])
644 else:
652 else:
645 result = func(repo, proto)
653 result = func(repo, proto)
646 if isinstance(result, ooberror):
654 if isinstance(result, ooberror):
647 return result
655 return result
648 res.append(escapearg(result))
656 res.append(escapearg(result))
649 return ';'.join(res)
657 return ';'.join(res)
650
658
651 @wireprotocommand('between', 'pairs')
659 @wireprotocommand('between', 'pairs')
652 def between(repo, proto, pairs):
660 def between(repo, proto, pairs):
653 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
661 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
654 r = []
662 r = []
655 for b in repo.between(pairs):
663 for b in repo.between(pairs):
656 r.append(encodelist(b) + "\n")
664 r.append(encodelist(b) + "\n")
657 return "".join(r)
665 return "".join(r)
658
666
659 @wireprotocommand('branchmap')
667 @wireprotocommand('branchmap')
660 def branchmap(repo, proto):
668 def branchmap(repo, proto):
661 branchmap = repo.branchmap()
669 branchmap = repo.branchmap()
662 heads = []
670 heads = []
663 for branch, nodes in branchmap.iteritems():
671 for branch, nodes in branchmap.iteritems():
664 branchname = urlreq.quote(encoding.fromlocal(branch))
672 branchname = urlreq.quote(encoding.fromlocal(branch))
665 branchnodes = encodelist(nodes)
673 branchnodes = encodelist(nodes)
666 heads.append('%s %s' % (branchname, branchnodes))
674 heads.append('%s %s' % (branchname, branchnodes))
667 return '\n'.join(heads)
675 return '\n'.join(heads)
668
676
669 @wireprotocommand('branches', 'nodes')
677 @wireprotocommand('branches', 'nodes')
670 def branches(repo, proto, nodes):
678 def branches(repo, proto, nodes):
671 nodes = decodelist(nodes)
679 nodes = decodelist(nodes)
672 r = []
680 r = []
673 for b in repo.branches(nodes):
681 for b in repo.branches(nodes):
674 r.append(encodelist(b) + "\n")
682 r.append(encodelist(b) + "\n")
675 return "".join(r)
683 return "".join(r)
676
684
677 @wireprotocommand('clonebundles', '')
685 @wireprotocommand('clonebundles', '')
678 def clonebundles(repo, proto):
686 def clonebundles(repo, proto):
679 """Server command for returning info for available bundles to seed clones.
687 """Server command for returning info for available bundles to seed clones.
680
688
681 Clients will parse this response and determine what bundle to fetch.
689 Clients will parse this response and determine what bundle to fetch.
682
690
683 Extensions may wrap this command to filter or dynamically emit data
691 Extensions may wrap this command to filter or dynamically emit data
684 depending on the request. e.g. you could advertise URLs for the closest
692 depending on the request. e.g. you could advertise URLs for the closest
685 data center given the client's IP address.
693 data center given the client's IP address.
686 """
694 """
687 return repo.opener.tryread('clonebundles.manifest')
695 return repo.opener.tryread('clonebundles.manifest')
688
696
689 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
697 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
690 'known', 'getbundle', 'unbundlehash', 'batch']
698 'known', 'getbundle', 'unbundlehash', 'batch']
691
699
692 def _capabilities(repo, proto):
700 def _capabilities(repo, proto):
693 """return a list of capabilities for a repo
701 """return a list of capabilities for a repo
694
702
695 This function exists to allow extensions to easily wrap capabilities
703 This function exists to allow extensions to easily wrap capabilities
696 computation
704 computation
697
705
698 - returns a lists: easy to alter
706 - returns a lists: easy to alter
699 - change done here will be propagated to both `capabilities` and `hello`
707 - change done here will be propagated to both `capabilities` and `hello`
700 command without any other action needed.
708 command without any other action needed.
701 """
709 """
702 # copy to prevent modification of the global list
710 # copy to prevent modification of the global list
703 caps = list(wireprotocaps)
711 caps = list(wireprotocaps)
704 if streamclone.allowservergeneration(repo.ui):
712 if streamclone.allowservergeneration(repo.ui):
705 if repo.ui.configbool('server', 'preferuncompressed', False):
713 if repo.ui.configbool('server', 'preferuncompressed', False):
706 caps.append('stream-preferred')
714 caps.append('stream-preferred')
707 requiredformats = repo.requirements & repo.supportedformats
715 requiredformats = repo.requirements & repo.supportedformats
708 # if our local revlogs are just revlogv1, add 'stream' cap
716 # if our local revlogs are just revlogv1, add 'stream' cap
709 if not requiredformats - set(('revlogv1',)):
717 if not requiredformats - set(('revlogv1',)):
710 caps.append('stream')
718 caps.append('stream')
711 # otherwise, add 'streamreqs' detailing our local revlog format
719 # otherwise, add 'streamreqs' detailing our local revlog format
712 else:
720 else:
713 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
721 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
714 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
722 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
715 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
723 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
716 caps.append('bundle2=' + urlreq.quote(capsblob))
724 caps.append('bundle2=' + urlreq.quote(capsblob))
717 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
725 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
718 caps.append(
726 caps.append(
719 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
727 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
720 if repo.ui.configbool('experimental', 'httppostargs', False):
728 if repo.ui.configbool('experimental', 'httppostargs', False):
721 caps.append('httppostargs')
729 caps.append('httppostargs')
722 return caps
730 return caps
723
731
724 # If you are writing an extension and consider wrapping this function. Wrap
732 # If you are writing an extension and consider wrapping this function. Wrap
725 # `_capabilities` instead.
733 # `_capabilities` instead.
726 @wireprotocommand('capabilities')
734 @wireprotocommand('capabilities')
727 def capabilities(repo, proto):
735 def capabilities(repo, proto):
728 return ' '.join(_capabilities(repo, proto))
736 return ' '.join(_capabilities(repo, proto))
729
737
730 @wireprotocommand('changegroup', 'roots')
738 @wireprotocommand('changegroup', 'roots')
731 def changegroup(repo, proto, roots):
739 def changegroup(repo, proto, roots):
732 nodes = decodelist(roots)
740 nodes = decodelist(roots)
733 cg = changegroupmod.changegroup(repo, nodes, 'serve')
741 cg = changegroupmod.changegroup(repo, nodes, 'serve')
734 return streamres(proto.groupchunks(cg))
742 return streamres(proto.groupchunks(cg))
735
743
736 @wireprotocommand('changegroupsubset', 'bases heads')
744 @wireprotocommand('changegroupsubset', 'bases heads')
737 def changegroupsubset(repo, proto, bases, heads):
745 def changegroupsubset(repo, proto, bases, heads):
738 bases = decodelist(bases)
746 bases = decodelist(bases)
739 heads = decodelist(heads)
747 heads = decodelist(heads)
740 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
748 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
741 return streamres(proto.groupchunks(cg))
749 return streamres(proto.groupchunks(cg))
742
750
743 @wireprotocommand('debugwireargs', 'one two *')
751 @wireprotocommand('debugwireargs', 'one two *')
744 def debugwireargs(repo, proto, one, two, others):
752 def debugwireargs(repo, proto, one, two, others):
745 # only accept optional args from the known set
753 # only accept optional args from the known set
746 opts = options('debugwireargs', ['three', 'four'], others)
754 opts = options('debugwireargs', ['three', 'four'], others)
747 return repo.debugwireargs(one, two, **opts)
755 return repo.debugwireargs(one, two, **opts)
748
756
749 @wireprotocommand('getbundle', '*')
757 @wireprotocommand('getbundle', '*')
750 def getbundle(repo, proto, others):
758 def getbundle(repo, proto, others):
751 opts = options('getbundle', gboptsmap.keys(), others)
759 opts = options('getbundle', gboptsmap.keys(), others)
752 for k, v in opts.iteritems():
760 for k, v in opts.iteritems():
753 keytype = gboptsmap[k]
761 keytype = gboptsmap[k]
754 if keytype == 'nodes':
762 if keytype == 'nodes':
755 opts[k] = decodelist(v)
763 opts[k] = decodelist(v)
756 elif keytype == 'csv':
764 elif keytype == 'csv':
757 opts[k] = list(v.split(','))
765 opts[k] = list(v.split(','))
758 elif keytype == 'scsv':
766 elif keytype == 'scsv':
759 opts[k] = set(v.split(','))
767 opts[k] = set(v.split(','))
760 elif keytype == 'boolean':
768 elif keytype == 'boolean':
761 # Client should serialize False as '0', which is a non-empty string
769 # Client should serialize False as '0', which is a non-empty string
762 # so it evaluates as a True bool.
770 # so it evaluates as a True bool.
763 if v == '0':
771 if v == '0':
764 opts[k] = False
772 opts[k] = False
765 else:
773 else:
766 opts[k] = bool(v)
774 opts[k] = bool(v)
767 elif keytype != 'plain':
775 elif keytype != 'plain':
768 raise KeyError('unknown getbundle option type %s'
776 raise KeyError('unknown getbundle option type %s'
769 % keytype)
777 % keytype)
770
778
771 if not bundle1allowed(repo, 'pull'):
779 if not bundle1allowed(repo, 'pull'):
772 if not exchange.bundle2requested(opts.get('bundlecaps')):
780 if not exchange.bundle2requested(opts.get('bundlecaps')):
773 return ooberror(bundle2required)
781 return ooberror(bundle2required)
774
782
775 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
783 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
776 # TODO avoid util.chunkbuffer() here since it is adding overhead to
784 return streamres(proto.compresschunks(chunks))
777 # what is fundamentally a generator proxying operation.
778 return streamres(proto.groupchunks(util.chunkbuffer(chunks)))
779
785
780 @wireprotocommand('heads')
786 @wireprotocommand('heads')
781 def heads(repo, proto):
787 def heads(repo, proto):
782 h = repo.heads()
788 h = repo.heads()
783 return encodelist(h) + "\n"
789 return encodelist(h) + "\n"
784
790
785 @wireprotocommand('hello')
791 @wireprotocommand('hello')
786 def hello(repo, proto):
792 def hello(repo, proto):
787 '''the hello command returns a set of lines describing various
793 '''the hello command returns a set of lines describing various
788 interesting things about the server, in an RFC822-like format.
794 interesting things about the server, in an RFC822-like format.
789 Currently the only one defined is "capabilities", which
795 Currently the only one defined is "capabilities", which
790 consists of a line in the form:
796 consists of a line in the form:
791
797
792 capabilities: space separated list of tokens
798 capabilities: space separated list of tokens
793 '''
799 '''
794 return "capabilities: %s\n" % (capabilities(repo, proto))
800 return "capabilities: %s\n" % (capabilities(repo, proto))
795
801
796 @wireprotocommand('listkeys', 'namespace')
802 @wireprotocommand('listkeys', 'namespace')
797 def listkeys(repo, proto, namespace):
803 def listkeys(repo, proto, namespace):
798 d = repo.listkeys(encoding.tolocal(namespace)).items()
804 d = repo.listkeys(encoding.tolocal(namespace)).items()
799 return pushkeymod.encodekeys(d)
805 return pushkeymod.encodekeys(d)
800
806
801 @wireprotocommand('lookup', 'key')
807 @wireprotocommand('lookup', 'key')
802 def lookup(repo, proto, key):
808 def lookup(repo, proto, key):
803 try:
809 try:
804 k = encoding.tolocal(key)
810 k = encoding.tolocal(key)
805 c = repo[k]
811 c = repo[k]
806 r = c.hex()
812 r = c.hex()
807 success = 1
813 success = 1
808 except Exception as inst:
814 except Exception as inst:
809 r = str(inst)
815 r = str(inst)
810 success = 0
816 success = 0
811 return "%s %s\n" % (success, r)
817 return "%s %s\n" % (success, r)
812
818
813 @wireprotocommand('known', 'nodes *')
819 @wireprotocommand('known', 'nodes *')
814 def known(repo, proto, nodes, others):
820 def known(repo, proto, nodes, others):
815 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
821 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
816
822
817 @wireprotocommand('pushkey', 'namespace key old new')
823 @wireprotocommand('pushkey', 'namespace key old new')
818 def pushkey(repo, proto, namespace, key, old, new):
824 def pushkey(repo, proto, namespace, key, old, new):
819 # compatibility with pre-1.8 clients which were accidentally
825 # compatibility with pre-1.8 clients which were accidentally
820 # sending raw binary nodes rather than utf-8-encoded hex
826 # sending raw binary nodes rather than utf-8-encoded hex
821 if len(new) == 20 and new.encode('string-escape') != new:
827 if len(new) == 20 and new.encode('string-escape') != new:
822 # looks like it could be a binary node
828 # looks like it could be a binary node
823 try:
829 try:
824 new.decode('utf-8')
830 new.decode('utf-8')
825 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
831 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
826 except UnicodeDecodeError:
832 except UnicodeDecodeError:
827 pass # binary, leave unmodified
833 pass # binary, leave unmodified
828 else:
834 else:
829 new = encoding.tolocal(new) # normal path
835 new = encoding.tolocal(new) # normal path
830
836
831 if util.safehasattr(proto, 'restore'):
837 if util.safehasattr(proto, 'restore'):
832
838
833 proto.redirect()
839 proto.redirect()
834
840
835 try:
841 try:
836 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
842 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
837 encoding.tolocal(old), new) or False
843 encoding.tolocal(old), new) or False
838 except error.Abort:
844 except error.Abort:
839 r = False
845 r = False
840
846
841 output = proto.restore()
847 output = proto.restore()
842
848
843 return '%s\n%s' % (int(r), output)
849 return '%s\n%s' % (int(r), output)
844
850
845 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
851 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
846 encoding.tolocal(old), new)
852 encoding.tolocal(old), new)
847 return '%s\n' % int(r)
853 return '%s\n' % int(r)
848
854
849 @wireprotocommand('stream_out')
855 @wireprotocommand('stream_out')
850 def stream(repo, proto):
856 def stream(repo, proto):
851 '''If the server supports streaming clone, it advertises the "stream"
857 '''If the server supports streaming clone, it advertises the "stream"
852 capability with a value representing the version and flags of the repo
858 capability with a value representing the version and flags of the repo
853 it is serving. Client checks to see if it understands the format.
859 it is serving. Client checks to see if it understands the format.
854 '''
860 '''
855 if not streamclone.allowservergeneration(repo.ui):
861 if not streamclone.allowservergeneration(repo.ui):
856 return '1\n'
862 return '1\n'
857
863
858 def getstream(it):
864 def getstream(it):
859 yield '0\n'
865 yield '0\n'
860 for chunk in it:
866 for chunk in it:
861 yield chunk
867 yield chunk
862
868
863 try:
869 try:
864 # LockError may be raised before the first result is yielded. Don't
870 # LockError may be raised before the first result is yielded. Don't
865 # emit output until we're sure we got the lock successfully.
871 # emit output until we're sure we got the lock successfully.
866 it = streamclone.generatev1wireproto(repo)
872 it = streamclone.generatev1wireproto(repo)
867 return streamres(getstream(it))
873 return streamres(getstream(it))
868 except error.LockError:
874 except error.LockError:
869 return '2\n'
875 return '2\n'
870
876
871 @wireprotocommand('unbundle', 'heads')
877 @wireprotocommand('unbundle', 'heads')
872 def unbundle(repo, proto, heads):
878 def unbundle(repo, proto, heads):
873 their_heads = decodelist(heads)
879 their_heads = decodelist(heads)
874
880
875 try:
881 try:
876 proto.redirect()
882 proto.redirect()
877
883
878 exchange.check_heads(repo, their_heads, 'preparing changes')
884 exchange.check_heads(repo, their_heads, 'preparing changes')
879
885
880 # write bundle data to temporary file because it can be big
886 # write bundle data to temporary file because it can be big
881 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
887 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
882 fp = os.fdopen(fd, 'wb+')
888 fp = os.fdopen(fd, 'wb+')
883 r = 0
889 r = 0
884 try:
890 try:
885 proto.getfile(fp)
891 proto.getfile(fp)
886 fp.seek(0)
892 fp.seek(0)
887 gen = exchange.readbundle(repo.ui, fp, None)
893 gen = exchange.readbundle(repo.ui, fp, None)
888 if (isinstance(gen, changegroupmod.cg1unpacker)
894 if (isinstance(gen, changegroupmod.cg1unpacker)
889 and not bundle1allowed(repo, 'push')):
895 and not bundle1allowed(repo, 'push')):
890 return ooberror(bundle2required)
896 return ooberror(bundle2required)
891
897
892 r = exchange.unbundle(repo, gen, their_heads, 'serve',
898 r = exchange.unbundle(repo, gen, their_heads, 'serve',
893 proto._client())
899 proto._client())
894 if util.safehasattr(r, 'addpart'):
900 if util.safehasattr(r, 'addpart'):
895 # The return looks streamable, we are in the bundle2 case and
901 # The return looks streamable, we are in the bundle2 case and
896 # should return a stream.
902 # should return a stream.
897 return streamres(r.getchunks())
903 return streamres(r.getchunks())
898 return pushres(r)
904 return pushres(r)
899
905
900 finally:
906 finally:
901 fp.close()
907 fp.close()
902 os.unlink(tempname)
908 os.unlink(tempname)
903
909
904 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
910 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
905 # handle non-bundle2 case first
911 # handle non-bundle2 case first
906 if not getattr(exc, 'duringunbundle2', False):
912 if not getattr(exc, 'duringunbundle2', False):
907 try:
913 try:
908 raise
914 raise
909 except error.Abort:
915 except error.Abort:
910 # The old code we moved used sys.stderr directly.
916 # The old code we moved used sys.stderr directly.
911 # We did not change it to minimise code change.
917 # We did not change it to minimise code change.
912 # This need to be moved to something proper.
918 # This need to be moved to something proper.
913 # Feel free to do it.
919 # Feel free to do it.
914 sys.stderr.write("abort: %s\n" % exc)
920 sys.stderr.write("abort: %s\n" % exc)
915 return pushres(0)
921 return pushres(0)
916 except error.PushRaced:
922 except error.PushRaced:
917 return pusherr(str(exc))
923 return pusherr(str(exc))
918
924
919 bundler = bundle2.bundle20(repo.ui)
925 bundler = bundle2.bundle20(repo.ui)
920 for out in getattr(exc, '_bundle2salvagedoutput', ()):
926 for out in getattr(exc, '_bundle2salvagedoutput', ()):
921 bundler.addpart(out)
927 bundler.addpart(out)
922 try:
928 try:
923 try:
929 try:
924 raise
930 raise
925 except error.PushkeyFailed as exc:
931 except error.PushkeyFailed as exc:
926 # check client caps
932 # check client caps
927 remotecaps = getattr(exc, '_replycaps', None)
933 remotecaps = getattr(exc, '_replycaps', None)
928 if (remotecaps is not None
934 if (remotecaps is not None
929 and 'pushkey' not in remotecaps.get('error', ())):
935 and 'pushkey' not in remotecaps.get('error', ())):
930 # no support remote side, fallback to Abort handler.
936 # no support remote side, fallback to Abort handler.
931 raise
937 raise
932 part = bundler.newpart('error:pushkey')
938 part = bundler.newpart('error:pushkey')
933 part.addparam('in-reply-to', exc.partid)
939 part.addparam('in-reply-to', exc.partid)
934 if exc.namespace is not None:
940 if exc.namespace is not None:
935 part.addparam('namespace', exc.namespace, mandatory=False)
941 part.addparam('namespace', exc.namespace, mandatory=False)
936 if exc.key is not None:
942 if exc.key is not None:
937 part.addparam('key', exc.key, mandatory=False)
943 part.addparam('key', exc.key, mandatory=False)
938 if exc.new is not None:
944 if exc.new is not None:
939 part.addparam('new', exc.new, mandatory=False)
945 part.addparam('new', exc.new, mandatory=False)
940 if exc.old is not None:
946 if exc.old is not None:
941 part.addparam('old', exc.old, mandatory=False)
947 part.addparam('old', exc.old, mandatory=False)
942 if exc.ret is not None:
948 if exc.ret is not None:
943 part.addparam('ret', exc.ret, mandatory=False)
949 part.addparam('ret', exc.ret, mandatory=False)
944 except error.BundleValueError as exc:
950 except error.BundleValueError as exc:
945 errpart = bundler.newpart('error:unsupportedcontent')
951 errpart = bundler.newpart('error:unsupportedcontent')
946 if exc.parttype is not None:
952 if exc.parttype is not None:
947 errpart.addparam('parttype', exc.parttype)
953 errpart.addparam('parttype', exc.parttype)
948 if exc.params:
954 if exc.params:
949 errpart.addparam('params', '\0'.join(exc.params))
955 errpart.addparam('params', '\0'.join(exc.params))
950 except error.Abort as exc:
956 except error.Abort as exc:
951 manargs = [('message', str(exc))]
957 manargs = [('message', str(exc))]
952 advargs = []
958 advargs = []
953 if exc.hint is not None:
959 if exc.hint is not None:
954 advargs.append(('hint', exc.hint))
960 advargs.append(('hint', exc.hint))
955 bundler.addpart(bundle2.bundlepart('error:abort',
961 bundler.addpart(bundle2.bundlepart('error:abort',
956 manargs, advargs))
962 manargs, advargs))
957 except error.PushRaced as exc:
963 except error.PushRaced as exc:
958 bundler.newpart('error:pushraced', [('message', str(exc))])
964 bundler.newpart('error:pushraced', [('message', str(exc))])
959 return streamres(bundler.getchunks())
965 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now