##// END OF EJS Templates
wireproto: overhaul iterating batcher code (API)...
Gregory Szorc -
r33761:4c706037 default
parent child Browse files
Show More
@@ -1,152 +1,153 b''
1 # peer.py - repository base classes for mercurial
1 # peer.py - repository base classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 from .i18n import _
11 from .i18n import _
12 from . import (
12 from . import (
13 error,
13 error,
14 util,
14 util,
15 )
15 )
16
16
17 # abstract batching support
17 # abstract batching support
18
18
19 class future(object):
19 class future(object):
20 '''placeholder for a value to be set later'''
20 '''placeholder for a value to be set later'''
21 def set(self, value):
21 def set(self, value):
22 if util.safehasattr(self, 'value'):
22 if util.safehasattr(self, 'value'):
23 raise error.RepoError("future is already set")
23 raise error.RepoError("future is already set")
24 self.value = value
24 self.value = value
25
25
26 class batcher(object):
26 class batcher(object):
27 '''base class for batches of commands submittable in a single request
27 '''base class for batches of commands submittable in a single request
28
28
29 All methods invoked on instances of this class are simply queued and
29 All methods invoked on instances of this class are simply queued and
30 return a a future for the result. Once you call submit(), all the queued
30 return a a future for the result. Once you call submit(), all the queued
31 calls are performed and the results set in their respective futures.
31 calls are performed and the results set in their respective futures.
32 '''
32 '''
33 def __init__(self):
33 def __init__(self):
34 self.calls = []
34 self.calls = []
35 def __getattr__(self, name):
35 def __getattr__(self, name):
36 def call(*args, **opts):
36 def call(*args, **opts):
37 resref = future()
37 resref = future()
38 self.calls.append((name, args, opts, resref,))
38 self.calls.append((name, args, opts, resref,))
39 return resref
39 return resref
40 return call
40 return call
41 def submit(self):
41 def submit(self):
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 class iterbatcher(batcher):
44 class iterbatcher(batcher):
45
45
46 def submit(self):
46 def submit(self):
47 raise NotImplementedError()
47 raise NotImplementedError()
48
48
49 def results(self):
49 def results(self):
50 raise NotImplementedError()
50 raise NotImplementedError()
51
51
52 class localbatch(batcher):
52 class localbatch(batcher):
53 '''performs the queued calls directly'''
53 '''performs the queued calls directly'''
54 def __init__(self, local):
54 def __init__(self, local):
55 batcher.__init__(self)
55 batcher.__init__(self)
56 self.local = local
56 self.local = local
57 def submit(self):
57 def submit(self):
58 for name, args, opts, resref in self.calls:
58 for name, args, opts, resref in self.calls:
59 resref.set(getattr(self.local, name)(*args, **opts))
59 resref.set(getattr(self.local, name)(*args, **opts))
60
60
61 class localiterbatcher(iterbatcher):
61 class localiterbatcher(iterbatcher):
62 def __init__(self, local):
62 def __init__(self, local):
63 super(iterbatcher, self).__init__()
63 super(iterbatcher, self).__init__()
64 self.local = local
64 self.local = local
65
65
66 def submit(self):
66 def submit(self):
67 # submit for a local iter batcher is a noop
67 # submit for a local iter batcher is a noop
68 pass
68 pass
69
69
70 def results(self):
70 def results(self):
71 for name, args, opts, resref in self.calls:
71 for name, args, opts, resref in self.calls:
72 yield getattr(self.local, name)(*args, **opts)
72 resref.set(getattr(self.local, name)(*args, **opts))
73 yield resref.value
73
74
74 def batchable(f):
75 def batchable(f):
75 '''annotation for batchable methods
76 '''annotation for batchable methods
76
77
77 Such methods must implement a coroutine as follows:
78 Such methods must implement a coroutine as follows:
78
79
79 @batchable
80 @batchable
80 def sample(self, one, two=None):
81 def sample(self, one, two=None):
81 # Build list of encoded arguments suitable for your wire protocol:
82 # Build list of encoded arguments suitable for your wire protocol:
82 encargs = [('one', encode(one),), ('two', encode(two),)]
83 encargs = [('one', encode(one),), ('two', encode(two),)]
83 # Create future for injection of encoded result:
84 # Create future for injection of encoded result:
84 encresref = future()
85 encresref = future()
85 # Return encoded arguments and future:
86 # Return encoded arguments and future:
86 yield encargs, encresref
87 yield encargs, encresref
87 # Assuming the future to be filled with the result from the batched
88 # Assuming the future to be filled with the result from the batched
88 # request now. Decode it:
89 # request now. Decode it:
89 yield decode(encresref.value)
90 yield decode(encresref.value)
90
91
91 The decorator returns a function which wraps this coroutine as a plain
92 The decorator returns a function which wraps this coroutine as a plain
92 method, but adds the original method as an attribute called "batchable",
93 method, but adds the original method as an attribute called "batchable",
93 which is used by remotebatch to split the call into separate encoding and
94 which is used by remotebatch to split the call into separate encoding and
94 decoding phases.
95 decoding phases.
95 '''
96 '''
96 def plain(*args, **opts):
97 def plain(*args, **opts):
97 batchable = f(*args, **opts)
98 batchable = f(*args, **opts)
98 encargsorres, encresref = next(batchable)
99 encargsorres, encresref = next(batchable)
99 if not encresref:
100 if not encresref:
100 return encargsorres # a local result in this case
101 return encargsorres # a local result in this case
101 self = args[0]
102 self = args[0]
102 encresref.set(self._submitone(f.func_name, encargsorres))
103 encresref.set(self._submitone(f.func_name, encargsorres))
103 return next(batchable)
104 return next(batchable)
104 setattr(plain, 'batchable', f)
105 setattr(plain, 'batchable', f)
105 return plain
106 return plain
106
107
107 class peerrepository(object):
108 class peerrepository(object):
108
109
109 def batch(self):
110 def batch(self):
110 return localbatch(self)
111 return localbatch(self)
111
112
112 def iterbatch(self):
113 def iterbatch(self):
113 """Batch requests but allow iterating over the results.
114 """Batch requests but allow iterating over the results.
114
115
115 This is to allow interleaving responses with things like
116 This is to allow interleaving responses with things like
116 progress updates for clients.
117 progress updates for clients.
117 """
118 """
118 return localiterbatcher(self)
119 return localiterbatcher(self)
119
120
120 def capable(self, name):
121 def capable(self, name):
121 '''tell whether repo supports named capability.
122 '''tell whether repo supports named capability.
122 return False if not supported.
123 return False if not supported.
123 if boolean capability, return True.
124 if boolean capability, return True.
124 if string capability, return string.'''
125 if string capability, return string.'''
125 caps = self._capabilities()
126 caps = self._capabilities()
126 if name in caps:
127 if name in caps:
127 return True
128 return True
128 name_eq = name + '='
129 name_eq = name + '='
129 for cap in caps:
130 for cap in caps:
130 if cap.startswith(name_eq):
131 if cap.startswith(name_eq):
131 return cap[len(name_eq):]
132 return cap[len(name_eq):]
132 return False
133 return False
133
134
134 def requirecap(self, name, purpose):
135 def requirecap(self, name, purpose):
135 '''raise an exception if the given capability is not present'''
136 '''raise an exception if the given capability is not present'''
136 if not self.capable(name):
137 if not self.capable(name):
137 raise error.CapabilityError(
138 raise error.CapabilityError(
138 _('cannot %s; remote repository does not '
139 _('cannot %s; remote repository does not '
139 'support the %r capability') % (purpose, name))
140 'support the %r capability') % (purpose, name))
140
141
141 def local(self):
142 def local(self):
142 '''return peer as a localrepo, or None'''
143 '''return peer as a localrepo, or None'''
143 return None
144 return None
144
145
145 def peer(self):
146 def peer(self):
146 return self
147 return self
147
148
148 def canpush(self):
149 def canpush(self):
149 return True
150 return True
150
151
151 def close(self):
152 def close(self):
152 pass
153 pass
@@ -1,1064 +1,1088 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import tempfile
13 import tempfile
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 nullid,
19 nullid,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 streamclone,
31 streamclone,
32 util,
32 util,
33 )
33 )
34
34
35 urlerr = util.urlerr
35 urlerr = util.urlerr
36 urlreq = util.urlreq
36 urlreq = util.urlreq
37
37
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 'IncompatibleClient')
40 'IncompatibleClient')
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42
42
43 class abstractserverproto(object):
43 class abstractserverproto(object):
44 """abstract class that summarizes the protocol API
44 """abstract class that summarizes the protocol API
45
45
46 Used as reference and documentation.
46 Used as reference and documentation.
47 """
47 """
48
48
49 def getargs(self, args):
49 def getargs(self, args):
50 """return the value for arguments in <args>
50 """return the value for arguments in <args>
51
51
52 returns a list of values (same order as <args>)"""
52 returns a list of values (same order as <args>)"""
53 raise NotImplementedError()
53 raise NotImplementedError()
54
54
55 def getfile(self, fp):
55 def getfile(self, fp):
56 """write the whole content of a file into a file like object
56 """write the whole content of a file into a file like object
57
57
58 The file is in the form::
58 The file is in the form::
59
59
60 (<chunk-size>\n<chunk>)+0\n
60 (<chunk-size>\n<chunk>)+0\n
61
61
62 chunk size is the ascii version of the int.
62 chunk size is the ascii version of the int.
63 """
63 """
64 raise NotImplementedError()
64 raise NotImplementedError()
65
65
66 def redirect(self):
66 def redirect(self):
67 """may setup interception for stdout and stderr
67 """may setup interception for stdout and stderr
68
68
69 See also the `restore` method."""
69 See also the `restore` method."""
70 raise NotImplementedError()
70 raise NotImplementedError()
71
71
72 # If the `redirect` function does install interception, the `restore`
72 # If the `redirect` function does install interception, the `restore`
73 # function MUST be defined. If interception is not used, this function
73 # function MUST be defined. If interception is not used, this function
74 # MUST NOT be defined.
74 # MUST NOT be defined.
75 #
75 #
76 # left commented here on purpose
76 # left commented here on purpose
77 #
77 #
78 #def restore(self):
78 #def restore(self):
79 # """reinstall previous stdout and stderr and return intercepted stdout
79 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """
80 # """
81 # raise NotImplementedError()
81 # raise NotImplementedError()
82
82
83 class remotebatch(peer.batcher):
83 class remotebatch(peer.batcher):
84 '''batches the queued calls; uses as few roundtrips as possible'''
84 '''batches the queued calls; uses as few roundtrips as possible'''
85 def __init__(self, remote):
85 def __init__(self, remote):
86 '''remote must support _submitbatch(encbatch) and
86 '''remote must support _submitbatch(encbatch) and
87 _submitone(op, encargs)'''
87 _submitone(op, encargs)'''
88 peer.batcher.__init__(self)
88 peer.batcher.__init__(self)
89 self.remote = remote
89 self.remote = remote
90 def submit(self):
90 def submit(self):
91 req, rsp = [], []
91 req, rsp = [], []
92 for name, args, opts, resref in self.calls:
92 for name, args, opts, resref in self.calls:
93 mtd = getattr(self.remote, name)
93 mtd = getattr(self.remote, name)
94 batchablefn = getattr(mtd, 'batchable', None)
94 batchablefn = getattr(mtd, 'batchable', None)
95 if batchablefn is not None:
95 if batchablefn is not None:
96 batchable = batchablefn(mtd.im_self, *args, **opts)
96 batchable = batchablefn(mtd.im_self, *args, **opts)
97 encargsorres, encresref = next(batchable)
97 encargsorres, encresref = next(batchable)
98 assert encresref
98 assert encresref
99 req.append((name, encargsorres,))
99 req.append((name, encargsorres,))
100 rsp.append((batchable, encresref, resref,))
100 rsp.append((batchable, encresref, resref,))
101 else:
101 else:
102 if req:
102 if req:
103 self._submitreq(req, rsp)
103 self._submitreq(req, rsp)
104 req, rsp = [], []
104 req, rsp = [], []
105 resref.set(mtd(*args, **opts))
105 resref.set(mtd(*args, **opts))
106 if req:
106 if req:
107 self._submitreq(req, rsp)
107 self._submitreq(req, rsp)
108 def _submitreq(self, req, rsp):
108 def _submitreq(self, req, rsp):
109 encresults = self.remote._submitbatch(req)
109 encresults = self.remote._submitbatch(req)
110 for encres, r in zip(encresults, rsp):
110 for encres, r in zip(encresults, rsp):
111 batchable, encresref, resref = r
111 batchable, encresref, resref = r
112 encresref.set(encres)
112 encresref.set(encres)
113 resref.set(next(batchable))
113 resref.set(next(batchable))
114
114
115 class remoteiterbatcher(peer.iterbatcher):
115 class remoteiterbatcher(peer.iterbatcher):
116 def __init__(self, remote):
116 def __init__(self, remote):
117 super(remoteiterbatcher, self).__init__()
117 super(remoteiterbatcher, self).__init__()
118 self._remote = remote
118 self._remote = remote
119
119
120 def __getattr__(self, name):
120 def __getattr__(self, name):
121 # Validate this method is batchable, since submit() only supports
121 # Validate this method is batchable, since submit() only supports
122 # batchable methods.
122 # batchable methods.
123 fn = getattr(self._remote, name)
123 fn = getattr(self._remote, name)
124 if not getattr(fn, 'batchable', None):
124 if not getattr(fn, 'batchable', None):
125 raise error.ProgrammingError('Attempted to batch a non-batchable '
125 raise error.ProgrammingError('Attempted to batch a non-batchable '
126 'call to %r' % name)
126 'call to %r' % name)
127
127
128 return super(remoteiterbatcher, self).__getattr__(name)
128 return super(remoteiterbatcher, self).__getattr__(name)
129
129
130 def submit(self):
130 def submit(self):
131 """Break the batch request into many patch calls and pipeline them.
131 """Break the batch request into many patch calls and pipeline them.
132
132
133 This is mostly valuable over http where request sizes can be
133 This is mostly valuable over http where request sizes can be
134 limited, but can be used in other places as well.
134 limited, but can be used in other places as well.
135 """
135 """
136 req, rsp = [], []
136 # 2-tuple of (command, arguments) that represents what will be
137 for name, args, opts, resref in self.calls:
137 # sent over the wire.
138 mtd = getattr(self._remote, name)
138 requests = []
139
140 # 4-tuple of (command, final future, @batchable generator, remote
141 # future).
142 results = []
143
144 for command, args, opts, finalfuture in self.calls:
145 mtd = getattr(self._remote, command)
139 batchable = mtd.batchable(mtd.im_self, *args, **opts)
146 batchable = mtd.batchable(mtd.im_self, *args, **opts)
140 encargsorres, encresref = next(batchable)
147
141 assert encresref
148 commandargs, fremote = next(batchable)
142 req.append((name, encargsorres))
149 assert fremote
143 rsp.append((batchable, encresref))
150 requests.append((command, commandargs))
144 if req:
151 results.append((command, finalfuture, batchable, fremote))
145 self._resultiter = self._remote._submitbatch(req)
152
146 self._rsp = rsp
153 if requests:
154 self._resultiter = self._remote._submitbatch(requests)
155
156 self._results = results
147
157
148 def results(self):
158 def results(self):
149 for (batchable, encresref), encres in itertools.izip(
159 for command, finalfuture, batchable, remotefuture in self._results:
150 self._rsp, self._resultiter):
160 # Get the raw result, set it in the remote future, feed it
151 encresref.set(encres)
161 # back into the @batchable generator so it can be decoded, and
152 yield next(batchable)
162 # set the result on the final future to this value.
163 remoteresult = next(self._resultiter)
164 remotefuture.set(remoteresult)
165 finalfuture.set(next(batchable))
166
167 # Verify our @batchable generators only emit 2 values.
168 try:
169 next(batchable)
170 except StopIteration:
171 pass
172 else:
173 raise error.ProgrammingError('%s @batchable generator emitted '
174 'unexpected value count' % command)
175
176 yield finalfuture.value
153
177
154 # Forward a couple of names from peer to make wireproto interactions
178 # Forward a couple of names from peer to make wireproto interactions
155 # slightly more sensible.
179 # slightly more sensible.
156 batchable = peer.batchable
180 batchable = peer.batchable
157 future = peer.future
181 future = peer.future
158
182
159 # list of nodes encoding / decoding
183 # list of nodes encoding / decoding
160
184
161 def decodelist(l, sep=' '):
185 def decodelist(l, sep=' '):
162 if l:
186 if l:
163 return map(bin, l.split(sep))
187 return map(bin, l.split(sep))
164 return []
188 return []
165
189
166 def encodelist(l, sep=' '):
190 def encodelist(l, sep=' '):
167 try:
191 try:
168 return sep.join(map(hex, l))
192 return sep.join(map(hex, l))
169 except TypeError:
193 except TypeError:
170 raise
194 raise
171
195
172 # batched call argument encoding
196 # batched call argument encoding
173
197
174 def escapearg(plain):
198 def escapearg(plain):
175 return (plain
199 return (plain
176 .replace(':', ':c')
200 .replace(':', ':c')
177 .replace(',', ':o')
201 .replace(',', ':o')
178 .replace(';', ':s')
202 .replace(';', ':s')
179 .replace('=', ':e'))
203 .replace('=', ':e'))
180
204
181 def unescapearg(escaped):
205 def unescapearg(escaped):
182 return (escaped
206 return (escaped
183 .replace(':e', '=')
207 .replace(':e', '=')
184 .replace(':s', ';')
208 .replace(':s', ';')
185 .replace(':o', ',')
209 .replace(':o', ',')
186 .replace(':c', ':'))
210 .replace(':c', ':'))
187
211
188 def encodebatchcmds(req):
212 def encodebatchcmds(req):
189 """Return a ``cmds`` argument value for the ``batch`` command."""
213 """Return a ``cmds`` argument value for the ``batch`` command."""
190 cmds = []
214 cmds = []
191 for op, argsdict in req:
215 for op, argsdict in req:
192 # Old servers didn't properly unescape argument names. So prevent
216 # Old servers didn't properly unescape argument names. So prevent
193 # the sending of argument names that may not be decoded properly by
217 # the sending of argument names that may not be decoded properly by
194 # servers.
218 # servers.
195 assert all(escapearg(k) == k for k in argsdict)
219 assert all(escapearg(k) == k for k in argsdict)
196
220
197 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
221 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
198 for k, v in argsdict.iteritems())
222 for k, v in argsdict.iteritems())
199 cmds.append('%s %s' % (op, args))
223 cmds.append('%s %s' % (op, args))
200
224
201 return ';'.join(cmds)
225 return ';'.join(cmds)
202
226
203 # mapping of options accepted by getbundle and their types
227 # mapping of options accepted by getbundle and their types
204 #
228 #
205 # Meant to be extended by extensions. It is extensions responsibility to ensure
229 # Meant to be extended by extensions. It is extensions responsibility to ensure
206 # such options are properly processed in exchange.getbundle.
230 # such options are properly processed in exchange.getbundle.
207 #
231 #
208 # supported types are:
232 # supported types are:
209 #
233 #
210 # :nodes: list of binary nodes
234 # :nodes: list of binary nodes
211 # :csv: list of comma-separated values
235 # :csv: list of comma-separated values
212 # :scsv: list of comma-separated values return as set
236 # :scsv: list of comma-separated values return as set
213 # :plain: string with no transformation needed.
237 # :plain: string with no transformation needed.
214 gboptsmap = {'heads': 'nodes',
238 gboptsmap = {'heads': 'nodes',
215 'common': 'nodes',
239 'common': 'nodes',
216 'obsmarkers': 'boolean',
240 'obsmarkers': 'boolean',
217 'bundlecaps': 'scsv',
241 'bundlecaps': 'scsv',
218 'listkeys': 'csv',
242 'listkeys': 'csv',
219 'cg': 'boolean',
243 'cg': 'boolean',
220 'cbattempted': 'boolean'}
244 'cbattempted': 'boolean'}
221
245
222 # client side
246 # client side
223
247
224 class wirepeer(peer.peerrepository):
248 class wirepeer(peer.peerrepository):
225 """Client-side interface for communicating with a peer repository.
249 """Client-side interface for communicating with a peer repository.
226
250
227 Methods commonly call wire protocol commands of the same name.
251 Methods commonly call wire protocol commands of the same name.
228
252
229 See also httppeer.py and sshpeer.py for protocol-specific
253 See also httppeer.py and sshpeer.py for protocol-specific
230 implementations of this interface.
254 implementations of this interface.
231 """
255 """
232 def batch(self):
256 def batch(self):
233 if self.capable('batch'):
257 if self.capable('batch'):
234 return remotebatch(self)
258 return remotebatch(self)
235 else:
259 else:
236 return peer.localbatch(self)
260 return peer.localbatch(self)
237 def _submitbatch(self, req):
261 def _submitbatch(self, req):
238 """run batch request <req> on the server
262 """run batch request <req> on the server
239
263
240 Returns an iterator of the raw responses from the server.
264 Returns an iterator of the raw responses from the server.
241 """
265 """
242 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
266 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
243 chunk = rsp.read(1024)
267 chunk = rsp.read(1024)
244 work = [chunk]
268 work = [chunk]
245 while chunk:
269 while chunk:
246 while ';' not in chunk and chunk:
270 while ';' not in chunk and chunk:
247 chunk = rsp.read(1024)
271 chunk = rsp.read(1024)
248 work.append(chunk)
272 work.append(chunk)
249 merged = ''.join(work)
273 merged = ''.join(work)
250 while ';' in merged:
274 while ';' in merged:
251 one, merged = merged.split(';', 1)
275 one, merged = merged.split(';', 1)
252 yield unescapearg(one)
276 yield unescapearg(one)
253 chunk = rsp.read(1024)
277 chunk = rsp.read(1024)
254 work = [merged, chunk]
278 work = [merged, chunk]
255 yield unescapearg(''.join(work))
279 yield unescapearg(''.join(work))
256
280
257 def _submitone(self, op, args):
281 def _submitone(self, op, args):
258 return self._call(op, **args)
282 return self._call(op, **args)
259
283
260 def iterbatch(self):
284 def iterbatch(self):
261 return remoteiterbatcher(self)
285 return remoteiterbatcher(self)
262
286
263 @batchable
287 @batchable
264 def lookup(self, key):
288 def lookup(self, key):
265 self.requirecap('lookup', _('look up remote revision'))
289 self.requirecap('lookup', _('look up remote revision'))
266 f = future()
290 f = future()
267 yield {'key': encoding.fromlocal(key)}, f
291 yield {'key': encoding.fromlocal(key)}, f
268 d = f.value
292 d = f.value
269 success, data = d[:-1].split(" ", 1)
293 success, data = d[:-1].split(" ", 1)
270 if int(success):
294 if int(success):
271 yield bin(data)
295 yield bin(data)
272 self._abort(error.RepoError(data))
296 self._abort(error.RepoError(data))
273
297
274 @batchable
298 @batchable
275 def heads(self):
299 def heads(self):
276 f = future()
300 f = future()
277 yield {}, f
301 yield {}, f
278 d = f.value
302 d = f.value
279 try:
303 try:
280 yield decodelist(d[:-1])
304 yield decodelist(d[:-1])
281 except ValueError:
305 except ValueError:
282 self._abort(error.ResponseError(_("unexpected response:"), d))
306 self._abort(error.ResponseError(_("unexpected response:"), d))
283
307
284 @batchable
308 @batchable
285 def known(self, nodes):
309 def known(self, nodes):
286 f = future()
310 f = future()
287 yield {'nodes': encodelist(nodes)}, f
311 yield {'nodes': encodelist(nodes)}, f
288 d = f.value
312 d = f.value
289 try:
313 try:
290 yield [bool(int(b)) for b in d]
314 yield [bool(int(b)) for b in d]
291 except ValueError:
315 except ValueError:
292 self._abort(error.ResponseError(_("unexpected response:"), d))
316 self._abort(error.ResponseError(_("unexpected response:"), d))
293
317
294 @batchable
318 @batchable
295 def branchmap(self):
319 def branchmap(self):
296 f = future()
320 f = future()
297 yield {}, f
321 yield {}, f
298 d = f.value
322 d = f.value
299 try:
323 try:
300 branchmap = {}
324 branchmap = {}
301 for branchpart in d.splitlines():
325 for branchpart in d.splitlines():
302 branchname, branchheads = branchpart.split(' ', 1)
326 branchname, branchheads = branchpart.split(' ', 1)
303 branchname = encoding.tolocal(urlreq.unquote(branchname))
327 branchname = encoding.tolocal(urlreq.unquote(branchname))
304 branchheads = decodelist(branchheads)
328 branchheads = decodelist(branchheads)
305 branchmap[branchname] = branchheads
329 branchmap[branchname] = branchheads
306 yield branchmap
330 yield branchmap
307 except TypeError:
331 except TypeError:
308 self._abort(error.ResponseError(_("unexpected response:"), d))
332 self._abort(error.ResponseError(_("unexpected response:"), d))
309
333
310 def branches(self, nodes):
334 def branches(self, nodes):
311 n = encodelist(nodes)
335 n = encodelist(nodes)
312 d = self._call("branches", nodes=n)
336 d = self._call("branches", nodes=n)
313 try:
337 try:
314 br = [tuple(decodelist(b)) for b in d.splitlines()]
338 br = [tuple(decodelist(b)) for b in d.splitlines()]
315 return br
339 return br
316 except ValueError:
340 except ValueError:
317 self._abort(error.ResponseError(_("unexpected response:"), d))
341 self._abort(error.ResponseError(_("unexpected response:"), d))
318
342
319 def between(self, pairs):
343 def between(self, pairs):
320 batch = 8 # avoid giant requests
344 batch = 8 # avoid giant requests
321 r = []
345 r = []
322 for i in xrange(0, len(pairs), batch):
346 for i in xrange(0, len(pairs), batch):
323 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
347 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
324 d = self._call("between", pairs=n)
348 d = self._call("between", pairs=n)
325 try:
349 try:
326 r.extend(l and decodelist(l) or [] for l in d.splitlines())
350 r.extend(l and decodelist(l) or [] for l in d.splitlines())
327 except ValueError:
351 except ValueError:
328 self._abort(error.ResponseError(_("unexpected response:"), d))
352 self._abort(error.ResponseError(_("unexpected response:"), d))
329 return r
353 return r
330
354
331 @batchable
355 @batchable
332 def pushkey(self, namespace, key, old, new):
356 def pushkey(self, namespace, key, old, new):
333 if not self.capable('pushkey'):
357 if not self.capable('pushkey'):
334 yield False, None
358 yield False, None
335 f = future()
359 f = future()
336 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
360 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
337 yield {'namespace': encoding.fromlocal(namespace),
361 yield {'namespace': encoding.fromlocal(namespace),
338 'key': encoding.fromlocal(key),
362 'key': encoding.fromlocal(key),
339 'old': encoding.fromlocal(old),
363 'old': encoding.fromlocal(old),
340 'new': encoding.fromlocal(new)}, f
364 'new': encoding.fromlocal(new)}, f
341 d = f.value
365 d = f.value
342 d, output = d.split('\n', 1)
366 d, output = d.split('\n', 1)
343 try:
367 try:
344 d = bool(int(d))
368 d = bool(int(d))
345 except ValueError:
369 except ValueError:
346 raise error.ResponseError(
370 raise error.ResponseError(
347 _('push failed (unexpected response):'), d)
371 _('push failed (unexpected response):'), d)
348 for l in output.splitlines(True):
372 for l in output.splitlines(True):
349 self.ui.status(_('remote: '), l)
373 self.ui.status(_('remote: '), l)
350 yield d
374 yield d
351
375
352 @batchable
376 @batchable
353 def listkeys(self, namespace):
377 def listkeys(self, namespace):
354 if not self.capable('pushkey'):
378 if not self.capable('pushkey'):
355 yield {}, None
379 yield {}, None
356 f = future()
380 f = future()
357 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
381 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
358 yield {'namespace': encoding.fromlocal(namespace)}, f
382 yield {'namespace': encoding.fromlocal(namespace)}, f
359 d = f.value
383 d = f.value
360 self.ui.debug('received listkey for "%s": %i bytes\n'
384 self.ui.debug('received listkey for "%s": %i bytes\n'
361 % (namespace, len(d)))
385 % (namespace, len(d)))
362 yield pushkeymod.decodekeys(d)
386 yield pushkeymod.decodekeys(d)
363
387
364 def stream_out(self):
388 def stream_out(self):
365 return self._callstream('stream_out')
389 return self._callstream('stream_out')
366
390
367 def changegroup(self, nodes, kind):
391 def changegroup(self, nodes, kind):
368 n = encodelist(nodes)
392 n = encodelist(nodes)
369 f = self._callcompressable("changegroup", roots=n)
393 f = self._callcompressable("changegroup", roots=n)
370 return changegroupmod.cg1unpacker(f, 'UN')
394 return changegroupmod.cg1unpacker(f, 'UN')
371
395
372 def changegroupsubset(self, bases, heads, kind):
396 def changegroupsubset(self, bases, heads, kind):
373 self.requirecap('changegroupsubset', _('look up remote changes'))
397 self.requirecap('changegroupsubset', _('look up remote changes'))
374 bases = encodelist(bases)
398 bases = encodelist(bases)
375 heads = encodelist(heads)
399 heads = encodelist(heads)
376 f = self._callcompressable("changegroupsubset",
400 f = self._callcompressable("changegroupsubset",
377 bases=bases, heads=heads)
401 bases=bases, heads=heads)
378 return changegroupmod.cg1unpacker(f, 'UN')
402 return changegroupmod.cg1unpacker(f, 'UN')
379
403
380 def getbundle(self, source, **kwargs):
404 def getbundle(self, source, **kwargs):
381 self.requirecap('getbundle', _('look up remote changes'))
405 self.requirecap('getbundle', _('look up remote changes'))
382 opts = {}
406 opts = {}
383 bundlecaps = kwargs.get('bundlecaps')
407 bundlecaps = kwargs.get('bundlecaps')
384 if bundlecaps is not None:
408 if bundlecaps is not None:
385 kwargs['bundlecaps'] = sorted(bundlecaps)
409 kwargs['bundlecaps'] = sorted(bundlecaps)
386 else:
410 else:
387 bundlecaps = () # kwargs could have it to None
411 bundlecaps = () # kwargs could have it to None
388 for key, value in kwargs.iteritems():
412 for key, value in kwargs.iteritems():
389 if value is None:
413 if value is None:
390 continue
414 continue
391 keytype = gboptsmap.get(key)
415 keytype = gboptsmap.get(key)
392 if keytype is None:
416 if keytype is None:
393 assert False, 'unexpected'
417 assert False, 'unexpected'
394 elif keytype == 'nodes':
418 elif keytype == 'nodes':
395 value = encodelist(value)
419 value = encodelist(value)
396 elif keytype in ('csv', 'scsv'):
420 elif keytype in ('csv', 'scsv'):
397 value = ','.join(value)
421 value = ','.join(value)
398 elif keytype == 'boolean':
422 elif keytype == 'boolean':
399 value = '%i' % bool(value)
423 value = '%i' % bool(value)
400 elif keytype != 'plain':
424 elif keytype != 'plain':
401 raise KeyError('unknown getbundle option type %s'
425 raise KeyError('unknown getbundle option type %s'
402 % keytype)
426 % keytype)
403 opts[key] = value
427 opts[key] = value
404 f = self._callcompressable("getbundle", **opts)
428 f = self._callcompressable("getbundle", **opts)
405 if any((cap.startswith('HG2') for cap in bundlecaps)):
429 if any((cap.startswith('HG2') for cap in bundlecaps)):
406 return bundle2.getunbundler(self.ui, f)
430 return bundle2.getunbundler(self.ui, f)
407 else:
431 else:
408 return changegroupmod.cg1unpacker(f, 'UN')
432 return changegroupmod.cg1unpacker(f, 'UN')
409
433
410 def unbundle(self, cg, heads, url):
434 def unbundle(self, cg, heads, url):
411 '''Send cg (a readable file-like object representing the
435 '''Send cg (a readable file-like object representing the
412 changegroup to push, typically a chunkbuffer object) to the
436 changegroup to push, typically a chunkbuffer object) to the
413 remote server as a bundle.
437 remote server as a bundle.
414
438
415 When pushing a bundle10 stream, return an integer indicating the
439 When pushing a bundle10 stream, return an integer indicating the
416 result of the push (see changegroup.apply()).
440 result of the push (see changegroup.apply()).
417
441
418 When pushing a bundle20 stream, return a bundle20 stream.
442 When pushing a bundle20 stream, return a bundle20 stream.
419
443
420 `url` is the url the client thinks it's pushing to, which is
444 `url` is the url the client thinks it's pushing to, which is
421 visible to hooks.
445 visible to hooks.
422 '''
446 '''
423
447
424 if heads != ['force'] and self.capable('unbundlehash'):
448 if heads != ['force'] and self.capable('unbundlehash'):
425 heads = encodelist(['hashed',
449 heads = encodelist(['hashed',
426 hashlib.sha1(''.join(sorted(heads))).digest()])
450 hashlib.sha1(''.join(sorted(heads))).digest()])
427 else:
451 else:
428 heads = encodelist(heads)
452 heads = encodelist(heads)
429
453
430 if util.safehasattr(cg, 'deltaheader'):
454 if util.safehasattr(cg, 'deltaheader'):
431 # this a bundle10, do the old style call sequence
455 # this a bundle10, do the old style call sequence
432 ret, output = self._callpush("unbundle", cg, heads=heads)
456 ret, output = self._callpush("unbundle", cg, heads=heads)
433 if ret == "":
457 if ret == "":
434 raise error.ResponseError(
458 raise error.ResponseError(
435 _('push failed:'), output)
459 _('push failed:'), output)
436 try:
460 try:
437 ret = int(ret)
461 ret = int(ret)
438 except ValueError:
462 except ValueError:
439 raise error.ResponseError(
463 raise error.ResponseError(
440 _('push failed (unexpected response):'), ret)
464 _('push failed (unexpected response):'), ret)
441
465
442 for l in output.splitlines(True):
466 for l in output.splitlines(True):
443 self.ui.status(_('remote: '), l)
467 self.ui.status(_('remote: '), l)
444 else:
468 else:
445 # bundle2 push. Send a stream, fetch a stream.
469 # bundle2 push. Send a stream, fetch a stream.
446 stream = self._calltwowaystream('unbundle', cg, heads=heads)
470 stream = self._calltwowaystream('unbundle', cg, heads=heads)
447 ret = bundle2.getunbundler(self.ui, stream)
471 ret = bundle2.getunbundler(self.ui, stream)
448 return ret
472 return ret
449
473
450 def debugwireargs(self, one, two, three=None, four=None, five=None):
474 def debugwireargs(self, one, two, three=None, four=None, five=None):
451 # don't pass optional arguments left at their default value
475 # don't pass optional arguments left at their default value
452 opts = {}
476 opts = {}
453 if three is not None:
477 if three is not None:
454 opts['three'] = three
478 opts['three'] = three
455 if four is not None:
479 if four is not None:
456 opts['four'] = four
480 opts['four'] = four
457 return self._call('debugwireargs', one=one, two=two, **opts)
481 return self._call('debugwireargs', one=one, two=two, **opts)
458
482
459 def _call(self, cmd, **args):
483 def _call(self, cmd, **args):
460 """execute <cmd> on the server
484 """execute <cmd> on the server
461
485
462 The command is expected to return a simple string.
486 The command is expected to return a simple string.
463
487
464 returns the server reply as a string."""
488 returns the server reply as a string."""
465 raise NotImplementedError()
489 raise NotImplementedError()
466
490
467 def _callstream(self, cmd, **args):
491 def _callstream(self, cmd, **args):
468 """execute <cmd> on the server
492 """execute <cmd> on the server
469
493
470 The command is expected to return a stream. Note that if the
494 The command is expected to return a stream. Note that if the
471 command doesn't return a stream, _callstream behaves
495 command doesn't return a stream, _callstream behaves
472 differently for ssh and http peers.
496 differently for ssh and http peers.
473
497
474 returns the server reply as a file like object.
498 returns the server reply as a file like object.
475 """
499 """
476 raise NotImplementedError()
500 raise NotImplementedError()
477
501
478 def _callcompressable(self, cmd, **args):
502 def _callcompressable(self, cmd, **args):
479 """execute <cmd> on the server
503 """execute <cmd> on the server
480
504
481 The command is expected to return a stream.
505 The command is expected to return a stream.
482
506
483 The stream may have been compressed in some implementations. This
507 The stream may have been compressed in some implementations. This
484 function takes care of the decompression. This is the only difference
508 function takes care of the decompression. This is the only difference
485 with _callstream.
509 with _callstream.
486
510
487 returns the server reply as a file like object.
511 returns the server reply as a file like object.
488 """
512 """
489 raise NotImplementedError()
513 raise NotImplementedError()
490
514
491 def _callpush(self, cmd, fp, **args):
515 def _callpush(self, cmd, fp, **args):
492 """execute a <cmd> on server
516 """execute a <cmd> on server
493
517
494 The command is expected to be related to a push. Push has a special
518 The command is expected to be related to a push. Push has a special
495 return method.
519 return method.
496
520
497 returns the server reply as a (ret, output) tuple. ret is either
521 returns the server reply as a (ret, output) tuple. ret is either
498 empty (error) or a stringified int.
522 empty (error) or a stringified int.
499 """
523 """
500 raise NotImplementedError()
524 raise NotImplementedError()
501
525
502 def _calltwowaystream(self, cmd, fp, **args):
526 def _calltwowaystream(self, cmd, fp, **args):
503 """execute <cmd> on server
527 """execute <cmd> on server
504
528
505 The command will send a stream to the server and get a stream in reply.
529 The command will send a stream to the server and get a stream in reply.
506 """
530 """
507 raise NotImplementedError()
531 raise NotImplementedError()
508
532
509 def _abort(self, exception):
533 def _abort(self, exception):
510 """clearly abort the wire protocol connection and raise the exception
534 """clearly abort the wire protocol connection and raise the exception
511 """
535 """
512 raise NotImplementedError()
536 raise NotImplementedError()
513
537
514 # server side
538 # server side
515
539
516 # wire protocol command can either return a string or one of these classes.
540 # wire protocol command can either return a string or one of these classes.
517 class streamres(object):
541 class streamres(object):
518 """wireproto reply: binary stream
542 """wireproto reply: binary stream
519
543
520 The call was successful and the result is a stream.
544 The call was successful and the result is a stream.
521
545
522 Accepts either a generator or an object with a ``read(size)`` method.
546 Accepts either a generator or an object with a ``read(size)`` method.
523
547
524 ``v1compressible`` indicates whether this data can be compressed to
548 ``v1compressible`` indicates whether this data can be compressed to
525 "version 1" clients (technically: HTTP peers using
549 "version 1" clients (technically: HTTP peers using
526 application/mercurial-0.1 media type). This flag should NOT be used on
550 application/mercurial-0.1 media type). This flag should NOT be used on
527 new commands because new clients should support a more modern compression
551 new commands because new clients should support a more modern compression
528 mechanism.
552 mechanism.
529 """
553 """
530 def __init__(self, gen=None, reader=None, v1compressible=False):
554 def __init__(self, gen=None, reader=None, v1compressible=False):
531 self.gen = gen
555 self.gen = gen
532 self.reader = reader
556 self.reader = reader
533 self.v1compressible = v1compressible
557 self.v1compressible = v1compressible
534
558
535 class pushres(object):
559 class pushres(object):
536 """wireproto reply: success with simple integer return
560 """wireproto reply: success with simple integer return
537
561
538 The call was successful and returned an integer contained in `self.res`.
562 The call was successful and returned an integer contained in `self.res`.
539 """
563 """
540 def __init__(self, res):
564 def __init__(self, res):
541 self.res = res
565 self.res = res
542
566
543 class pusherr(object):
567 class pusherr(object):
544 """wireproto reply: failure
568 """wireproto reply: failure
545
569
546 The call failed. The `self.res` attribute contains the error message.
570 The call failed. The `self.res` attribute contains the error message.
547 """
571 """
548 def __init__(self, res):
572 def __init__(self, res):
549 self.res = res
573 self.res = res
550
574
551 class ooberror(object):
575 class ooberror(object):
552 """wireproto reply: failure of a batch of operation
576 """wireproto reply: failure of a batch of operation
553
577
554 Something failed during a batch call. The error message is stored in
578 Something failed during a batch call. The error message is stored in
555 `self.message`.
579 `self.message`.
556 """
580 """
557 def __init__(self, message):
581 def __init__(self, message):
558 self.message = message
582 self.message = message
559
583
560 def getdispatchrepo(repo, proto, command):
584 def getdispatchrepo(repo, proto, command):
561 """Obtain the repo used for processing wire protocol commands.
585 """Obtain the repo used for processing wire protocol commands.
562
586
563 The intent of this function is to serve as a monkeypatch point for
587 The intent of this function is to serve as a monkeypatch point for
564 extensions that need commands to operate on different repo views under
588 extensions that need commands to operate on different repo views under
565 specialized circumstances.
589 specialized circumstances.
566 """
590 """
567 return repo.filtered('served')
591 return repo.filtered('served')
568
592
569 def dispatch(repo, proto, command):
593 def dispatch(repo, proto, command):
570 repo = getdispatchrepo(repo, proto, command)
594 repo = getdispatchrepo(repo, proto, command)
571 func, spec = commands[command]
595 func, spec = commands[command]
572 args = proto.getargs(spec)
596 args = proto.getargs(spec)
573 return func(repo, proto, *args)
597 return func(repo, proto, *args)
574
598
575 def options(cmd, keys, others):
599 def options(cmd, keys, others):
576 opts = {}
600 opts = {}
577 for k in keys:
601 for k in keys:
578 if k in others:
602 if k in others:
579 opts[k] = others[k]
603 opts[k] = others[k]
580 del others[k]
604 del others[k]
581 if others:
605 if others:
582 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
606 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
583 % (cmd, ",".join(others)))
607 % (cmd, ",".join(others)))
584 return opts
608 return opts
585
609
586 def bundle1allowed(repo, action):
610 def bundle1allowed(repo, action):
587 """Whether a bundle1 operation is allowed from the server.
611 """Whether a bundle1 operation is allowed from the server.
588
612
589 Priority is:
613 Priority is:
590
614
591 1. server.bundle1gd.<action> (if generaldelta active)
615 1. server.bundle1gd.<action> (if generaldelta active)
592 2. server.bundle1.<action>
616 2. server.bundle1.<action>
593 3. server.bundle1gd (if generaldelta active)
617 3. server.bundle1gd (if generaldelta active)
594 4. server.bundle1
618 4. server.bundle1
595 """
619 """
596 ui = repo.ui
620 ui = repo.ui
597 gd = 'generaldelta' in repo.requirements
621 gd = 'generaldelta' in repo.requirements
598
622
599 if gd:
623 if gd:
600 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
624 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
601 if v is not None:
625 if v is not None:
602 return v
626 return v
603
627
604 v = ui.configbool('server', 'bundle1.%s' % action, None)
628 v = ui.configbool('server', 'bundle1.%s' % action, None)
605 if v is not None:
629 if v is not None:
606 return v
630 return v
607
631
608 if gd:
632 if gd:
609 v = ui.configbool('server', 'bundle1gd')
633 v = ui.configbool('server', 'bundle1gd')
610 if v is not None:
634 if v is not None:
611 return v
635 return v
612
636
613 return ui.configbool('server', 'bundle1')
637 return ui.configbool('server', 'bundle1')
614
638
615 def supportedcompengines(ui, proto, role):
639 def supportedcompengines(ui, proto, role):
616 """Obtain the list of supported compression engines for a request."""
640 """Obtain the list of supported compression engines for a request."""
617 assert role in (util.CLIENTROLE, util.SERVERROLE)
641 assert role in (util.CLIENTROLE, util.SERVERROLE)
618
642
619 compengines = util.compengines.supportedwireengines(role)
643 compengines = util.compengines.supportedwireengines(role)
620
644
621 # Allow config to override default list and ordering.
645 # Allow config to override default list and ordering.
622 if role == util.SERVERROLE:
646 if role == util.SERVERROLE:
623 configengines = ui.configlist('server', 'compressionengines')
647 configengines = ui.configlist('server', 'compressionengines')
624 config = 'server.compressionengines'
648 config = 'server.compressionengines'
625 else:
649 else:
626 # This is currently implemented mainly to facilitate testing. In most
650 # This is currently implemented mainly to facilitate testing. In most
627 # cases, the server should be in charge of choosing a compression engine
651 # cases, the server should be in charge of choosing a compression engine
628 # because a server has the most to lose from a sub-optimal choice. (e.g.
652 # because a server has the most to lose from a sub-optimal choice. (e.g.
629 # CPU DoS due to an expensive engine or a network DoS due to poor
653 # CPU DoS due to an expensive engine or a network DoS due to poor
630 # compression ratio).
654 # compression ratio).
631 configengines = ui.configlist('experimental',
655 configengines = ui.configlist('experimental',
632 'clientcompressionengines')
656 'clientcompressionengines')
633 config = 'experimental.clientcompressionengines'
657 config = 'experimental.clientcompressionengines'
634
658
635 # No explicit config. Filter out the ones that aren't supposed to be
659 # No explicit config. Filter out the ones that aren't supposed to be
636 # advertised and return default ordering.
660 # advertised and return default ordering.
637 if not configengines:
661 if not configengines:
638 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
662 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
639 return [e for e in compengines
663 return [e for e in compengines
640 if getattr(e.wireprotosupport(), attr) > 0]
664 if getattr(e.wireprotosupport(), attr) > 0]
641
665
642 # If compression engines are listed in the config, assume there is a good
666 # If compression engines are listed in the config, assume there is a good
643 # reason for it (like server operators wanting to achieve specific
667 # reason for it (like server operators wanting to achieve specific
644 # performance characteristics). So fail fast if the config references
668 # performance characteristics). So fail fast if the config references
645 # unusable compression engines.
669 # unusable compression engines.
646 validnames = set(e.name() for e in compengines)
670 validnames = set(e.name() for e in compengines)
647 invalidnames = set(e for e in configengines if e not in validnames)
671 invalidnames = set(e for e in configengines if e not in validnames)
648 if invalidnames:
672 if invalidnames:
649 raise error.Abort(_('invalid compression engine defined in %s: %s') %
673 raise error.Abort(_('invalid compression engine defined in %s: %s') %
650 (config, ', '.join(sorted(invalidnames))))
674 (config, ', '.join(sorted(invalidnames))))
651
675
652 compengines = [e for e in compengines if e.name() in configengines]
676 compengines = [e for e in compengines if e.name() in configengines]
653 compengines = sorted(compengines,
677 compengines = sorted(compengines,
654 key=lambda e: configengines.index(e.name()))
678 key=lambda e: configengines.index(e.name()))
655
679
656 if not compengines:
680 if not compengines:
657 raise error.Abort(_('%s config option does not specify any known '
681 raise error.Abort(_('%s config option does not specify any known '
658 'compression engines') % config,
682 'compression engines') % config,
659 hint=_('usable compression engines: %s') %
683 hint=_('usable compression engines: %s') %
660 ', '.sorted(validnames))
684 ', '.sorted(validnames))
661
685
662 return compengines
686 return compengines
663
687
664 # list of commands
688 # list of commands
665 commands = {}
689 commands = {}
666
690
667 def wireprotocommand(name, args=''):
691 def wireprotocommand(name, args=''):
668 """decorator for wire protocol command"""
692 """decorator for wire protocol command"""
669 def register(func):
693 def register(func):
670 commands[name] = (func, args)
694 commands[name] = (func, args)
671 return func
695 return func
672 return register
696 return register
673
697
674 @wireprotocommand('batch', 'cmds *')
698 @wireprotocommand('batch', 'cmds *')
675 def batch(repo, proto, cmds, others):
699 def batch(repo, proto, cmds, others):
676 repo = repo.filtered("served")
700 repo = repo.filtered("served")
677 res = []
701 res = []
678 for pair in cmds.split(';'):
702 for pair in cmds.split(';'):
679 op, args = pair.split(' ', 1)
703 op, args = pair.split(' ', 1)
680 vals = {}
704 vals = {}
681 for a in args.split(','):
705 for a in args.split(','):
682 if a:
706 if a:
683 n, v = a.split('=')
707 n, v = a.split('=')
684 vals[unescapearg(n)] = unescapearg(v)
708 vals[unescapearg(n)] = unescapearg(v)
685 func, spec = commands[op]
709 func, spec = commands[op]
686 if spec:
710 if spec:
687 keys = spec.split()
711 keys = spec.split()
688 data = {}
712 data = {}
689 for k in keys:
713 for k in keys:
690 if k == '*':
714 if k == '*':
691 star = {}
715 star = {}
692 for key in vals.keys():
716 for key in vals.keys():
693 if key not in keys:
717 if key not in keys:
694 star[key] = vals[key]
718 star[key] = vals[key]
695 data['*'] = star
719 data['*'] = star
696 else:
720 else:
697 data[k] = vals[k]
721 data[k] = vals[k]
698 result = func(repo, proto, *[data[k] for k in keys])
722 result = func(repo, proto, *[data[k] for k in keys])
699 else:
723 else:
700 result = func(repo, proto)
724 result = func(repo, proto)
701 if isinstance(result, ooberror):
725 if isinstance(result, ooberror):
702 return result
726 return result
703 res.append(escapearg(result))
727 res.append(escapearg(result))
704 return ';'.join(res)
728 return ';'.join(res)
705
729
706 @wireprotocommand('between', 'pairs')
730 @wireprotocommand('between', 'pairs')
707 def between(repo, proto, pairs):
731 def between(repo, proto, pairs):
708 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
732 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
709 r = []
733 r = []
710 for b in repo.between(pairs):
734 for b in repo.between(pairs):
711 r.append(encodelist(b) + "\n")
735 r.append(encodelist(b) + "\n")
712 return "".join(r)
736 return "".join(r)
713
737
714 @wireprotocommand('branchmap')
738 @wireprotocommand('branchmap')
715 def branchmap(repo, proto):
739 def branchmap(repo, proto):
716 branchmap = repo.branchmap()
740 branchmap = repo.branchmap()
717 heads = []
741 heads = []
718 for branch, nodes in branchmap.iteritems():
742 for branch, nodes in branchmap.iteritems():
719 branchname = urlreq.quote(encoding.fromlocal(branch))
743 branchname = urlreq.quote(encoding.fromlocal(branch))
720 branchnodes = encodelist(nodes)
744 branchnodes = encodelist(nodes)
721 heads.append('%s %s' % (branchname, branchnodes))
745 heads.append('%s %s' % (branchname, branchnodes))
722 return '\n'.join(heads)
746 return '\n'.join(heads)
723
747
724 @wireprotocommand('branches', 'nodes')
748 @wireprotocommand('branches', 'nodes')
725 def branches(repo, proto, nodes):
749 def branches(repo, proto, nodes):
726 nodes = decodelist(nodes)
750 nodes = decodelist(nodes)
727 r = []
751 r = []
728 for b in repo.branches(nodes):
752 for b in repo.branches(nodes):
729 r.append(encodelist(b) + "\n")
753 r.append(encodelist(b) + "\n")
730 return "".join(r)
754 return "".join(r)
731
755
732 @wireprotocommand('clonebundles', '')
756 @wireprotocommand('clonebundles', '')
733 def clonebundles(repo, proto):
757 def clonebundles(repo, proto):
734 """Server command for returning info for available bundles to seed clones.
758 """Server command for returning info for available bundles to seed clones.
735
759
736 Clients will parse this response and determine what bundle to fetch.
760 Clients will parse this response and determine what bundle to fetch.
737
761
738 Extensions may wrap this command to filter or dynamically emit data
762 Extensions may wrap this command to filter or dynamically emit data
739 depending on the request. e.g. you could advertise URLs for the closest
763 depending on the request. e.g. you could advertise URLs for the closest
740 data center given the client's IP address.
764 data center given the client's IP address.
741 """
765 """
742 return repo.vfs.tryread('clonebundles.manifest')
766 return repo.vfs.tryread('clonebundles.manifest')
743
767
744 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
768 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
745 'known', 'getbundle', 'unbundlehash', 'batch']
769 'known', 'getbundle', 'unbundlehash', 'batch']
746
770
747 def _capabilities(repo, proto):
771 def _capabilities(repo, proto):
748 """return a list of capabilities for a repo
772 """return a list of capabilities for a repo
749
773
750 This function exists to allow extensions to easily wrap capabilities
774 This function exists to allow extensions to easily wrap capabilities
751 computation
775 computation
752
776
753 - returns a lists: easy to alter
777 - returns a lists: easy to alter
754 - change done here will be propagated to both `capabilities` and `hello`
778 - change done here will be propagated to both `capabilities` and `hello`
755 command without any other action needed.
779 command without any other action needed.
756 """
780 """
757 # copy to prevent modification of the global list
781 # copy to prevent modification of the global list
758 caps = list(wireprotocaps)
782 caps = list(wireprotocaps)
759 if streamclone.allowservergeneration(repo):
783 if streamclone.allowservergeneration(repo):
760 if repo.ui.configbool('server', 'preferuncompressed'):
784 if repo.ui.configbool('server', 'preferuncompressed'):
761 caps.append('stream-preferred')
785 caps.append('stream-preferred')
762 requiredformats = repo.requirements & repo.supportedformats
786 requiredformats = repo.requirements & repo.supportedformats
763 # if our local revlogs are just revlogv1, add 'stream' cap
787 # if our local revlogs are just revlogv1, add 'stream' cap
764 if not requiredformats - {'revlogv1'}:
788 if not requiredformats - {'revlogv1'}:
765 caps.append('stream')
789 caps.append('stream')
766 # otherwise, add 'streamreqs' detailing our local revlog format
790 # otherwise, add 'streamreqs' detailing our local revlog format
767 else:
791 else:
768 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
792 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
769 if repo.ui.configbool('experimental', 'bundle2-advertise'):
793 if repo.ui.configbool('experimental', 'bundle2-advertise'):
770 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
794 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
771 caps.append('bundle2=' + urlreq.quote(capsblob))
795 caps.append('bundle2=' + urlreq.quote(capsblob))
772 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
796 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
773
797
774 if proto.name == 'http':
798 if proto.name == 'http':
775 caps.append('httpheader=%d' %
799 caps.append('httpheader=%d' %
776 repo.ui.configint('server', 'maxhttpheaderlen'))
800 repo.ui.configint('server', 'maxhttpheaderlen'))
777 if repo.ui.configbool('experimental', 'httppostargs'):
801 if repo.ui.configbool('experimental', 'httppostargs'):
778 caps.append('httppostargs')
802 caps.append('httppostargs')
779
803
780 # FUTURE advertise 0.2rx once support is implemented
804 # FUTURE advertise 0.2rx once support is implemented
781 # FUTURE advertise minrx and mintx after consulting config option
805 # FUTURE advertise minrx and mintx after consulting config option
782 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
806 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
783
807
784 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
808 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
785 if compengines:
809 if compengines:
786 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
810 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
787 for e in compengines)
811 for e in compengines)
788 caps.append('compression=%s' % comptypes)
812 caps.append('compression=%s' % comptypes)
789
813
790 return caps
814 return caps
791
815
792 # If you are writing an extension and consider wrapping this function. Wrap
816 # If you are writing an extension and consider wrapping this function. Wrap
793 # `_capabilities` instead.
817 # `_capabilities` instead.
794 @wireprotocommand('capabilities')
818 @wireprotocommand('capabilities')
795 def capabilities(repo, proto):
819 def capabilities(repo, proto):
796 return ' '.join(_capabilities(repo, proto))
820 return ' '.join(_capabilities(repo, proto))
797
821
798 @wireprotocommand('changegroup', 'roots')
822 @wireprotocommand('changegroup', 'roots')
799 def changegroup(repo, proto, roots):
823 def changegroup(repo, proto, roots):
800 nodes = decodelist(roots)
824 nodes = decodelist(roots)
801 cg = changegroupmod.changegroup(repo, nodes, 'serve')
825 cg = changegroupmod.changegroup(repo, nodes, 'serve')
802 return streamres(reader=cg, v1compressible=True)
826 return streamres(reader=cg, v1compressible=True)
803
827
804 @wireprotocommand('changegroupsubset', 'bases heads')
828 @wireprotocommand('changegroupsubset', 'bases heads')
805 def changegroupsubset(repo, proto, bases, heads):
829 def changegroupsubset(repo, proto, bases, heads):
806 bases = decodelist(bases)
830 bases = decodelist(bases)
807 heads = decodelist(heads)
831 heads = decodelist(heads)
808 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
832 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
809 return streamres(reader=cg, v1compressible=True)
833 return streamres(reader=cg, v1compressible=True)
810
834
811 @wireprotocommand('debugwireargs', 'one two *')
835 @wireprotocommand('debugwireargs', 'one two *')
812 def debugwireargs(repo, proto, one, two, others):
836 def debugwireargs(repo, proto, one, two, others):
813 # only accept optional args from the known set
837 # only accept optional args from the known set
814 opts = options('debugwireargs', ['three', 'four'], others)
838 opts = options('debugwireargs', ['three', 'four'], others)
815 return repo.debugwireargs(one, two, **opts)
839 return repo.debugwireargs(one, two, **opts)
816
840
817 @wireprotocommand('getbundle', '*')
841 @wireprotocommand('getbundle', '*')
818 def getbundle(repo, proto, others):
842 def getbundle(repo, proto, others):
819 opts = options('getbundle', gboptsmap.keys(), others)
843 opts = options('getbundle', gboptsmap.keys(), others)
820 for k, v in opts.iteritems():
844 for k, v in opts.iteritems():
821 keytype = gboptsmap[k]
845 keytype = gboptsmap[k]
822 if keytype == 'nodes':
846 if keytype == 'nodes':
823 opts[k] = decodelist(v)
847 opts[k] = decodelist(v)
824 elif keytype == 'csv':
848 elif keytype == 'csv':
825 opts[k] = list(v.split(','))
849 opts[k] = list(v.split(','))
826 elif keytype == 'scsv':
850 elif keytype == 'scsv':
827 opts[k] = set(v.split(','))
851 opts[k] = set(v.split(','))
828 elif keytype == 'boolean':
852 elif keytype == 'boolean':
829 # Client should serialize False as '0', which is a non-empty string
853 # Client should serialize False as '0', which is a non-empty string
830 # so it evaluates as a True bool.
854 # so it evaluates as a True bool.
831 if v == '0':
855 if v == '0':
832 opts[k] = False
856 opts[k] = False
833 else:
857 else:
834 opts[k] = bool(v)
858 opts[k] = bool(v)
835 elif keytype != 'plain':
859 elif keytype != 'plain':
836 raise KeyError('unknown getbundle option type %s'
860 raise KeyError('unknown getbundle option type %s'
837 % keytype)
861 % keytype)
838
862
839 if not bundle1allowed(repo, 'pull'):
863 if not bundle1allowed(repo, 'pull'):
840 if not exchange.bundle2requested(opts.get('bundlecaps')):
864 if not exchange.bundle2requested(opts.get('bundlecaps')):
841 if proto.name == 'http':
865 if proto.name == 'http':
842 return ooberror(bundle2required)
866 return ooberror(bundle2required)
843 raise error.Abort(bundle2requiredmain,
867 raise error.Abort(bundle2requiredmain,
844 hint=bundle2requiredhint)
868 hint=bundle2requiredhint)
845
869
846 try:
870 try:
847 if repo.ui.configbool('server', 'disablefullbundle'):
871 if repo.ui.configbool('server', 'disablefullbundle'):
848 # Check to see if this is a full clone.
872 # Check to see if this is a full clone.
849 clheads = set(repo.changelog.heads())
873 clheads = set(repo.changelog.heads())
850 heads = set(opts.get('heads', set()))
874 heads = set(opts.get('heads', set()))
851 common = set(opts.get('common', set()))
875 common = set(opts.get('common', set()))
852 common.discard(nullid)
876 common.discard(nullid)
853 if not common and clheads == heads:
877 if not common and clheads == heads:
854 raise error.Abort(
878 raise error.Abort(
855 _('server has pull-based clones disabled'),
879 _('server has pull-based clones disabled'),
856 hint=_('remove --pull if specified or upgrade Mercurial'))
880 hint=_('remove --pull if specified or upgrade Mercurial'))
857
881
858 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
882 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
859 except error.Abort as exc:
883 except error.Abort as exc:
860 # cleanly forward Abort error to the client
884 # cleanly forward Abort error to the client
861 if not exchange.bundle2requested(opts.get('bundlecaps')):
885 if not exchange.bundle2requested(opts.get('bundlecaps')):
862 if proto.name == 'http':
886 if proto.name == 'http':
863 return ooberror(str(exc) + '\n')
887 return ooberror(str(exc) + '\n')
864 raise # cannot do better for bundle1 + ssh
888 raise # cannot do better for bundle1 + ssh
865 # bundle2 request expect a bundle2 reply
889 # bundle2 request expect a bundle2 reply
866 bundler = bundle2.bundle20(repo.ui)
890 bundler = bundle2.bundle20(repo.ui)
867 manargs = [('message', str(exc))]
891 manargs = [('message', str(exc))]
868 advargs = []
892 advargs = []
869 if exc.hint is not None:
893 if exc.hint is not None:
870 advargs.append(('hint', exc.hint))
894 advargs.append(('hint', exc.hint))
871 bundler.addpart(bundle2.bundlepart('error:abort',
895 bundler.addpart(bundle2.bundlepart('error:abort',
872 manargs, advargs))
896 manargs, advargs))
873 return streamres(gen=bundler.getchunks(), v1compressible=True)
897 return streamres(gen=bundler.getchunks(), v1compressible=True)
874 return streamres(gen=chunks, v1compressible=True)
898 return streamres(gen=chunks, v1compressible=True)
875
899
876 @wireprotocommand('heads')
900 @wireprotocommand('heads')
877 def heads(repo, proto):
901 def heads(repo, proto):
878 h = repo.heads()
902 h = repo.heads()
879 return encodelist(h) + "\n"
903 return encodelist(h) + "\n"
880
904
881 @wireprotocommand('hello')
905 @wireprotocommand('hello')
882 def hello(repo, proto):
906 def hello(repo, proto):
883 '''the hello command returns a set of lines describing various
907 '''the hello command returns a set of lines describing various
884 interesting things about the server, in an RFC822-like format.
908 interesting things about the server, in an RFC822-like format.
885 Currently the only one defined is "capabilities", which
909 Currently the only one defined is "capabilities", which
886 consists of a line in the form:
910 consists of a line in the form:
887
911
888 capabilities: space separated list of tokens
912 capabilities: space separated list of tokens
889 '''
913 '''
890 return "capabilities: %s\n" % (capabilities(repo, proto))
914 return "capabilities: %s\n" % (capabilities(repo, proto))
891
915
892 @wireprotocommand('listkeys', 'namespace')
916 @wireprotocommand('listkeys', 'namespace')
893 def listkeys(repo, proto, namespace):
917 def listkeys(repo, proto, namespace):
894 d = repo.listkeys(encoding.tolocal(namespace)).items()
918 d = repo.listkeys(encoding.tolocal(namespace)).items()
895 return pushkeymod.encodekeys(d)
919 return pushkeymod.encodekeys(d)
896
920
897 @wireprotocommand('lookup', 'key')
921 @wireprotocommand('lookup', 'key')
898 def lookup(repo, proto, key):
922 def lookup(repo, proto, key):
899 try:
923 try:
900 k = encoding.tolocal(key)
924 k = encoding.tolocal(key)
901 c = repo[k]
925 c = repo[k]
902 r = c.hex()
926 r = c.hex()
903 success = 1
927 success = 1
904 except Exception as inst:
928 except Exception as inst:
905 r = str(inst)
929 r = str(inst)
906 success = 0
930 success = 0
907 return "%s %s\n" % (success, r)
931 return "%s %s\n" % (success, r)
908
932
909 @wireprotocommand('known', 'nodes *')
933 @wireprotocommand('known', 'nodes *')
910 def known(repo, proto, nodes, others):
934 def known(repo, proto, nodes, others):
911 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
935 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
912
936
913 @wireprotocommand('pushkey', 'namespace key old new')
937 @wireprotocommand('pushkey', 'namespace key old new')
914 def pushkey(repo, proto, namespace, key, old, new):
938 def pushkey(repo, proto, namespace, key, old, new):
915 # compatibility with pre-1.8 clients which were accidentally
939 # compatibility with pre-1.8 clients which were accidentally
916 # sending raw binary nodes rather than utf-8-encoded hex
940 # sending raw binary nodes rather than utf-8-encoded hex
917 if len(new) == 20 and util.escapestr(new) != new:
941 if len(new) == 20 and util.escapestr(new) != new:
918 # looks like it could be a binary node
942 # looks like it could be a binary node
919 try:
943 try:
920 new.decode('utf-8')
944 new.decode('utf-8')
921 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
945 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
922 except UnicodeDecodeError:
946 except UnicodeDecodeError:
923 pass # binary, leave unmodified
947 pass # binary, leave unmodified
924 else:
948 else:
925 new = encoding.tolocal(new) # normal path
949 new = encoding.tolocal(new) # normal path
926
950
927 if util.safehasattr(proto, 'restore'):
951 if util.safehasattr(proto, 'restore'):
928
952
929 proto.redirect()
953 proto.redirect()
930
954
931 try:
955 try:
932 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
956 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
933 encoding.tolocal(old), new) or False
957 encoding.tolocal(old), new) or False
934 except error.Abort:
958 except error.Abort:
935 r = False
959 r = False
936
960
937 output = proto.restore()
961 output = proto.restore()
938
962
939 return '%s\n%s' % (int(r), output)
963 return '%s\n%s' % (int(r), output)
940
964
941 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
965 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
942 encoding.tolocal(old), new)
966 encoding.tolocal(old), new)
943 return '%s\n' % int(r)
967 return '%s\n' % int(r)
944
968
945 @wireprotocommand('stream_out')
969 @wireprotocommand('stream_out')
946 def stream(repo, proto):
970 def stream(repo, proto):
947 '''If the server supports streaming clone, it advertises the "stream"
971 '''If the server supports streaming clone, it advertises the "stream"
948 capability with a value representing the version and flags of the repo
972 capability with a value representing the version and flags of the repo
949 it is serving. Client checks to see if it understands the format.
973 it is serving. Client checks to see if it understands the format.
950 '''
974 '''
951 if not streamclone.allowservergeneration(repo):
975 if not streamclone.allowservergeneration(repo):
952 return '1\n'
976 return '1\n'
953
977
954 def getstream(it):
978 def getstream(it):
955 yield '0\n'
979 yield '0\n'
956 for chunk in it:
980 for chunk in it:
957 yield chunk
981 yield chunk
958
982
959 try:
983 try:
960 # LockError may be raised before the first result is yielded. Don't
984 # LockError may be raised before the first result is yielded. Don't
961 # emit output until we're sure we got the lock successfully.
985 # emit output until we're sure we got the lock successfully.
962 it = streamclone.generatev1wireproto(repo)
986 it = streamclone.generatev1wireproto(repo)
963 return streamres(gen=getstream(it))
987 return streamres(gen=getstream(it))
964 except error.LockError:
988 except error.LockError:
965 return '2\n'
989 return '2\n'
966
990
967 @wireprotocommand('unbundle', 'heads')
991 @wireprotocommand('unbundle', 'heads')
968 def unbundle(repo, proto, heads):
992 def unbundle(repo, proto, heads):
969 their_heads = decodelist(heads)
993 their_heads = decodelist(heads)
970
994
971 try:
995 try:
972 proto.redirect()
996 proto.redirect()
973
997
974 exchange.check_heads(repo, their_heads, 'preparing changes')
998 exchange.check_heads(repo, their_heads, 'preparing changes')
975
999
976 # write bundle data to temporary file because it can be big
1000 # write bundle data to temporary file because it can be big
977 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1001 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
978 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1002 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
979 r = 0
1003 r = 0
980 try:
1004 try:
981 proto.getfile(fp)
1005 proto.getfile(fp)
982 fp.seek(0)
1006 fp.seek(0)
983 gen = exchange.readbundle(repo.ui, fp, None)
1007 gen = exchange.readbundle(repo.ui, fp, None)
984 if (isinstance(gen, changegroupmod.cg1unpacker)
1008 if (isinstance(gen, changegroupmod.cg1unpacker)
985 and not bundle1allowed(repo, 'push')):
1009 and not bundle1allowed(repo, 'push')):
986 if proto.name == 'http':
1010 if proto.name == 'http':
987 # need to special case http because stderr do not get to
1011 # need to special case http because stderr do not get to
988 # the http client on failed push so we need to abuse some
1012 # the http client on failed push so we need to abuse some
989 # other error type to make sure the message get to the
1013 # other error type to make sure the message get to the
990 # user.
1014 # user.
991 return ooberror(bundle2required)
1015 return ooberror(bundle2required)
992 raise error.Abort(bundle2requiredmain,
1016 raise error.Abort(bundle2requiredmain,
993 hint=bundle2requiredhint)
1017 hint=bundle2requiredhint)
994
1018
995 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1019 r = exchange.unbundle(repo, gen, their_heads, 'serve',
996 proto._client())
1020 proto._client())
997 if util.safehasattr(r, 'addpart'):
1021 if util.safehasattr(r, 'addpart'):
998 # The return looks streamable, we are in the bundle2 case and
1022 # The return looks streamable, we are in the bundle2 case and
999 # should return a stream.
1023 # should return a stream.
1000 return streamres(gen=r.getchunks())
1024 return streamres(gen=r.getchunks())
1001 return pushres(r)
1025 return pushres(r)
1002
1026
1003 finally:
1027 finally:
1004 fp.close()
1028 fp.close()
1005 os.unlink(tempname)
1029 os.unlink(tempname)
1006
1030
1007 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1031 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1008 # handle non-bundle2 case first
1032 # handle non-bundle2 case first
1009 if not getattr(exc, 'duringunbundle2', False):
1033 if not getattr(exc, 'duringunbundle2', False):
1010 try:
1034 try:
1011 raise
1035 raise
1012 except error.Abort:
1036 except error.Abort:
1013 # The old code we moved used util.stderr directly.
1037 # The old code we moved used util.stderr directly.
1014 # We did not change it to minimise code change.
1038 # We did not change it to minimise code change.
1015 # This need to be moved to something proper.
1039 # This need to be moved to something proper.
1016 # Feel free to do it.
1040 # Feel free to do it.
1017 util.stderr.write("abort: %s\n" % exc)
1041 util.stderr.write("abort: %s\n" % exc)
1018 if exc.hint is not None:
1042 if exc.hint is not None:
1019 util.stderr.write("(%s)\n" % exc.hint)
1043 util.stderr.write("(%s)\n" % exc.hint)
1020 return pushres(0)
1044 return pushres(0)
1021 except error.PushRaced:
1045 except error.PushRaced:
1022 return pusherr(str(exc))
1046 return pusherr(str(exc))
1023
1047
1024 bundler = bundle2.bundle20(repo.ui)
1048 bundler = bundle2.bundle20(repo.ui)
1025 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1049 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1026 bundler.addpart(out)
1050 bundler.addpart(out)
1027 try:
1051 try:
1028 try:
1052 try:
1029 raise
1053 raise
1030 except error.PushkeyFailed as exc:
1054 except error.PushkeyFailed as exc:
1031 # check client caps
1055 # check client caps
1032 remotecaps = getattr(exc, '_replycaps', None)
1056 remotecaps = getattr(exc, '_replycaps', None)
1033 if (remotecaps is not None
1057 if (remotecaps is not None
1034 and 'pushkey' not in remotecaps.get('error', ())):
1058 and 'pushkey' not in remotecaps.get('error', ())):
1035 # no support remote side, fallback to Abort handler.
1059 # no support remote side, fallback to Abort handler.
1036 raise
1060 raise
1037 part = bundler.newpart('error:pushkey')
1061 part = bundler.newpart('error:pushkey')
1038 part.addparam('in-reply-to', exc.partid)
1062 part.addparam('in-reply-to', exc.partid)
1039 if exc.namespace is not None:
1063 if exc.namespace is not None:
1040 part.addparam('namespace', exc.namespace, mandatory=False)
1064 part.addparam('namespace', exc.namespace, mandatory=False)
1041 if exc.key is not None:
1065 if exc.key is not None:
1042 part.addparam('key', exc.key, mandatory=False)
1066 part.addparam('key', exc.key, mandatory=False)
1043 if exc.new is not None:
1067 if exc.new is not None:
1044 part.addparam('new', exc.new, mandatory=False)
1068 part.addparam('new', exc.new, mandatory=False)
1045 if exc.old is not None:
1069 if exc.old is not None:
1046 part.addparam('old', exc.old, mandatory=False)
1070 part.addparam('old', exc.old, mandatory=False)
1047 if exc.ret is not None:
1071 if exc.ret is not None:
1048 part.addparam('ret', exc.ret, mandatory=False)
1072 part.addparam('ret', exc.ret, mandatory=False)
1049 except error.BundleValueError as exc:
1073 except error.BundleValueError as exc:
1050 errpart = bundler.newpart('error:unsupportedcontent')
1074 errpart = bundler.newpart('error:unsupportedcontent')
1051 if exc.parttype is not None:
1075 if exc.parttype is not None:
1052 errpart.addparam('parttype', exc.parttype)
1076 errpart.addparam('parttype', exc.parttype)
1053 if exc.params:
1077 if exc.params:
1054 errpart.addparam('params', '\0'.join(exc.params))
1078 errpart.addparam('params', '\0'.join(exc.params))
1055 except error.Abort as exc:
1079 except error.Abort as exc:
1056 manargs = [('message', str(exc))]
1080 manargs = [('message', str(exc))]
1057 advargs = []
1081 advargs = []
1058 if exc.hint is not None:
1082 if exc.hint is not None:
1059 advargs.append(('hint', exc.hint))
1083 advargs.append(('hint', exc.hint))
1060 bundler.addpart(bundle2.bundlepart('error:abort',
1084 bundler.addpart(bundle2.bundlepart('error:abort',
1061 manargs, advargs))
1085 manargs, advargs))
1062 except error.PushRaced as exc:
1086 except error.PushRaced as exc:
1063 bundler.newpart('error:pushraced', [('message', str(exc))])
1087 bundler.newpart('error:pushraced', [('message', str(exc))])
1064 return streamres(gen=bundler.getchunks())
1088 return streamres(gen=bundler.getchunks())
@@ -1,176 +1,206 b''
1 # test-batching.py - tests for transparent command batching
1 # test-batching.py - tests for transparent command batching
2 #
2 #
3 # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
3 # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import, print_function
8 from __future__ import absolute_import, print_function
9
9
10 from mercurial import (
10 from mercurial import (
11 error,
11 peer,
12 peer,
13 util,
12 wireproto,
14 wireproto,
13 )
15 )
14
16
15 # equivalent of repo.repository
17 # equivalent of repo.repository
16 class thing(object):
18 class thing(object):
17 def hello(self):
19 def hello(self):
18 return "Ready."
20 return "Ready."
19
21
20 # equivalent of localrepo.localrepository
22 # equivalent of localrepo.localrepository
21 class localthing(thing):
23 class localthing(thing):
22 def foo(self, one, two=None):
24 def foo(self, one, two=None):
23 if one:
25 if one:
24 return "%s and %s" % (one, two,)
26 return "%s and %s" % (one, two,)
25 return "Nope"
27 return "Nope"
26 def bar(self, b, a):
28 def bar(self, b, a):
27 return "%s und %s" % (b, a,)
29 return "%s und %s" % (b, a,)
28 def greet(self, name=None):
30 def greet(self, name=None):
29 return "Hello, %s" % name
31 return "Hello, %s" % name
30 def batch(self):
32 def batchiter(self):
31 '''Support for local batching.'''
33 '''Support for local batching.'''
32 return peer.localbatch(self)
34 return peer.localiterbatcher(self)
33
35
34 # usage of "thing" interface
36 # usage of "thing" interface
35 def use(it):
37 def use(it):
36
38
37 # Direct call to base method shared between client and server.
39 # Direct call to base method shared between client and server.
38 print(it.hello())
40 print(it.hello())
39
41
40 # Direct calls to proxied methods. They cause individual roundtrips.
42 # Direct calls to proxied methods. They cause individual roundtrips.
41 print(it.foo("Un", two="Deux"))
43 print(it.foo("Un", two="Deux"))
42 print(it.bar("Eins", "Zwei"))
44 print(it.bar("Eins", "Zwei"))
43
45
44 # Batched call to a couple of (possibly proxied) methods.
46 # Batched call to a couple of proxied methods.
45 batch = it.batch()
47 batch = it.batchiter()
46 # The calls return futures to eventually hold results.
48 # The calls return futures to eventually hold results.
47 foo = batch.foo(one="One", two="Two")
49 foo = batch.foo(one="One", two="Two")
48 bar = batch.bar("Eins", "Zwei")
50 bar = batch.bar("Eins", "Zwei")
49 # We can call non-batchable proxy methods, but the break the current batch
50 # request and cause additional roundtrips.
51 greet = batch.greet(name="John Smith")
52 # We can also add local methods into the mix, but they break the batch too.
53 hello = batch.hello()
54 bar2 = batch.bar(b="Uno", a="Due")
51 bar2 = batch.bar(b="Uno", a="Due")
55 # Only now are all the calls executed in sequence, with as few roundtrips
52
56 # as possible.
53 # Future shouldn't be set until we submit().
54 assert isinstance(foo, peer.future)
55 assert not util.safehasattr(foo, 'value')
56 assert not util.safehasattr(bar, 'value')
57 batch.submit()
57 batch.submit()
58 # After the call to submit, the futures actually contain values.
58 # Call results() to obtain results as a generator.
59 results = batch.results()
60
61 # Future results shouldn't be set until we consume a value.
62 assert not util.safehasattr(foo, 'value')
63 foovalue = next(results)
64 assert util.safehasattr(foo, 'value')
65 assert foovalue == foo.value
59 print(foo.value)
66 print(foo.value)
67 next(results)
60 print(bar.value)
68 print(bar.value)
61 print(greet.value)
69 next(results)
62 print(hello.value)
63 print(bar2.value)
70 print(bar2.value)
64
71
72 # We should be at the end of the results generator.
73 try:
74 next(results)
75 except StopIteration:
76 print('proper end of results generator')
77 else:
78 print('extra emitted element!')
79
80 # Attempting to call a non-batchable method inside a batch fails.
81 batch = it.batchiter()
82 try:
83 batch.greet(name='John Smith')
84 except error.ProgrammingError as e:
85 print(e)
86
87 # Attempting to call a local method inside a batch fails.
88 batch = it.batchiter()
89 try:
90 batch.hello()
91 except error.ProgrammingError as e:
92 print(e)
93
65 # local usage
94 # local usage
66 mylocal = localthing()
95 mylocal = localthing()
67 print()
96 print()
68 print("== Local")
97 print("== Local")
69 use(mylocal)
98 use(mylocal)
70
99
71 # demo remoting; mimicks what wireproto and HTTP/SSH do
100 # demo remoting; mimicks what wireproto and HTTP/SSH do
72
101
73 # shared
102 # shared
74
103
75 def escapearg(plain):
104 def escapearg(plain):
76 return (plain
105 return (plain
77 .replace(':', '::')
106 .replace(':', '::')
78 .replace(',', ':,')
107 .replace(',', ':,')
79 .replace(';', ':;')
108 .replace(';', ':;')
80 .replace('=', ':='))
109 .replace('=', ':='))
81 def unescapearg(escaped):
110 def unescapearg(escaped):
82 return (escaped
111 return (escaped
83 .replace(':=', '=')
112 .replace(':=', '=')
84 .replace(':;', ';')
113 .replace(':;', ';')
85 .replace(':,', ',')
114 .replace(':,', ',')
86 .replace('::', ':'))
115 .replace('::', ':'))
87
116
88 # server side
117 # server side
89
118
90 # equivalent of wireproto's global functions
119 # equivalent of wireproto's global functions
91 class server(object):
120 class server(object):
92 def __init__(self, local):
121 def __init__(self, local):
93 self.local = local
122 self.local = local
94 def _call(self, name, args):
123 def _call(self, name, args):
95 args = dict(arg.split('=', 1) for arg in args)
124 args = dict(arg.split('=', 1) for arg in args)
96 return getattr(self, name)(**args)
125 return getattr(self, name)(**args)
97 def perform(self, req):
126 def perform(self, req):
98 print("REQ:", req)
127 print("REQ:", req)
99 name, args = req.split('?', 1)
128 name, args = req.split('?', 1)
100 args = args.split('&')
129 args = args.split('&')
101 vals = dict(arg.split('=', 1) for arg in args)
130 vals = dict(arg.split('=', 1) for arg in args)
102 res = getattr(self, name)(**vals)
131 res = getattr(self, name)(**vals)
103 print(" ->", res)
132 print(" ->", res)
104 return res
133 return res
105 def batch(self, cmds):
134 def batch(self, cmds):
106 res = []
135 res = []
107 for pair in cmds.split(';'):
136 for pair in cmds.split(';'):
108 name, args = pair.split(':', 1)
137 name, args = pair.split(':', 1)
109 vals = {}
138 vals = {}
110 for a in args.split(','):
139 for a in args.split(','):
111 if a:
140 if a:
112 n, v = a.split('=')
141 n, v = a.split('=')
113 vals[n] = unescapearg(v)
142 vals[n] = unescapearg(v)
114 res.append(escapearg(getattr(self, name)(**vals)))
143 res.append(escapearg(getattr(self, name)(**vals)))
115 return ';'.join(res)
144 return ';'.join(res)
116 def foo(self, one, two):
145 def foo(self, one, two):
117 return mangle(self.local.foo(unmangle(one), unmangle(two)))
146 return mangle(self.local.foo(unmangle(one), unmangle(two)))
118 def bar(self, b, a):
147 def bar(self, b, a):
119 return mangle(self.local.bar(unmangle(b), unmangle(a)))
148 return mangle(self.local.bar(unmangle(b), unmangle(a)))
120 def greet(self, name):
149 def greet(self, name):
121 return mangle(self.local.greet(unmangle(name)))
150 return mangle(self.local.greet(unmangle(name)))
122 myserver = server(mylocal)
151 myserver = server(mylocal)
123
152
124 # local side
153 # local side
125
154
126 # equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
155 # equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
127 # here we just transform the strings a bit to check we're properly en-/decoding
156 # here we just transform the strings a bit to check we're properly en-/decoding
128 def mangle(s):
157 def mangle(s):
129 return ''.join(chr(ord(c) + 1) for c in s)
158 return ''.join(chr(ord(c) + 1) for c in s)
130 def unmangle(s):
159 def unmangle(s):
131 return ''.join(chr(ord(c) - 1) for c in s)
160 return ''.join(chr(ord(c) - 1) for c in s)
132
161
133 # equivalent of wireproto.wirerepository and something like http's wire format
162 # equivalent of wireproto.wirerepository and something like http's wire format
134 class remotething(thing):
163 class remotething(thing):
135 def __init__(self, server):
164 def __init__(self, server):
136 self.server = server
165 self.server = server
137 def _submitone(self, name, args):
166 def _submitone(self, name, args):
138 req = name + '?' + '&'.join(['%s=%s' % (n, v) for n, v in args])
167 req = name + '?' + '&'.join(['%s=%s' % (n, v) for n, v in args])
139 return self.server.perform(req)
168 return self.server.perform(req)
140 def _submitbatch(self, cmds):
169 def _submitbatch(self, cmds):
141 req = []
170 req = []
142 for name, args in cmds:
171 for name, args in cmds:
143 args = ','.join(n + '=' + escapearg(v) for n, v in args)
172 args = ','.join(n + '=' + escapearg(v) for n, v in args)
144 req.append(name + ':' + args)
173 req.append(name + ':' + args)
145 req = ';'.join(req)
174 req = ';'.join(req)
146 res = self._submitone('batch', [('cmds', req,)])
175 res = self._submitone('batch', [('cmds', req,)])
147 return res.split(';')
176 for r in res.split(';'):
177 yield r
148
178
149 def batch(self):
179 def batchiter(self):
150 return wireproto.remotebatch(self)
180 return wireproto.remoteiterbatcher(self)
151
181
152 @peer.batchable
182 @peer.batchable
153 def foo(self, one, two=None):
183 def foo(self, one, two=None):
154 encargs = [('one', mangle(one),), ('two', mangle(two),)]
184 encargs = [('one', mangle(one),), ('two', mangle(two),)]
155 encresref = peer.future()
185 encresref = peer.future()
156 yield encargs, encresref
186 yield encargs, encresref
157 yield unmangle(encresref.value)
187 yield unmangle(encresref.value)
158
188
159 @peer.batchable
189 @peer.batchable
160 def bar(self, b, a):
190 def bar(self, b, a):
161 encresref = peer.future()
191 encresref = peer.future()
162 yield [('b', mangle(b),), ('a', mangle(a),)], encresref
192 yield [('b', mangle(b),), ('a', mangle(a),)], encresref
163 yield unmangle(encresref.value)
193 yield unmangle(encresref.value)
164
194
165 # greet is coded directly. It therefore does not support batching. If it
195 # greet is coded directly. It therefore does not support batching. If it
166 # does appear in a batch, the batch is split around greet, and the call to
196 # does appear in a batch, the batch is split around greet, and the call to
167 # greet is done in its own roundtrip.
197 # greet is done in its own roundtrip.
168 def greet(self, name=None):
198 def greet(self, name=None):
169 return unmangle(self._submitone('greet', [('name', mangle(name),)]))
199 return unmangle(self._submitone('greet', [('name', mangle(name),)]))
170
200
171 # demo remote usage
201 # demo remote usage
172
202
173 myproxy = remotething(myserver)
203 myproxy = remotething(myserver)
174 print()
204 print()
175 print("== Remote")
205 print("== Remote")
176 use(myproxy)
206 use(myproxy)
@@ -1,30 +1,26 b''
1
1
2 == Local
2 == Local
3 Ready.
3 Ready.
4 Un and Deux
4 Un and Deux
5 Eins und Zwei
5 Eins und Zwei
6 One and Two
6 One and Two
7 Eins und Zwei
7 Eins und Zwei
8 Hello, John Smith
9 Ready.
10 Uno und Due
8 Uno und Due
9 proper end of results generator
11
10
12 == Remote
11 == Remote
13 Ready.
12 Ready.
14 REQ: foo?one=Vo&two=Efvy
13 REQ: foo?one=Vo&two=Efvy
15 -> Vo!boe!Efvy
14 -> Vo!boe!Efvy
16 Un and Deux
15 Un and Deux
17 REQ: bar?b=Fjot&a=[xfj
16 REQ: bar?b=Fjot&a=[xfj
18 -> Fjot!voe![xfj
17 -> Fjot!voe![xfj
19 Eins und Zwei
18 Eins und Zwei
20 REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj
19 REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj;bar:b=Vop,a=Evf
21 -> Pof!boe!Uxp;Fjot!voe![xfj
20 -> Pof!boe!Uxp;Fjot!voe![xfj;Vop!voe!Evf
22 REQ: greet?name=Kpio!Tnjui
23 -> Ifmmp-!Kpio!Tnjui
24 REQ: batch?cmds=bar:b=Vop,a=Evf
25 -> Vop!voe!Evf
26 One and Two
21 One and Two
27 Eins und Zwei
22 Eins und Zwei
28 Hello, John Smith
29 Ready.
30 Uno und Due
23 Uno und Due
24 proper end of results generator
25 Attempted to batch a non-batchable call to 'greet'
26 Attempted to batch a non-batchable call to 'hello'
General Comments 0
You need to be logged in to leave comments. Login now