##// END OF EJS Templates
peer: remove non iterating batcher (API)...
Gregory Szorc -
r33762:b47fe973 default
parent child Browse files
Show More
@@ -1,153 +1,140 b''
1 # peer.py - repository base classes for mercurial
1 # peer.py - repository base classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 from .i18n import _
11 from .i18n import _
12 from . import (
12 from . import (
13 error,
13 error,
14 util,
14 util,
15 )
15 )
16
16
17 # abstract batching support
17 # abstract batching support
18
18
19 class future(object):
19 class future(object):
20 '''placeholder for a value to be set later'''
20 '''placeholder for a value to be set later'''
21 def set(self, value):
21 def set(self, value):
22 if util.safehasattr(self, 'value'):
22 if util.safehasattr(self, 'value'):
23 raise error.RepoError("future is already set")
23 raise error.RepoError("future is already set")
24 self.value = value
24 self.value = value
25
25
26 class batcher(object):
26 class batcher(object):
27 '''base class for batches of commands submittable in a single request
27 '''base class for batches of commands submittable in a single request
28
28
29 All methods invoked on instances of this class are simply queued and
29 All methods invoked on instances of this class are simply queued and
30 return a a future for the result. Once you call submit(), all the queued
30 return a a future for the result. Once you call submit(), all the queued
31 calls are performed and the results set in their respective futures.
31 calls are performed and the results set in their respective futures.
32 '''
32 '''
33 def __init__(self):
33 def __init__(self):
34 self.calls = []
34 self.calls = []
35 def __getattr__(self, name):
35 def __getattr__(self, name):
36 def call(*args, **opts):
36 def call(*args, **opts):
37 resref = future()
37 resref = future()
38 self.calls.append((name, args, opts, resref,))
38 self.calls.append((name, args, opts, resref,))
39 return resref
39 return resref
40 return call
40 return call
41 def submit(self):
41 def submit(self):
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 class iterbatcher(batcher):
44 class iterbatcher(batcher):
45
45
46 def submit(self):
46 def submit(self):
47 raise NotImplementedError()
47 raise NotImplementedError()
48
48
49 def results(self):
49 def results(self):
50 raise NotImplementedError()
50 raise NotImplementedError()
51
51
52 class localbatch(batcher):
53 '''performs the queued calls directly'''
54 def __init__(self, local):
55 batcher.__init__(self)
56 self.local = local
57 def submit(self):
58 for name, args, opts, resref in self.calls:
59 resref.set(getattr(self.local, name)(*args, **opts))
60
61 class localiterbatcher(iterbatcher):
52 class localiterbatcher(iterbatcher):
62 def __init__(self, local):
53 def __init__(self, local):
63 super(iterbatcher, self).__init__()
54 super(iterbatcher, self).__init__()
64 self.local = local
55 self.local = local
65
56
66 def submit(self):
57 def submit(self):
67 # submit for a local iter batcher is a noop
58 # submit for a local iter batcher is a noop
68 pass
59 pass
69
60
70 def results(self):
61 def results(self):
71 for name, args, opts, resref in self.calls:
62 for name, args, opts, resref in self.calls:
72 resref.set(getattr(self.local, name)(*args, **opts))
63 resref.set(getattr(self.local, name)(*args, **opts))
73 yield resref.value
64 yield resref.value
74
65
75 def batchable(f):
66 def batchable(f):
76 '''annotation for batchable methods
67 '''annotation for batchable methods
77
68
78 Such methods must implement a coroutine as follows:
69 Such methods must implement a coroutine as follows:
79
70
80 @batchable
71 @batchable
81 def sample(self, one, two=None):
72 def sample(self, one, two=None):
82 # Build list of encoded arguments suitable for your wire protocol:
73 # Build list of encoded arguments suitable for your wire protocol:
83 encargs = [('one', encode(one),), ('two', encode(two),)]
74 encargs = [('one', encode(one),), ('two', encode(two),)]
84 # Create future for injection of encoded result:
75 # Create future for injection of encoded result:
85 encresref = future()
76 encresref = future()
86 # Return encoded arguments and future:
77 # Return encoded arguments and future:
87 yield encargs, encresref
78 yield encargs, encresref
88 # Assuming the future to be filled with the result from the batched
79 # Assuming the future to be filled with the result from the batched
89 # request now. Decode it:
80 # request now. Decode it:
90 yield decode(encresref.value)
81 yield decode(encresref.value)
91
82
92 The decorator returns a function which wraps this coroutine as a plain
83 The decorator returns a function which wraps this coroutine as a plain
93 method, but adds the original method as an attribute called "batchable",
84 method, but adds the original method as an attribute called "batchable",
94 which is used by remotebatch to split the call into separate encoding and
85 which is used by remotebatch to split the call into separate encoding and
95 decoding phases.
86 decoding phases.
96 '''
87 '''
97 def plain(*args, **opts):
88 def plain(*args, **opts):
98 batchable = f(*args, **opts)
89 batchable = f(*args, **opts)
99 encargsorres, encresref = next(batchable)
90 encargsorres, encresref = next(batchable)
100 if not encresref:
91 if not encresref:
101 return encargsorres # a local result in this case
92 return encargsorres # a local result in this case
102 self = args[0]
93 self = args[0]
103 encresref.set(self._submitone(f.func_name, encargsorres))
94 encresref.set(self._submitone(f.func_name, encargsorres))
104 return next(batchable)
95 return next(batchable)
105 setattr(plain, 'batchable', f)
96 setattr(plain, 'batchable', f)
106 return plain
97 return plain
107
98
108 class peerrepository(object):
99 class peerrepository(object):
109
110 def batch(self):
111 return localbatch(self)
112
113 def iterbatch(self):
100 def iterbatch(self):
114 """Batch requests but allow iterating over the results.
101 """Batch requests but allow iterating over the results.
115
102
116 This is to allow interleaving responses with things like
103 This is to allow interleaving responses with things like
117 progress updates for clients.
104 progress updates for clients.
118 """
105 """
119 return localiterbatcher(self)
106 return localiterbatcher(self)
120
107
121 def capable(self, name):
108 def capable(self, name):
122 '''tell whether repo supports named capability.
109 '''tell whether repo supports named capability.
123 return False if not supported.
110 return False if not supported.
124 if boolean capability, return True.
111 if boolean capability, return True.
125 if string capability, return string.'''
112 if string capability, return string.'''
126 caps = self._capabilities()
113 caps = self._capabilities()
127 if name in caps:
114 if name in caps:
128 return True
115 return True
129 name_eq = name + '='
116 name_eq = name + '='
130 for cap in caps:
117 for cap in caps:
131 if cap.startswith(name_eq):
118 if cap.startswith(name_eq):
132 return cap[len(name_eq):]
119 return cap[len(name_eq):]
133 return False
120 return False
134
121
135 def requirecap(self, name, purpose):
122 def requirecap(self, name, purpose):
136 '''raise an exception if the given capability is not present'''
123 '''raise an exception if the given capability is not present'''
137 if not self.capable(name):
124 if not self.capable(name):
138 raise error.CapabilityError(
125 raise error.CapabilityError(
139 _('cannot %s; remote repository does not '
126 _('cannot %s; remote repository does not '
140 'support the %r capability') % (purpose, name))
127 'support the %r capability') % (purpose, name))
141
128
142 def local(self):
129 def local(self):
143 '''return peer as a localrepo, or None'''
130 '''return peer as a localrepo, or None'''
144 return None
131 return None
145
132
146 def peer(self):
133 def peer(self):
147 return self
134 return self
148
135
149 def canpush(self):
136 def canpush(self):
150 return True
137 return True
151
138
152 def close(self):
139 def close(self):
153 pass
140 pass
@@ -1,1088 +1,1050 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
12 import os
11 import os
13 import tempfile
12 import tempfile
14
13
15 from .i18n import _
14 from .i18n import _
16 from .node import (
15 from .node import (
17 bin,
16 bin,
18 hex,
17 hex,
19 nullid,
18 nullid,
20 )
19 )
21
20
22 from . import (
21 from . import (
23 bundle2,
22 bundle2,
24 changegroup as changegroupmod,
23 changegroup as changegroupmod,
25 encoding,
24 encoding,
26 error,
25 error,
27 exchange,
26 exchange,
28 peer,
27 peer,
29 pushkey as pushkeymod,
28 pushkey as pushkeymod,
30 pycompat,
29 pycompat,
31 streamclone,
30 streamclone,
32 util,
31 util,
33 )
32 )
34
33
35 urlerr = util.urlerr
34 urlerr = util.urlerr
36 urlreq = util.urlreq
35 urlreq = util.urlreq
37
36
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 'IncompatibleClient')
39 'IncompatibleClient')
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42
41
43 class abstractserverproto(object):
42 class abstractserverproto(object):
44 """abstract class that summarizes the protocol API
43 """abstract class that summarizes the protocol API
45
44
46 Used as reference and documentation.
45 Used as reference and documentation.
47 """
46 """
48
47
49 def getargs(self, args):
48 def getargs(self, args):
50 """return the value for arguments in <args>
49 """return the value for arguments in <args>
51
50
52 returns a list of values (same order as <args>)"""
51 returns a list of values (same order as <args>)"""
53 raise NotImplementedError()
52 raise NotImplementedError()
54
53
55 def getfile(self, fp):
54 def getfile(self, fp):
56 """write the whole content of a file into a file like object
55 """write the whole content of a file into a file like object
57
56
58 The file is in the form::
57 The file is in the form::
59
58
60 (<chunk-size>\n<chunk>)+0\n
59 (<chunk-size>\n<chunk>)+0\n
61
60
62 chunk size is the ascii version of the int.
61 chunk size is the ascii version of the int.
63 """
62 """
64 raise NotImplementedError()
63 raise NotImplementedError()
65
64
66 def redirect(self):
65 def redirect(self):
67 """may setup interception for stdout and stderr
66 """may setup interception for stdout and stderr
68
67
69 See also the `restore` method."""
68 See also the `restore` method."""
70 raise NotImplementedError()
69 raise NotImplementedError()
71
70
72 # If the `redirect` function does install interception, the `restore`
71 # If the `redirect` function does install interception, the `restore`
73 # function MUST be defined. If interception is not used, this function
72 # function MUST be defined. If interception is not used, this function
74 # MUST NOT be defined.
73 # MUST NOT be defined.
75 #
74 #
76 # left commented here on purpose
75 # left commented here on purpose
77 #
76 #
78 #def restore(self):
77 #def restore(self):
79 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """
79 # """
81 # raise NotImplementedError()
80 # raise NotImplementedError()
82
81
83 class remotebatch(peer.batcher):
84 '''batches the queued calls; uses as few roundtrips as possible'''
85 def __init__(self, remote):
86 '''remote must support _submitbatch(encbatch) and
87 _submitone(op, encargs)'''
88 peer.batcher.__init__(self)
89 self.remote = remote
90 def submit(self):
91 req, rsp = [], []
92 for name, args, opts, resref in self.calls:
93 mtd = getattr(self.remote, name)
94 batchablefn = getattr(mtd, 'batchable', None)
95 if batchablefn is not None:
96 batchable = batchablefn(mtd.im_self, *args, **opts)
97 encargsorres, encresref = next(batchable)
98 assert encresref
99 req.append((name, encargsorres,))
100 rsp.append((batchable, encresref, resref,))
101 else:
102 if req:
103 self._submitreq(req, rsp)
104 req, rsp = [], []
105 resref.set(mtd(*args, **opts))
106 if req:
107 self._submitreq(req, rsp)
108 def _submitreq(self, req, rsp):
109 encresults = self.remote._submitbatch(req)
110 for encres, r in zip(encresults, rsp):
111 batchable, encresref, resref = r
112 encresref.set(encres)
113 resref.set(next(batchable))
114
115 class remoteiterbatcher(peer.iterbatcher):
82 class remoteiterbatcher(peer.iterbatcher):
116 def __init__(self, remote):
83 def __init__(self, remote):
117 super(remoteiterbatcher, self).__init__()
84 super(remoteiterbatcher, self).__init__()
118 self._remote = remote
85 self._remote = remote
119
86
120 def __getattr__(self, name):
87 def __getattr__(self, name):
121 # Validate this method is batchable, since submit() only supports
88 # Validate this method is batchable, since submit() only supports
122 # batchable methods.
89 # batchable methods.
123 fn = getattr(self._remote, name)
90 fn = getattr(self._remote, name)
124 if not getattr(fn, 'batchable', None):
91 if not getattr(fn, 'batchable', None):
125 raise error.ProgrammingError('Attempted to batch a non-batchable '
92 raise error.ProgrammingError('Attempted to batch a non-batchable '
126 'call to %r' % name)
93 'call to %r' % name)
127
94
128 return super(remoteiterbatcher, self).__getattr__(name)
95 return super(remoteiterbatcher, self).__getattr__(name)
129
96
130 def submit(self):
97 def submit(self):
131 """Break the batch request into many patch calls and pipeline them.
98 """Break the batch request into many patch calls and pipeline them.
132
99
133 This is mostly valuable over http where request sizes can be
100 This is mostly valuable over http where request sizes can be
134 limited, but can be used in other places as well.
101 limited, but can be used in other places as well.
135 """
102 """
136 # 2-tuple of (command, arguments) that represents what will be
103 # 2-tuple of (command, arguments) that represents what will be
137 # sent over the wire.
104 # sent over the wire.
138 requests = []
105 requests = []
139
106
140 # 4-tuple of (command, final future, @batchable generator, remote
107 # 4-tuple of (command, final future, @batchable generator, remote
141 # future).
108 # future).
142 results = []
109 results = []
143
110
144 for command, args, opts, finalfuture in self.calls:
111 for command, args, opts, finalfuture in self.calls:
145 mtd = getattr(self._remote, command)
112 mtd = getattr(self._remote, command)
146 batchable = mtd.batchable(mtd.im_self, *args, **opts)
113 batchable = mtd.batchable(mtd.im_self, *args, **opts)
147
114
148 commandargs, fremote = next(batchable)
115 commandargs, fremote = next(batchable)
149 assert fremote
116 assert fremote
150 requests.append((command, commandargs))
117 requests.append((command, commandargs))
151 results.append((command, finalfuture, batchable, fremote))
118 results.append((command, finalfuture, batchable, fremote))
152
119
153 if requests:
120 if requests:
154 self._resultiter = self._remote._submitbatch(requests)
121 self._resultiter = self._remote._submitbatch(requests)
155
122
156 self._results = results
123 self._results = results
157
124
158 def results(self):
125 def results(self):
159 for command, finalfuture, batchable, remotefuture in self._results:
126 for command, finalfuture, batchable, remotefuture in self._results:
160 # Get the raw result, set it in the remote future, feed it
127 # Get the raw result, set it in the remote future, feed it
161 # back into the @batchable generator so it can be decoded, and
128 # back into the @batchable generator so it can be decoded, and
162 # set the result on the final future to this value.
129 # set the result on the final future to this value.
163 remoteresult = next(self._resultiter)
130 remoteresult = next(self._resultiter)
164 remotefuture.set(remoteresult)
131 remotefuture.set(remoteresult)
165 finalfuture.set(next(batchable))
132 finalfuture.set(next(batchable))
166
133
167 # Verify our @batchable generators only emit 2 values.
134 # Verify our @batchable generators only emit 2 values.
168 try:
135 try:
169 next(batchable)
136 next(batchable)
170 except StopIteration:
137 except StopIteration:
171 pass
138 pass
172 else:
139 else:
173 raise error.ProgrammingError('%s @batchable generator emitted '
140 raise error.ProgrammingError('%s @batchable generator emitted '
174 'unexpected value count' % command)
141 'unexpected value count' % command)
175
142
176 yield finalfuture.value
143 yield finalfuture.value
177
144
178 # Forward a couple of names from peer to make wireproto interactions
145 # Forward a couple of names from peer to make wireproto interactions
179 # slightly more sensible.
146 # slightly more sensible.
180 batchable = peer.batchable
147 batchable = peer.batchable
181 future = peer.future
148 future = peer.future
182
149
183 # list of nodes encoding / decoding
150 # list of nodes encoding / decoding
184
151
185 def decodelist(l, sep=' '):
152 def decodelist(l, sep=' '):
186 if l:
153 if l:
187 return map(bin, l.split(sep))
154 return map(bin, l.split(sep))
188 return []
155 return []
189
156
190 def encodelist(l, sep=' '):
157 def encodelist(l, sep=' '):
191 try:
158 try:
192 return sep.join(map(hex, l))
159 return sep.join(map(hex, l))
193 except TypeError:
160 except TypeError:
194 raise
161 raise
195
162
196 # batched call argument encoding
163 # batched call argument encoding
197
164
198 def escapearg(plain):
165 def escapearg(plain):
199 return (plain
166 return (plain
200 .replace(':', ':c')
167 .replace(':', ':c')
201 .replace(',', ':o')
168 .replace(',', ':o')
202 .replace(';', ':s')
169 .replace(';', ':s')
203 .replace('=', ':e'))
170 .replace('=', ':e'))
204
171
205 def unescapearg(escaped):
172 def unescapearg(escaped):
206 return (escaped
173 return (escaped
207 .replace(':e', '=')
174 .replace(':e', '=')
208 .replace(':s', ';')
175 .replace(':s', ';')
209 .replace(':o', ',')
176 .replace(':o', ',')
210 .replace(':c', ':'))
177 .replace(':c', ':'))
211
178
212 def encodebatchcmds(req):
179 def encodebatchcmds(req):
213 """Return a ``cmds`` argument value for the ``batch`` command."""
180 """Return a ``cmds`` argument value for the ``batch`` command."""
214 cmds = []
181 cmds = []
215 for op, argsdict in req:
182 for op, argsdict in req:
216 # Old servers didn't properly unescape argument names. So prevent
183 # Old servers didn't properly unescape argument names. So prevent
217 # the sending of argument names that may not be decoded properly by
184 # the sending of argument names that may not be decoded properly by
218 # servers.
185 # servers.
219 assert all(escapearg(k) == k for k in argsdict)
186 assert all(escapearg(k) == k for k in argsdict)
220
187
221 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
188 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
222 for k, v in argsdict.iteritems())
189 for k, v in argsdict.iteritems())
223 cmds.append('%s %s' % (op, args))
190 cmds.append('%s %s' % (op, args))
224
191
225 return ';'.join(cmds)
192 return ';'.join(cmds)
226
193
227 # mapping of options accepted by getbundle and their types
194 # mapping of options accepted by getbundle and their types
228 #
195 #
229 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 # Meant to be extended by extensions. It is extensions responsibility to ensure
230 # such options are properly processed in exchange.getbundle.
197 # such options are properly processed in exchange.getbundle.
231 #
198 #
232 # supported types are:
199 # supported types are:
233 #
200 #
234 # :nodes: list of binary nodes
201 # :nodes: list of binary nodes
235 # :csv: list of comma-separated values
202 # :csv: list of comma-separated values
236 # :scsv: list of comma-separated values return as set
203 # :scsv: list of comma-separated values return as set
237 # :plain: string with no transformation needed.
204 # :plain: string with no transformation needed.
238 gboptsmap = {'heads': 'nodes',
205 gboptsmap = {'heads': 'nodes',
239 'common': 'nodes',
206 'common': 'nodes',
240 'obsmarkers': 'boolean',
207 'obsmarkers': 'boolean',
241 'bundlecaps': 'scsv',
208 'bundlecaps': 'scsv',
242 'listkeys': 'csv',
209 'listkeys': 'csv',
243 'cg': 'boolean',
210 'cg': 'boolean',
244 'cbattempted': 'boolean'}
211 'cbattempted': 'boolean'}
245
212
246 # client side
213 # client side
247
214
248 class wirepeer(peer.peerrepository):
215 class wirepeer(peer.peerrepository):
249 """Client-side interface for communicating with a peer repository.
216 """Client-side interface for communicating with a peer repository.
250
217
251 Methods commonly call wire protocol commands of the same name.
218 Methods commonly call wire protocol commands of the same name.
252
219
253 See also httppeer.py and sshpeer.py for protocol-specific
220 See also httppeer.py and sshpeer.py for protocol-specific
254 implementations of this interface.
221 implementations of this interface.
255 """
222 """
256 def batch(self):
257 if self.capable('batch'):
258 return remotebatch(self)
259 else:
260 return peer.localbatch(self)
261 def _submitbatch(self, req):
223 def _submitbatch(self, req):
262 """run batch request <req> on the server
224 """run batch request <req> on the server
263
225
264 Returns an iterator of the raw responses from the server.
226 Returns an iterator of the raw responses from the server.
265 """
227 """
266 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
228 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
267 chunk = rsp.read(1024)
229 chunk = rsp.read(1024)
268 work = [chunk]
230 work = [chunk]
269 while chunk:
231 while chunk:
270 while ';' not in chunk and chunk:
232 while ';' not in chunk and chunk:
271 chunk = rsp.read(1024)
233 chunk = rsp.read(1024)
272 work.append(chunk)
234 work.append(chunk)
273 merged = ''.join(work)
235 merged = ''.join(work)
274 while ';' in merged:
236 while ';' in merged:
275 one, merged = merged.split(';', 1)
237 one, merged = merged.split(';', 1)
276 yield unescapearg(one)
238 yield unescapearg(one)
277 chunk = rsp.read(1024)
239 chunk = rsp.read(1024)
278 work = [merged, chunk]
240 work = [merged, chunk]
279 yield unescapearg(''.join(work))
241 yield unescapearg(''.join(work))
280
242
281 def _submitone(self, op, args):
243 def _submitone(self, op, args):
282 return self._call(op, **args)
244 return self._call(op, **args)
283
245
284 def iterbatch(self):
246 def iterbatch(self):
285 return remoteiterbatcher(self)
247 return remoteiterbatcher(self)
286
248
287 @batchable
249 @batchable
288 def lookup(self, key):
250 def lookup(self, key):
289 self.requirecap('lookup', _('look up remote revision'))
251 self.requirecap('lookup', _('look up remote revision'))
290 f = future()
252 f = future()
291 yield {'key': encoding.fromlocal(key)}, f
253 yield {'key': encoding.fromlocal(key)}, f
292 d = f.value
254 d = f.value
293 success, data = d[:-1].split(" ", 1)
255 success, data = d[:-1].split(" ", 1)
294 if int(success):
256 if int(success):
295 yield bin(data)
257 yield bin(data)
296 self._abort(error.RepoError(data))
258 self._abort(error.RepoError(data))
297
259
298 @batchable
260 @batchable
299 def heads(self):
261 def heads(self):
300 f = future()
262 f = future()
301 yield {}, f
263 yield {}, f
302 d = f.value
264 d = f.value
303 try:
265 try:
304 yield decodelist(d[:-1])
266 yield decodelist(d[:-1])
305 except ValueError:
267 except ValueError:
306 self._abort(error.ResponseError(_("unexpected response:"), d))
268 self._abort(error.ResponseError(_("unexpected response:"), d))
307
269
308 @batchable
270 @batchable
309 def known(self, nodes):
271 def known(self, nodes):
310 f = future()
272 f = future()
311 yield {'nodes': encodelist(nodes)}, f
273 yield {'nodes': encodelist(nodes)}, f
312 d = f.value
274 d = f.value
313 try:
275 try:
314 yield [bool(int(b)) for b in d]
276 yield [bool(int(b)) for b in d]
315 except ValueError:
277 except ValueError:
316 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
317
279
318 @batchable
280 @batchable
319 def branchmap(self):
281 def branchmap(self):
320 f = future()
282 f = future()
321 yield {}, f
283 yield {}, f
322 d = f.value
284 d = f.value
323 try:
285 try:
324 branchmap = {}
286 branchmap = {}
325 for branchpart in d.splitlines():
287 for branchpart in d.splitlines():
326 branchname, branchheads = branchpart.split(' ', 1)
288 branchname, branchheads = branchpart.split(' ', 1)
327 branchname = encoding.tolocal(urlreq.unquote(branchname))
289 branchname = encoding.tolocal(urlreq.unquote(branchname))
328 branchheads = decodelist(branchheads)
290 branchheads = decodelist(branchheads)
329 branchmap[branchname] = branchheads
291 branchmap[branchname] = branchheads
330 yield branchmap
292 yield branchmap
331 except TypeError:
293 except TypeError:
332 self._abort(error.ResponseError(_("unexpected response:"), d))
294 self._abort(error.ResponseError(_("unexpected response:"), d))
333
295
334 def branches(self, nodes):
296 def branches(self, nodes):
335 n = encodelist(nodes)
297 n = encodelist(nodes)
336 d = self._call("branches", nodes=n)
298 d = self._call("branches", nodes=n)
337 try:
299 try:
338 br = [tuple(decodelist(b)) for b in d.splitlines()]
300 br = [tuple(decodelist(b)) for b in d.splitlines()]
339 return br
301 return br
340 except ValueError:
302 except ValueError:
341 self._abort(error.ResponseError(_("unexpected response:"), d))
303 self._abort(error.ResponseError(_("unexpected response:"), d))
342
304
343 def between(self, pairs):
305 def between(self, pairs):
344 batch = 8 # avoid giant requests
306 batch = 8 # avoid giant requests
345 r = []
307 r = []
346 for i in xrange(0, len(pairs), batch):
308 for i in xrange(0, len(pairs), batch):
347 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
309 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
348 d = self._call("between", pairs=n)
310 d = self._call("between", pairs=n)
349 try:
311 try:
350 r.extend(l and decodelist(l) or [] for l in d.splitlines())
312 r.extend(l and decodelist(l) or [] for l in d.splitlines())
351 except ValueError:
313 except ValueError:
352 self._abort(error.ResponseError(_("unexpected response:"), d))
314 self._abort(error.ResponseError(_("unexpected response:"), d))
353 return r
315 return r
354
316
355 @batchable
317 @batchable
356 def pushkey(self, namespace, key, old, new):
318 def pushkey(self, namespace, key, old, new):
357 if not self.capable('pushkey'):
319 if not self.capable('pushkey'):
358 yield False, None
320 yield False, None
359 f = future()
321 f = future()
360 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
322 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
361 yield {'namespace': encoding.fromlocal(namespace),
323 yield {'namespace': encoding.fromlocal(namespace),
362 'key': encoding.fromlocal(key),
324 'key': encoding.fromlocal(key),
363 'old': encoding.fromlocal(old),
325 'old': encoding.fromlocal(old),
364 'new': encoding.fromlocal(new)}, f
326 'new': encoding.fromlocal(new)}, f
365 d = f.value
327 d = f.value
366 d, output = d.split('\n', 1)
328 d, output = d.split('\n', 1)
367 try:
329 try:
368 d = bool(int(d))
330 d = bool(int(d))
369 except ValueError:
331 except ValueError:
370 raise error.ResponseError(
332 raise error.ResponseError(
371 _('push failed (unexpected response):'), d)
333 _('push failed (unexpected response):'), d)
372 for l in output.splitlines(True):
334 for l in output.splitlines(True):
373 self.ui.status(_('remote: '), l)
335 self.ui.status(_('remote: '), l)
374 yield d
336 yield d
375
337
376 @batchable
338 @batchable
377 def listkeys(self, namespace):
339 def listkeys(self, namespace):
378 if not self.capable('pushkey'):
340 if not self.capable('pushkey'):
379 yield {}, None
341 yield {}, None
380 f = future()
342 f = future()
381 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
343 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
382 yield {'namespace': encoding.fromlocal(namespace)}, f
344 yield {'namespace': encoding.fromlocal(namespace)}, f
383 d = f.value
345 d = f.value
384 self.ui.debug('received listkey for "%s": %i bytes\n'
346 self.ui.debug('received listkey for "%s": %i bytes\n'
385 % (namespace, len(d)))
347 % (namespace, len(d)))
386 yield pushkeymod.decodekeys(d)
348 yield pushkeymod.decodekeys(d)
387
349
388 def stream_out(self):
350 def stream_out(self):
389 return self._callstream('stream_out')
351 return self._callstream('stream_out')
390
352
391 def changegroup(self, nodes, kind):
353 def changegroup(self, nodes, kind):
392 n = encodelist(nodes)
354 n = encodelist(nodes)
393 f = self._callcompressable("changegroup", roots=n)
355 f = self._callcompressable("changegroup", roots=n)
394 return changegroupmod.cg1unpacker(f, 'UN')
356 return changegroupmod.cg1unpacker(f, 'UN')
395
357
396 def changegroupsubset(self, bases, heads, kind):
358 def changegroupsubset(self, bases, heads, kind):
397 self.requirecap('changegroupsubset', _('look up remote changes'))
359 self.requirecap('changegroupsubset', _('look up remote changes'))
398 bases = encodelist(bases)
360 bases = encodelist(bases)
399 heads = encodelist(heads)
361 heads = encodelist(heads)
400 f = self._callcompressable("changegroupsubset",
362 f = self._callcompressable("changegroupsubset",
401 bases=bases, heads=heads)
363 bases=bases, heads=heads)
402 return changegroupmod.cg1unpacker(f, 'UN')
364 return changegroupmod.cg1unpacker(f, 'UN')
403
365
404 def getbundle(self, source, **kwargs):
366 def getbundle(self, source, **kwargs):
405 self.requirecap('getbundle', _('look up remote changes'))
367 self.requirecap('getbundle', _('look up remote changes'))
406 opts = {}
368 opts = {}
407 bundlecaps = kwargs.get('bundlecaps')
369 bundlecaps = kwargs.get('bundlecaps')
408 if bundlecaps is not None:
370 if bundlecaps is not None:
409 kwargs['bundlecaps'] = sorted(bundlecaps)
371 kwargs['bundlecaps'] = sorted(bundlecaps)
410 else:
372 else:
411 bundlecaps = () # kwargs could have it to None
373 bundlecaps = () # kwargs could have it to None
412 for key, value in kwargs.iteritems():
374 for key, value in kwargs.iteritems():
413 if value is None:
375 if value is None:
414 continue
376 continue
415 keytype = gboptsmap.get(key)
377 keytype = gboptsmap.get(key)
416 if keytype is None:
378 if keytype is None:
417 assert False, 'unexpected'
379 assert False, 'unexpected'
418 elif keytype == 'nodes':
380 elif keytype == 'nodes':
419 value = encodelist(value)
381 value = encodelist(value)
420 elif keytype in ('csv', 'scsv'):
382 elif keytype in ('csv', 'scsv'):
421 value = ','.join(value)
383 value = ','.join(value)
422 elif keytype == 'boolean':
384 elif keytype == 'boolean':
423 value = '%i' % bool(value)
385 value = '%i' % bool(value)
424 elif keytype != 'plain':
386 elif keytype != 'plain':
425 raise KeyError('unknown getbundle option type %s'
387 raise KeyError('unknown getbundle option type %s'
426 % keytype)
388 % keytype)
427 opts[key] = value
389 opts[key] = value
428 f = self._callcompressable("getbundle", **opts)
390 f = self._callcompressable("getbundle", **opts)
429 if any((cap.startswith('HG2') for cap in bundlecaps)):
391 if any((cap.startswith('HG2') for cap in bundlecaps)):
430 return bundle2.getunbundler(self.ui, f)
392 return bundle2.getunbundler(self.ui, f)
431 else:
393 else:
432 return changegroupmod.cg1unpacker(f, 'UN')
394 return changegroupmod.cg1unpacker(f, 'UN')
433
395
434 def unbundle(self, cg, heads, url):
396 def unbundle(self, cg, heads, url):
435 '''Send cg (a readable file-like object representing the
397 '''Send cg (a readable file-like object representing the
436 changegroup to push, typically a chunkbuffer object) to the
398 changegroup to push, typically a chunkbuffer object) to the
437 remote server as a bundle.
399 remote server as a bundle.
438
400
439 When pushing a bundle10 stream, return an integer indicating the
401 When pushing a bundle10 stream, return an integer indicating the
440 result of the push (see changegroup.apply()).
402 result of the push (see changegroup.apply()).
441
403
442 When pushing a bundle20 stream, return a bundle20 stream.
404 When pushing a bundle20 stream, return a bundle20 stream.
443
405
444 `url` is the url the client thinks it's pushing to, which is
406 `url` is the url the client thinks it's pushing to, which is
445 visible to hooks.
407 visible to hooks.
446 '''
408 '''
447
409
448 if heads != ['force'] and self.capable('unbundlehash'):
410 if heads != ['force'] and self.capable('unbundlehash'):
449 heads = encodelist(['hashed',
411 heads = encodelist(['hashed',
450 hashlib.sha1(''.join(sorted(heads))).digest()])
412 hashlib.sha1(''.join(sorted(heads))).digest()])
451 else:
413 else:
452 heads = encodelist(heads)
414 heads = encodelist(heads)
453
415
454 if util.safehasattr(cg, 'deltaheader'):
416 if util.safehasattr(cg, 'deltaheader'):
455 # this a bundle10, do the old style call sequence
417 # this a bundle10, do the old style call sequence
456 ret, output = self._callpush("unbundle", cg, heads=heads)
418 ret, output = self._callpush("unbundle", cg, heads=heads)
457 if ret == "":
419 if ret == "":
458 raise error.ResponseError(
420 raise error.ResponseError(
459 _('push failed:'), output)
421 _('push failed:'), output)
460 try:
422 try:
461 ret = int(ret)
423 ret = int(ret)
462 except ValueError:
424 except ValueError:
463 raise error.ResponseError(
425 raise error.ResponseError(
464 _('push failed (unexpected response):'), ret)
426 _('push failed (unexpected response):'), ret)
465
427
466 for l in output.splitlines(True):
428 for l in output.splitlines(True):
467 self.ui.status(_('remote: '), l)
429 self.ui.status(_('remote: '), l)
468 else:
430 else:
469 # bundle2 push. Send a stream, fetch a stream.
431 # bundle2 push. Send a stream, fetch a stream.
470 stream = self._calltwowaystream('unbundle', cg, heads=heads)
432 stream = self._calltwowaystream('unbundle', cg, heads=heads)
471 ret = bundle2.getunbundler(self.ui, stream)
433 ret = bundle2.getunbundler(self.ui, stream)
472 return ret
434 return ret
473
435
474 def debugwireargs(self, one, two, three=None, four=None, five=None):
436 def debugwireargs(self, one, two, three=None, four=None, five=None):
475 # don't pass optional arguments left at their default value
437 # don't pass optional arguments left at their default value
476 opts = {}
438 opts = {}
477 if three is not None:
439 if three is not None:
478 opts['three'] = three
440 opts['three'] = three
479 if four is not None:
441 if four is not None:
480 opts['four'] = four
442 opts['four'] = four
481 return self._call('debugwireargs', one=one, two=two, **opts)
443 return self._call('debugwireargs', one=one, two=two, **opts)
482
444
483 def _call(self, cmd, **args):
445 def _call(self, cmd, **args):
484 """execute <cmd> on the server
446 """execute <cmd> on the server
485
447
486 The command is expected to return a simple string.
448 The command is expected to return a simple string.
487
449
488 returns the server reply as a string."""
450 returns the server reply as a string."""
489 raise NotImplementedError()
451 raise NotImplementedError()
490
452
491 def _callstream(self, cmd, **args):
453 def _callstream(self, cmd, **args):
492 """execute <cmd> on the server
454 """execute <cmd> on the server
493
455
494 The command is expected to return a stream. Note that if the
456 The command is expected to return a stream. Note that if the
495 command doesn't return a stream, _callstream behaves
457 command doesn't return a stream, _callstream behaves
496 differently for ssh and http peers.
458 differently for ssh and http peers.
497
459
498 returns the server reply as a file like object.
460 returns the server reply as a file like object.
499 """
461 """
500 raise NotImplementedError()
462 raise NotImplementedError()
501
463
502 def _callcompressable(self, cmd, **args):
464 def _callcompressable(self, cmd, **args):
503 """execute <cmd> on the server
465 """execute <cmd> on the server
504
466
505 The command is expected to return a stream.
467 The command is expected to return a stream.
506
468
507 The stream may have been compressed in some implementations. This
469 The stream may have been compressed in some implementations. This
508 function takes care of the decompression. This is the only difference
470 function takes care of the decompression. This is the only difference
509 with _callstream.
471 with _callstream.
510
472
511 returns the server reply as a file like object.
473 returns the server reply as a file like object.
512 """
474 """
513 raise NotImplementedError()
475 raise NotImplementedError()
514
476
515 def _callpush(self, cmd, fp, **args):
477 def _callpush(self, cmd, fp, **args):
516 """execute a <cmd> on server
478 """execute a <cmd> on server
517
479
518 The command is expected to be related to a push. Push has a special
480 The command is expected to be related to a push. Push has a special
519 return method.
481 return method.
520
482
521 returns the server reply as a (ret, output) tuple. ret is either
483 returns the server reply as a (ret, output) tuple. ret is either
522 empty (error) or a stringified int.
484 empty (error) or a stringified int.
523 """
485 """
524 raise NotImplementedError()
486 raise NotImplementedError()
525
487
526 def _calltwowaystream(self, cmd, fp, **args):
488 def _calltwowaystream(self, cmd, fp, **args):
527 """execute <cmd> on server
489 """execute <cmd> on server
528
490
529 The command will send a stream to the server and get a stream in reply.
491 The command will send a stream to the server and get a stream in reply.
530 """
492 """
531 raise NotImplementedError()
493 raise NotImplementedError()
532
494
533 def _abort(self, exception):
495 def _abort(self, exception):
534 """clearly abort the wire protocol connection and raise the exception
496 """clearly abort the wire protocol connection and raise the exception
535 """
497 """
536 raise NotImplementedError()
498 raise NotImplementedError()
537
499
538 # server side
500 # server side
539
501
540 # wire protocol command can either return a string or one of these classes.
502 # wire protocol command can either return a string or one of these classes.
541 class streamres(object):
503 class streamres(object):
542 """wireproto reply: binary stream
504 """wireproto reply: binary stream
543
505
544 The call was successful and the result is a stream.
506 The call was successful and the result is a stream.
545
507
546 Accepts either a generator or an object with a ``read(size)`` method.
508 Accepts either a generator or an object with a ``read(size)`` method.
547
509
548 ``v1compressible`` indicates whether this data can be compressed to
510 ``v1compressible`` indicates whether this data can be compressed to
549 "version 1" clients (technically: HTTP peers using
511 "version 1" clients (technically: HTTP peers using
550 application/mercurial-0.1 media type). This flag should NOT be used on
512 application/mercurial-0.1 media type). This flag should NOT be used on
551 new commands because new clients should support a more modern compression
513 new commands because new clients should support a more modern compression
552 mechanism.
514 mechanism.
553 """
515 """
554 def __init__(self, gen=None, reader=None, v1compressible=False):
516 def __init__(self, gen=None, reader=None, v1compressible=False):
555 self.gen = gen
517 self.gen = gen
556 self.reader = reader
518 self.reader = reader
557 self.v1compressible = v1compressible
519 self.v1compressible = v1compressible
558
520
559 class pushres(object):
521 class pushres(object):
560 """wireproto reply: success with simple integer return
522 """wireproto reply: success with simple integer return
561
523
562 The call was successful and returned an integer contained in `self.res`.
524 The call was successful and returned an integer contained in `self.res`.
563 """
525 """
564 def __init__(self, res):
526 def __init__(self, res):
565 self.res = res
527 self.res = res
566
528
567 class pusherr(object):
529 class pusherr(object):
568 """wireproto reply: failure
530 """wireproto reply: failure
569
531
570 The call failed. The `self.res` attribute contains the error message.
532 The call failed. The `self.res` attribute contains the error message.
571 """
533 """
572 def __init__(self, res):
534 def __init__(self, res):
573 self.res = res
535 self.res = res
574
536
575 class ooberror(object):
537 class ooberror(object):
576 """wireproto reply: failure of a batch of operation
538 """wireproto reply: failure of a batch of operation
577
539
578 Something failed during a batch call. The error message is stored in
540 Something failed during a batch call. The error message is stored in
579 `self.message`.
541 `self.message`.
580 """
542 """
581 def __init__(self, message):
543 def __init__(self, message):
582 self.message = message
544 self.message = message
583
545
584 def getdispatchrepo(repo, proto, command):
546 def getdispatchrepo(repo, proto, command):
585 """Obtain the repo used for processing wire protocol commands.
547 """Obtain the repo used for processing wire protocol commands.
586
548
587 The intent of this function is to serve as a monkeypatch point for
549 The intent of this function is to serve as a monkeypatch point for
588 extensions that need commands to operate on different repo views under
550 extensions that need commands to operate on different repo views under
589 specialized circumstances.
551 specialized circumstances.
590 """
552 """
591 return repo.filtered('served')
553 return repo.filtered('served')
592
554
593 def dispatch(repo, proto, command):
555 def dispatch(repo, proto, command):
594 repo = getdispatchrepo(repo, proto, command)
556 repo = getdispatchrepo(repo, proto, command)
595 func, spec = commands[command]
557 func, spec = commands[command]
596 args = proto.getargs(spec)
558 args = proto.getargs(spec)
597 return func(repo, proto, *args)
559 return func(repo, proto, *args)
598
560
599 def options(cmd, keys, others):
561 def options(cmd, keys, others):
600 opts = {}
562 opts = {}
601 for k in keys:
563 for k in keys:
602 if k in others:
564 if k in others:
603 opts[k] = others[k]
565 opts[k] = others[k]
604 del others[k]
566 del others[k]
605 if others:
567 if others:
606 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
568 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
607 % (cmd, ",".join(others)))
569 % (cmd, ",".join(others)))
608 return opts
570 return opts
609
571
610 def bundle1allowed(repo, action):
572 def bundle1allowed(repo, action):
611 """Whether a bundle1 operation is allowed from the server.
573 """Whether a bundle1 operation is allowed from the server.
612
574
613 Priority is:
575 Priority is:
614
576
615 1. server.bundle1gd.<action> (if generaldelta active)
577 1. server.bundle1gd.<action> (if generaldelta active)
616 2. server.bundle1.<action>
578 2. server.bundle1.<action>
617 3. server.bundle1gd (if generaldelta active)
579 3. server.bundle1gd (if generaldelta active)
618 4. server.bundle1
580 4. server.bundle1
619 """
581 """
620 ui = repo.ui
582 ui = repo.ui
621 gd = 'generaldelta' in repo.requirements
583 gd = 'generaldelta' in repo.requirements
622
584
623 if gd:
585 if gd:
624 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
586 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
625 if v is not None:
587 if v is not None:
626 return v
588 return v
627
589
628 v = ui.configbool('server', 'bundle1.%s' % action, None)
590 v = ui.configbool('server', 'bundle1.%s' % action, None)
629 if v is not None:
591 if v is not None:
630 return v
592 return v
631
593
632 if gd:
594 if gd:
633 v = ui.configbool('server', 'bundle1gd')
595 v = ui.configbool('server', 'bundle1gd')
634 if v is not None:
596 if v is not None:
635 return v
597 return v
636
598
637 return ui.configbool('server', 'bundle1')
599 return ui.configbool('server', 'bundle1')
638
600
639 def supportedcompengines(ui, proto, role):
601 def supportedcompengines(ui, proto, role):
640 """Obtain the list of supported compression engines for a request."""
602 """Obtain the list of supported compression engines for a request."""
641 assert role in (util.CLIENTROLE, util.SERVERROLE)
603 assert role in (util.CLIENTROLE, util.SERVERROLE)
642
604
643 compengines = util.compengines.supportedwireengines(role)
605 compengines = util.compengines.supportedwireengines(role)
644
606
645 # Allow config to override default list and ordering.
607 # Allow config to override default list and ordering.
646 if role == util.SERVERROLE:
608 if role == util.SERVERROLE:
647 configengines = ui.configlist('server', 'compressionengines')
609 configengines = ui.configlist('server', 'compressionengines')
648 config = 'server.compressionengines'
610 config = 'server.compressionengines'
649 else:
611 else:
650 # This is currently implemented mainly to facilitate testing. In most
612 # This is currently implemented mainly to facilitate testing. In most
651 # cases, the server should be in charge of choosing a compression engine
613 # cases, the server should be in charge of choosing a compression engine
652 # because a server has the most to lose from a sub-optimal choice. (e.g.
614 # because a server has the most to lose from a sub-optimal choice. (e.g.
653 # CPU DoS due to an expensive engine or a network DoS due to poor
615 # CPU DoS due to an expensive engine or a network DoS due to poor
654 # compression ratio).
616 # compression ratio).
655 configengines = ui.configlist('experimental',
617 configengines = ui.configlist('experimental',
656 'clientcompressionengines')
618 'clientcompressionengines')
657 config = 'experimental.clientcompressionengines'
619 config = 'experimental.clientcompressionengines'
658
620
659 # No explicit config. Filter out the ones that aren't supposed to be
621 # No explicit config. Filter out the ones that aren't supposed to be
660 # advertised and return default ordering.
622 # advertised and return default ordering.
661 if not configengines:
623 if not configengines:
662 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
624 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
663 return [e for e in compengines
625 return [e for e in compengines
664 if getattr(e.wireprotosupport(), attr) > 0]
626 if getattr(e.wireprotosupport(), attr) > 0]
665
627
666 # If compression engines are listed in the config, assume there is a good
628 # If compression engines are listed in the config, assume there is a good
667 # reason for it (like server operators wanting to achieve specific
629 # reason for it (like server operators wanting to achieve specific
668 # performance characteristics). So fail fast if the config references
630 # performance characteristics). So fail fast if the config references
669 # unusable compression engines.
631 # unusable compression engines.
670 validnames = set(e.name() for e in compengines)
632 validnames = set(e.name() for e in compengines)
671 invalidnames = set(e for e in configengines if e not in validnames)
633 invalidnames = set(e for e in configengines if e not in validnames)
672 if invalidnames:
634 if invalidnames:
673 raise error.Abort(_('invalid compression engine defined in %s: %s') %
635 raise error.Abort(_('invalid compression engine defined in %s: %s') %
674 (config, ', '.join(sorted(invalidnames))))
636 (config, ', '.join(sorted(invalidnames))))
675
637
676 compengines = [e for e in compengines if e.name() in configengines]
638 compengines = [e for e in compengines if e.name() in configengines]
677 compengines = sorted(compengines,
639 compengines = sorted(compengines,
678 key=lambda e: configengines.index(e.name()))
640 key=lambda e: configengines.index(e.name()))
679
641
680 if not compengines:
642 if not compengines:
681 raise error.Abort(_('%s config option does not specify any known '
643 raise error.Abort(_('%s config option does not specify any known '
682 'compression engines') % config,
644 'compression engines') % config,
683 hint=_('usable compression engines: %s') %
645 hint=_('usable compression engines: %s') %
684 ', '.sorted(validnames))
646 ', '.sorted(validnames))
685
647
686 return compengines
648 return compengines
687
649
688 # list of commands
650 # list of commands
689 commands = {}
651 commands = {}
690
652
691 def wireprotocommand(name, args=''):
653 def wireprotocommand(name, args=''):
692 """decorator for wire protocol command"""
654 """decorator for wire protocol command"""
693 def register(func):
655 def register(func):
694 commands[name] = (func, args)
656 commands[name] = (func, args)
695 return func
657 return func
696 return register
658 return register
697
659
698 @wireprotocommand('batch', 'cmds *')
660 @wireprotocommand('batch', 'cmds *')
699 def batch(repo, proto, cmds, others):
661 def batch(repo, proto, cmds, others):
700 repo = repo.filtered("served")
662 repo = repo.filtered("served")
701 res = []
663 res = []
702 for pair in cmds.split(';'):
664 for pair in cmds.split(';'):
703 op, args = pair.split(' ', 1)
665 op, args = pair.split(' ', 1)
704 vals = {}
666 vals = {}
705 for a in args.split(','):
667 for a in args.split(','):
706 if a:
668 if a:
707 n, v = a.split('=')
669 n, v = a.split('=')
708 vals[unescapearg(n)] = unescapearg(v)
670 vals[unescapearg(n)] = unescapearg(v)
709 func, spec = commands[op]
671 func, spec = commands[op]
710 if spec:
672 if spec:
711 keys = spec.split()
673 keys = spec.split()
712 data = {}
674 data = {}
713 for k in keys:
675 for k in keys:
714 if k == '*':
676 if k == '*':
715 star = {}
677 star = {}
716 for key in vals.keys():
678 for key in vals.keys():
717 if key not in keys:
679 if key not in keys:
718 star[key] = vals[key]
680 star[key] = vals[key]
719 data['*'] = star
681 data['*'] = star
720 else:
682 else:
721 data[k] = vals[k]
683 data[k] = vals[k]
722 result = func(repo, proto, *[data[k] for k in keys])
684 result = func(repo, proto, *[data[k] for k in keys])
723 else:
685 else:
724 result = func(repo, proto)
686 result = func(repo, proto)
725 if isinstance(result, ooberror):
687 if isinstance(result, ooberror):
726 return result
688 return result
727 res.append(escapearg(result))
689 res.append(escapearg(result))
728 return ';'.join(res)
690 return ';'.join(res)
729
691
730 @wireprotocommand('between', 'pairs')
692 @wireprotocommand('between', 'pairs')
731 def between(repo, proto, pairs):
693 def between(repo, proto, pairs):
732 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
694 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
733 r = []
695 r = []
734 for b in repo.between(pairs):
696 for b in repo.between(pairs):
735 r.append(encodelist(b) + "\n")
697 r.append(encodelist(b) + "\n")
736 return "".join(r)
698 return "".join(r)
737
699
738 @wireprotocommand('branchmap')
700 @wireprotocommand('branchmap')
739 def branchmap(repo, proto):
701 def branchmap(repo, proto):
740 branchmap = repo.branchmap()
702 branchmap = repo.branchmap()
741 heads = []
703 heads = []
742 for branch, nodes in branchmap.iteritems():
704 for branch, nodes in branchmap.iteritems():
743 branchname = urlreq.quote(encoding.fromlocal(branch))
705 branchname = urlreq.quote(encoding.fromlocal(branch))
744 branchnodes = encodelist(nodes)
706 branchnodes = encodelist(nodes)
745 heads.append('%s %s' % (branchname, branchnodes))
707 heads.append('%s %s' % (branchname, branchnodes))
746 return '\n'.join(heads)
708 return '\n'.join(heads)
747
709
748 @wireprotocommand('branches', 'nodes')
710 @wireprotocommand('branches', 'nodes')
749 def branches(repo, proto, nodes):
711 def branches(repo, proto, nodes):
750 nodes = decodelist(nodes)
712 nodes = decodelist(nodes)
751 r = []
713 r = []
752 for b in repo.branches(nodes):
714 for b in repo.branches(nodes):
753 r.append(encodelist(b) + "\n")
715 r.append(encodelist(b) + "\n")
754 return "".join(r)
716 return "".join(r)
755
717
756 @wireprotocommand('clonebundles', '')
718 @wireprotocommand('clonebundles', '')
757 def clonebundles(repo, proto):
719 def clonebundles(repo, proto):
758 """Server command for returning info for available bundles to seed clones.
720 """Server command for returning info for available bundles to seed clones.
759
721
760 Clients will parse this response and determine what bundle to fetch.
722 Clients will parse this response and determine what bundle to fetch.
761
723
762 Extensions may wrap this command to filter or dynamically emit data
724 Extensions may wrap this command to filter or dynamically emit data
763 depending on the request. e.g. you could advertise URLs for the closest
725 depending on the request. e.g. you could advertise URLs for the closest
764 data center given the client's IP address.
726 data center given the client's IP address.
765 """
727 """
766 return repo.vfs.tryread('clonebundles.manifest')
728 return repo.vfs.tryread('clonebundles.manifest')
767
729
768 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
730 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
769 'known', 'getbundle', 'unbundlehash', 'batch']
731 'known', 'getbundle', 'unbundlehash', 'batch']
770
732
771 def _capabilities(repo, proto):
733 def _capabilities(repo, proto):
772 """return a list of capabilities for a repo
734 """return a list of capabilities for a repo
773
735
774 This function exists to allow extensions to easily wrap capabilities
736 This function exists to allow extensions to easily wrap capabilities
775 computation
737 computation
776
738
777 - returns a lists: easy to alter
739 - returns a lists: easy to alter
778 - change done here will be propagated to both `capabilities` and `hello`
740 - change done here will be propagated to both `capabilities` and `hello`
779 command without any other action needed.
741 command without any other action needed.
780 """
742 """
781 # copy to prevent modification of the global list
743 # copy to prevent modification of the global list
782 caps = list(wireprotocaps)
744 caps = list(wireprotocaps)
783 if streamclone.allowservergeneration(repo):
745 if streamclone.allowservergeneration(repo):
784 if repo.ui.configbool('server', 'preferuncompressed'):
746 if repo.ui.configbool('server', 'preferuncompressed'):
785 caps.append('stream-preferred')
747 caps.append('stream-preferred')
786 requiredformats = repo.requirements & repo.supportedformats
748 requiredformats = repo.requirements & repo.supportedformats
787 # if our local revlogs are just revlogv1, add 'stream' cap
749 # if our local revlogs are just revlogv1, add 'stream' cap
788 if not requiredformats - {'revlogv1'}:
750 if not requiredformats - {'revlogv1'}:
789 caps.append('stream')
751 caps.append('stream')
790 # otherwise, add 'streamreqs' detailing our local revlog format
752 # otherwise, add 'streamreqs' detailing our local revlog format
791 else:
753 else:
792 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
754 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
793 if repo.ui.configbool('experimental', 'bundle2-advertise'):
755 if repo.ui.configbool('experimental', 'bundle2-advertise'):
794 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
756 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
795 caps.append('bundle2=' + urlreq.quote(capsblob))
757 caps.append('bundle2=' + urlreq.quote(capsblob))
796 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
758 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
797
759
798 if proto.name == 'http':
760 if proto.name == 'http':
799 caps.append('httpheader=%d' %
761 caps.append('httpheader=%d' %
800 repo.ui.configint('server', 'maxhttpheaderlen'))
762 repo.ui.configint('server', 'maxhttpheaderlen'))
801 if repo.ui.configbool('experimental', 'httppostargs'):
763 if repo.ui.configbool('experimental', 'httppostargs'):
802 caps.append('httppostargs')
764 caps.append('httppostargs')
803
765
804 # FUTURE advertise 0.2rx once support is implemented
766 # FUTURE advertise 0.2rx once support is implemented
805 # FUTURE advertise minrx and mintx after consulting config option
767 # FUTURE advertise minrx and mintx after consulting config option
806 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
768 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
807
769
808 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
770 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
809 if compengines:
771 if compengines:
810 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
772 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
811 for e in compengines)
773 for e in compengines)
812 caps.append('compression=%s' % comptypes)
774 caps.append('compression=%s' % comptypes)
813
775
814 return caps
776 return caps
815
777
816 # If you are writing an extension and consider wrapping this function. Wrap
778 # If you are writing an extension and consider wrapping this function. Wrap
817 # `_capabilities` instead.
779 # `_capabilities` instead.
818 @wireprotocommand('capabilities')
780 @wireprotocommand('capabilities')
819 def capabilities(repo, proto):
781 def capabilities(repo, proto):
820 return ' '.join(_capabilities(repo, proto))
782 return ' '.join(_capabilities(repo, proto))
821
783
822 @wireprotocommand('changegroup', 'roots')
784 @wireprotocommand('changegroup', 'roots')
823 def changegroup(repo, proto, roots):
785 def changegroup(repo, proto, roots):
824 nodes = decodelist(roots)
786 nodes = decodelist(roots)
825 cg = changegroupmod.changegroup(repo, nodes, 'serve')
787 cg = changegroupmod.changegroup(repo, nodes, 'serve')
826 return streamres(reader=cg, v1compressible=True)
788 return streamres(reader=cg, v1compressible=True)
827
789
828 @wireprotocommand('changegroupsubset', 'bases heads')
790 @wireprotocommand('changegroupsubset', 'bases heads')
829 def changegroupsubset(repo, proto, bases, heads):
791 def changegroupsubset(repo, proto, bases, heads):
830 bases = decodelist(bases)
792 bases = decodelist(bases)
831 heads = decodelist(heads)
793 heads = decodelist(heads)
832 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
794 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
833 return streamres(reader=cg, v1compressible=True)
795 return streamres(reader=cg, v1compressible=True)
834
796
835 @wireprotocommand('debugwireargs', 'one two *')
797 @wireprotocommand('debugwireargs', 'one two *')
836 def debugwireargs(repo, proto, one, two, others):
798 def debugwireargs(repo, proto, one, two, others):
837 # only accept optional args from the known set
799 # only accept optional args from the known set
838 opts = options('debugwireargs', ['three', 'four'], others)
800 opts = options('debugwireargs', ['three', 'four'], others)
839 return repo.debugwireargs(one, two, **opts)
801 return repo.debugwireargs(one, two, **opts)
840
802
841 @wireprotocommand('getbundle', '*')
803 @wireprotocommand('getbundle', '*')
842 def getbundle(repo, proto, others):
804 def getbundle(repo, proto, others):
843 opts = options('getbundle', gboptsmap.keys(), others)
805 opts = options('getbundle', gboptsmap.keys(), others)
844 for k, v in opts.iteritems():
806 for k, v in opts.iteritems():
845 keytype = gboptsmap[k]
807 keytype = gboptsmap[k]
846 if keytype == 'nodes':
808 if keytype == 'nodes':
847 opts[k] = decodelist(v)
809 opts[k] = decodelist(v)
848 elif keytype == 'csv':
810 elif keytype == 'csv':
849 opts[k] = list(v.split(','))
811 opts[k] = list(v.split(','))
850 elif keytype == 'scsv':
812 elif keytype == 'scsv':
851 opts[k] = set(v.split(','))
813 opts[k] = set(v.split(','))
852 elif keytype == 'boolean':
814 elif keytype == 'boolean':
853 # Client should serialize False as '0', which is a non-empty string
815 # Client should serialize False as '0', which is a non-empty string
854 # so it evaluates as a True bool.
816 # so it evaluates as a True bool.
855 if v == '0':
817 if v == '0':
856 opts[k] = False
818 opts[k] = False
857 else:
819 else:
858 opts[k] = bool(v)
820 opts[k] = bool(v)
859 elif keytype != 'plain':
821 elif keytype != 'plain':
860 raise KeyError('unknown getbundle option type %s'
822 raise KeyError('unknown getbundle option type %s'
861 % keytype)
823 % keytype)
862
824
863 if not bundle1allowed(repo, 'pull'):
825 if not bundle1allowed(repo, 'pull'):
864 if not exchange.bundle2requested(opts.get('bundlecaps')):
826 if not exchange.bundle2requested(opts.get('bundlecaps')):
865 if proto.name == 'http':
827 if proto.name == 'http':
866 return ooberror(bundle2required)
828 return ooberror(bundle2required)
867 raise error.Abort(bundle2requiredmain,
829 raise error.Abort(bundle2requiredmain,
868 hint=bundle2requiredhint)
830 hint=bundle2requiredhint)
869
831
870 try:
832 try:
871 if repo.ui.configbool('server', 'disablefullbundle'):
833 if repo.ui.configbool('server', 'disablefullbundle'):
872 # Check to see if this is a full clone.
834 # Check to see if this is a full clone.
873 clheads = set(repo.changelog.heads())
835 clheads = set(repo.changelog.heads())
874 heads = set(opts.get('heads', set()))
836 heads = set(opts.get('heads', set()))
875 common = set(opts.get('common', set()))
837 common = set(opts.get('common', set()))
876 common.discard(nullid)
838 common.discard(nullid)
877 if not common and clheads == heads:
839 if not common and clheads == heads:
878 raise error.Abort(
840 raise error.Abort(
879 _('server has pull-based clones disabled'),
841 _('server has pull-based clones disabled'),
880 hint=_('remove --pull if specified or upgrade Mercurial'))
842 hint=_('remove --pull if specified or upgrade Mercurial'))
881
843
882 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
844 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
883 except error.Abort as exc:
845 except error.Abort as exc:
884 # cleanly forward Abort error to the client
846 # cleanly forward Abort error to the client
885 if not exchange.bundle2requested(opts.get('bundlecaps')):
847 if not exchange.bundle2requested(opts.get('bundlecaps')):
886 if proto.name == 'http':
848 if proto.name == 'http':
887 return ooberror(str(exc) + '\n')
849 return ooberror(str(exc) + '\n')
888 raise # cannot do better for bundle1 + ssh
850 raise # cannot do better for bundle1 + ssh
889 # bundle2 request expect a bundle2 reply
851 # bundle2 request expect a bundle2 reply
890 bundler = bundle2.bundle20(repo.ui)
852 bundler = bundle2.bundle20(repo.ui)
891 manargs = [('message', str(exc))]
853 manargs = [('message', str(exc))]
892 advargs = []
854 advargs = []
893 if exc.hint is not None:
855 if exc.hint is not None:
894 advargs.append(('hint', exc.hint))
856 advargs.append(('hint', exc.hint))
895 bundler.addpart(bundle2.bundlepart('error:abort',
857 bundler.addpart(bundle2.bundlepart('error:abort',
896 manargs, advargs))
858 manargs, advargs))
897 return streamres(gen=bundler.getchunks(), v1compressible=True)
859 return streamres(gen=bundler.getchunks(), v1compressible=True)
898 return streamres(gen=chunks, v1compressible=True)
860 return streamres(gen=chunks, v1compressible=True)
899
861
900 @wireprotocommand('heads')
862 @wireprotocommand('heads')
901 def heads(repo, proto):
863 def heads(repo, proto):
902 h = repo.heads()
864 h = repo.heads()
903 return encodelist(h) + "\n"
865 return encodelist(h) + "\n"
904
866
905 @wireprotocommand('hello')
867 @wireprotocommand('hello')
906 def hello(repo, proto):
868 def hello(repo, proto):
907 '''the hello command returns a set of lines describing various
869 '''the hello command returns a set of lines describing various
908 interesting things about the server, in an RFC822-like format.
870 interesting things about the server, in an RFC822-like format.
909 Currently the only one defined is "capabilities", which
871 Currently the only one defined is "capabilities", which
910 consists of a line in the form:
872 consists of a line in the form:
911
873
912 capabilities: space separated list of tokens
874 capabilities: space separated list of tokens
913 '''
875 '''
914 return "capabilities: %s\n" % (capabilities(repo, proto))
876 return "capabilities: %s\n" % (capabilities(repo, proto))
915
877
916 @wireprotocommand('listkeys', 'namespace')
878 @wireprotocommand('listkeys', 'namespace')
917 def listkeys(repo, proto, namespace):
879 def listkeys(repo, proto, namespace):
918 d = repo.listkeys(encoding.tolocal(namespace)).items()
880 d = repo.listkeys(encoding.tolocal(namespace)).items()
919 return pushkeymod.encodekeys(d)
881 return pushkeymod.encodekeys(d)
920
882
921 @wireprotocommand('lookup', 'key')
883 @wireprotocommand('lookup', 'key')
922 def lookup(repo, proto, key):
884 def lookup(repo, proto, key):
923 try:
885 try:
924 k = encoding.tolocal(key)
886 k = encoding.tolocal(key)
925 c = repo[k]
887 c = repo[k]
926 r = c.hex()
888 r = c.hex()
927 success = 1
889 success = 1
928 except Exception as inst:
890 except Exception as inst:
929 r = str(inst)
891 r = str(inst)
930 success = 0
892 success = 0
931 return "%s %s\n" % (success, r)
893 return "%s %s\n" % (success, r)
932
894
933 @wireprotocommand('known', 'nodes *')
895 @wireprotocommand('known', 'nodes *')
934 def known(repo, proto, nodes, others):
896 def known(repo, proto, nodes, others):
935 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
936
898
937 @wireprotocommand('pushkey', 'namespace key old new')
899 @wireprotocommand('pushkey', 'namespace key old new')
938 def pushkey(repo, proto, namespace, key, old, new):
900 def pushkey(repo, proto, namespace, key, old, new):
939 # compatibility with pre-1.8 clients which were accidentally
901 # compatibility with pre-1.8 clients which were accidentally
940 # sending raw binary nodes rather than utf-8-encoded hex
902 # sending raw binary nodes rather than utf-8-encoded hex
941 if len(new) == 20 and util.escapestr(new) != new:
903 if len(new) == 20 and util.escapestr(new) != new:
942 # looks like it could be a binary node
904 # looks like it could be a binary node
943 try:
905 try:
944 new.decode('utf-8')
906 new.decode('utf-8')
945 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
946 except UnicodeDecodeError:
908 except UnicodeDecodeError:
947 pass # binary, leave unmodified
909 pass # binary, leave unmodified
948 else:
910 else:
949 new = encoding.tolocal(new) # normal path
911 new = encoding.tolocal(new) # normal path
950
912
951 if util.safehasattr(proto, 'restore'):
913 if util.safehasattr(proto, 'restore'):
952
914
953 proto.redirect()
915 proto.redirect()
954
916
955 try:
917 try:
956 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
957 encoding.tolocal(old), new) or False
919 encoding.tolocal(old), new) or False
958 except error.Abort:
920 except error.Abort:
959 r = False
921 r = False
960
922
961 output = proto.restore()
923 output = proto.restore()
962
924
963 return '%s\n%s' % (int(r), output)
925 return '%s\n%s' % (int(r), output)
964
926
965 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
966 encoding.tolocal(old), new)
928 encoding.tolocal(old), new)
967 return '%s\n' % int(r)
929 return '%s\n' % int(r)
968
930
969 @wireprotocommand('stream_out')
931 @wireprotocommand('stream_out')
970 def stream(repo, proto):
932 def stream(repo, proto):
971 '''If the server supports streaming clone, it advertises the "stream"
933 '''If the server supports streaming clone, it advertises the "stream"
972 capability with a value representing the version and flags of the repo
934 capability with a value representing the version and flags of the repo
973 it is serving. Client checks to see if it understands the format.
935 it is serving. Client checks to see if it understands the format.
974 '''
936 '''
975 if not streamclone.allowservergeneration(repo):
937 if not streamclone.allowservergeneration(repo):
976 return '1\n'
938 return '1\n'
977
939
978 def getstream(it):
940 def getstream(it):
979 yield '0\n'
941 yield '0\n'
980 for chunk in it:
942 for chunk in it:
981 yield chunk
943 yield chunk
982
944
983 try:
945 try:
984 # LockError may be raised before the first result is yielded. Don't
946 # LockError may be raised before the first result is yielded. Don't
985 # emit output until we're sure we got the lock successfully.
947 # emit output until we're sure we got the lock successfully.
986 it = streamclone.generatev1wireproto(repo)
948 it = streamclone.generatev1wireproto(repo)
987 return streamres(gen=getstream(it))
949 return streamres(gen=getstream(it))
988 except error.LockError:
950 except error.LockError:
989 return '2\n'
951 return '2\n'
990
952
991 @wireprotocommand('unbundle', 'heads')
953 @wireprotocommand('unbundle', 'heads')
992 def unbundle(repo, proto, heads):
954 def unbundle(repo, proto, heads):
993 their_heads = decodelist(heads)
955 their_heads = decodelist(heads)
994
956
995 try:
957 try:
996 proto.redirect()
958 proto.redirect()
997
959
998 exchange.check_heads(repo, their_heads, 'preparing changes')
960 exchange.check_heads(repo, their_heads, 'preparing changes')
999
961
1000 # write bundle data to temporary file because it can be big
962 # write bundle data to temporary file because it can be big
1001 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
963 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1002 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
964 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1003 r = 0
965 r = 0
1004 try:
966 try:
1005 proto.getfile(fp)
967 proto.getfile(fp)
1006 fp.seek(0)
968 fp.seek(0)
1007 gen = exchange.readbundle(repo.ui, fp, None)
969 gen = exchange.readbundle(repo.ui, fp, None)
1008 if (isinstance(gen, changegroupmod.cg1unpacker)
970 if (isinstance(gen, changegroupmod.cg1unpacker)
1009 and not bundle1allowed(repo, 'push')):
971 and not bundle1allowed(repo, 'push')):
1010 if proto.name == 'http':
972 if proto.name == 'http':
1011 # need to special case http because stderr do not get to
973 # need to special case http because stderr do not get to
1012 # the http client on failed push so we need to abuse some
974 # the http client on failed push so we need to abuse some
1013 # other error type to make sure the message get to the
975 # other error type to make sure the message get to the
1014 # user.
976 # user.
1015 return ooberror(bundle2required)
977 return ooberror(bundle2required)
1016 raise error.Abort(bundle2requiredmain,
978 raise error.Abort(bundle2requiredmain,
1017 hint=bundle2requiredhint)
979 hint=bundle2requiredhint)
1018
980
1019 r = exchange.unbundle(repo, gen, their_heads, 'serve',
981 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1020 proto._client())
982 proto._client())
1021 if util.safehasattr(r, 'addpart'):
983 if util.safehasattr(r, 'addpart'):
1022 # The return looks streamable, we are in the bundle2 case and
984 # The return looks streamable, we are in the bundle2 case and
1023 # should return a stream.
985 # should return a stream.
1024 return streamres(gen=r.getchunks())
986 return streamres(gen=r.getchunks())
1025 return pushres(r)
987 return pushres(r)
1026
988
1027 finally:
989 finally:
1028 fp.close()
990 fp.close()
1029 os.unlink(tempname)
991 os.unlink(tempname)
1030
992
1031 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
993 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1032 # handle non-bundle2 case first
994 # handle non-bundle2 case first
1033 if not getattr(exc, 'duringunbundle2', False):
995 if not getattr(exc, 'duringunbundle2', False):
1034 try:
996 try:
1035 raise
997 raise
1036 except error.Abort:
998 except error.Abort:
1037 # The old code we moved used util.stderr directly.
999 # The old code we moved used util.stderr directly.
1038 # We did not change it to minimise code change.
1000 # We did not change it to minimise code change.
1039 # This need to be moved to something proper.
1001 # This need to be moved to something proper.
1040 # Feel free to do it.
1002 # Feel free to do it.
1041 util.stderr.write("abort: %s\n" % exc)
1003 util.stderr.write("abort: %s\n" % exc)
1042 if exc.hint is not None:
1004 if exc.hint is not None:
1043 util.stderr.write("(%s)\n" % exc.hint)
1005 util.stderr.write("(%s)\n" % exc.hint)
1044 return pushres(0)
1006 return pushres(0)
1045 except error.PushRaced:
1007 except error.PushRaced:
1046 return pusherr(str(exc))
1008 return pusherr(str(exc))
1047
1009
1048 bundler = bundle2.bundle20(repo.ui)
1010 bundler = bundle2.bundle20(repo.ui)
1049 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1011 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1050 bundler.addpart(out)
1012 bundler.addpart(out)
1051 try:
1013 try:
1052 try:
1014 try:
1053 raise
1015 raise
1054 except error.PushkeyFailed as exc:
1016 except error.PushkeyFailed as exc:
1055 # check client caps
1017 # check client caps
1056 remotecaps = getattr(exc, '_replycaps', None)
1018 remotecaps = getattr(exc, '_replycaps', None)
1057 if (remotecaps is not None
1019 if (remotecaps is not None
1058 and 'pushkey' not in remotecaps.get('error', ())):
1020 and 'pushkey' not in remotecaps.get('error', ())):
1059 # no support remote side, fallback to Abort handler.
1021 # no support remote side, fallback to Abort handler.
1060 raise
1022 raise
1061 part = bundler.newpart('error:pushkey')
1023 part = bundler.newpart('error:pushkey')
1062 part.addparam('in-reply-to', exc.partid)
1024 part.addparam('in-reply-to', exc.partid)
1063 if exc.namespace is not None:
1025 if exc.namespace is not None:
1064 part.addparam('namespace', exc.namespace, mandatory=False)
1026 part.addparam('namespace', exc.namespace, mandatory=False)
1065 if exc.key is not None:
1027 if exc.key is not None:
1066 part.addparam('key', exc.key, mandatory=False)
1028 part.addparam('key', exc.key, mandatory=False)
1067 if exc.new is not None:
1029 if exc.new is not None:
1068 part.addparam('new', exc.new, mandatory=False)
1030 part.addparam('new', exc.new, mandatory=False)
1069 if exc.old is not None:
1031 if exc.old is not None:
1070 part.addparam('old', exc.old, mandatory=False)
1032 part.addparam('old', exc.old, mandatory=False)
1071 if exc.ret is not None:
1033 if exc.ret is not None:
1072 part.addparam('ret', exc.ret, mandatory=False)
1034 part.addparam('ret', exc.ret, mandatory=False)
1073 except error.BundleValueError as exc:
1035 except error.BundleValueError as exc:
1074 errpart = bundler.newpart('error:unsupportedcontent')
1036 errpart = bundler.newpart('error:unsupportedcontent')
1075 if exc.parttype is not None:
1037 if exc.parttype is not None:
1076 errpart.addparam('parttype', exc.parttype)
1038 errpart.addparam('parttype', exc.parttype)
1077 if exc.params:
1039 if exc.params:
1078 errpart.addparam('params', '\0'.join(exc.params))
1040 errpart.addparam('params', '\0'.join(exc.params))
1079 except error.Abort as exc:
1041 except error.Abort as exc:
1080 manargs = [('message', str(exc))]
1042 manargs = [('message', str(exc))]
1081 advargs = []
1043 advargs = []
1082 if exc.hint is not None:
1044 if exc.hint is not None:
1083 advargs.append(('hint', exc.hint))
1045 advargs.append(('hint', exc.hint))
1084 bundler.addpart(bundle2.bundlepart('error:abort',
1046 bundler.addpart(bundle2.bundlepart('error:abort',
1085 manargs, advargs))
1047 manargs, advargs))
1086 except error.PushRaced as exc:
1048 except error.PushRaced as exc:
1087 bundler.newpart('error:pushraced', [('message', str(exc))])
1049 bundler.newpart('error:pushraced', [('message', str(exc))])
1088 return streamres(gen=bundler.getchunks())
1050 return streamres(gen=bundler.getchunks())
@@ -1,61 +1,61 b''
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 from mercurial import (
3 from mercurial import (
4 util,
4 util,
5 wireproto,
5 wireproto,
6 )
6 )
7 stringio = util.stringio
7 stringio = util.stringio
8
8
9 class proto(object):
9 class proto(object):
10 def __init__(self, args):
10 def __init__(self, args):
11 self.args = args
11 self.args = args
12 def getargs(self, spec):
12 def getargs(self, spec):
13 args = self.args
13 args = self.args
14 args.setdefault('*', {})
14 args.setdefault('*', {})
15 names = spec.split()
15 names = spec.split()
16 return [args[n] for n in names]
16 return [args[n] for n in names]
17
17
18 class clientpeer(wireproto.wirepeer):
18 class clientpeer(wireproto.wirepeer):
19 def __init__(self, serverrepo):
19 def __init__(self, serverrepo):
20 self.serverrepo = serverrepo
20 self.serverrepo = serverrepo
21
21
22 def _capabilities(self):
22 def _capabilities(self):
23 return ['batch']
23 return ['batch']
24
24
25 def _call(self, cmd, **args):
25 def _call(self, cmd, **args):
26 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
26 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
27
27
28 def _callstream(self, cmd, **args):
28 def _callstream(self, cmd, **args):
29 return stringio(self._call(cmd, **args))
29 return stringio(self._call(cmd, **args))
30
30
31 @wireproto.batchable
31 @wireproto.batchable
32 def greet(self, name):
32 def greet(self, name):
33 f = wireproto.future()
33 f = wireproto.future()
34 yield {'name': mangle(name)}, f
34 yield {'name': mangle(name)}, f
35 yield unmangle(f.value)
35 yield unmangle(f.value)
36
36
37 class serverrepo(object):
37 class serverrepo(object):
38 def greet(self, name):
38 def greet(self, name):
39 return "Hello, " + name
39 return "Hello, " + name
40
40
41 def filtered(self, name):
41 def filtered(self, name):
42 return self
42 return self
43
43
44 def mangle(s):
44 def mangle(s):
45 return ''.join(chr(ord(c) + 1) for c in s)
45 return ''.join(chr(ord(c) + 1) for c in s)
46 def unmangle(s):
46 def unmangle(s):
47 return ''.join(chr(ord(c) - 1) for c in s)
47 return ''.join(chr(ord(c) - 1) for c in s)
48
48
49 def greet(repo, proto, name):
49 def greet(repo, proto, name):
50 return mangle(repo.greet(unmangle(name)))
50 return mangle(repo.greet(unmangle(name)))
51
51
52 wireproto.commands['greet'] = (greet, 'name',)
52 wireproto.commands['greet'] = (greet, 'name',)
53
53
54 srv = serverrepo()
54 srv = serverrepo()
55 clt = clientpeer(srv)
55 clt = clientpeer(srv)
56
56
57 print(clt.greet("Foobar"))
57 print(clt.greet("Foobar"))
58 b = clt.batch()
58 b = clt.iterbatch()
59 fs = [b.greet(s) for s in ["Fo, =;:<o", "Bar"]]
59 map(b.greet, ('Fo, =;:<o', 'Bar'))
60 b.submit()
60 b.submit()
61 print([f.value for f in fs])
61 print([r for r in b.results()])
General Comments 0
You need to be logged in to leave comments. Login now