##// END OF EJS Templates
batching: migrate basic noop batching into peer.peer...
Augie Fackler -
r25912:cbbdd085 default
parent child Browse files
Show More
@@ -1,46 +1,122
1 # peer.py - repository base classes for mercurial
1 # peer.py - repository base classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from i18n import _
9 from i18n import _
10 import error
10 import error
11 import util
12
13 # abstract batching support
14
15 class future(object):
16 '''placeholder for a value to be set later'''
17 def set(self, value):
18 if util.safehasattr(self, 'value'):
19 raise error.RepoError("future is already set")
20 self.value = value
21
22 class batcher(object):
23 '''base class for batches of commands submittable in a single request
24
25 All methods invoked on instances of this class are simply queued and
26 return a a future for the result. Once you call submit(), all the queued
27 calls are performed and the results set in their respective futures.
28 '''
29 def __init__(self):
30 self.calls = []
31 def __getattr__(self, name):
32 def call(*args, **opts):
33 resref = future()
34 self.calls.append((name, args, opts, resref,))
35 return resref
36 return call
37 def submit(self):
38 pass
39
40 class localbatch(batcher):
41 '''performs the queued calls directly'''
42 def __init__(self, local):
43 batcher.__init__(self)
44 self.local = local
45 def submit(self):
46 for name, args, opts, resref in self.calls:
47 resref.set(getattr(self.local, name)(*args, **opts))
48
49 def batchable(f):
50 '''annotation for batchable methods
51
52 Such methods must implement a coroutine as follows:
53
54 @batchable
55 def sample(self, one, two=None):
56 # Handle locally computable results first:
57 if not one:
58 yield "a local result", None
59 # Build list of encoded arguments suitable for your wire protocol:
60 encargs = [('one', encode(one),), ('two', encode(two),)]
61 # Create future for injection of encoded result:
62 encresref = future()
63 # Return encoded arguments and future:
64 yield encargs, encresref
65 # Assuming the future to be filled with the result from the batched
66 # request now. Decode it:
67 yield decode(encresref.value)
68
69 The decorator returns a function which wraps this coroutine as a plain
70 method, but adds the original method as an attribute called "batchable",
71 which is used by remotebatch to split the call into separate encoding and
72 decoding phases.
73 '''
74 def plain(*args, **opts):
75 batchable = f(*args, **opts)
76 encargsorres, encresref = batchable.next()
77 if not encresref:
78 return encargsorres # a local result in this case
79 self = args[0]
80 encresref.set(self._submitone(f.func_name, encargsorres))
81 return batchable.next()
82 setattr(plain, 'batchable', f)
83 return plain
11
84
12 class peerrepository(object):
85 class peerrepository(object):
13
86
87 def batch(self):
88 return localbatch(self)
89
14 def capable(self, name):
90 def capable(self, name):
15 '''tell whether repo supports named capability.
91 '''tell whether repo supports named capability.
16 return False if not supported.
92 return False if not supported.
17 if boolean capability, return True.
93 if boolean capability, return True.
18 if string capability, return string.'''
94 if string capability, return string.'''
19 caps = self._capabilities()
95 caps = self._capabilities()
20 if name in caps:
96 if name in caps:
21 return True
97 return True
22 name_eq = name + '='
98 name_eq = name + '='
23 for cap in caps:
99 for cap in caps:
24 if cap.startswith(name_eq):
100 if cap.startswith(name_eq):
25 return cap[len(name_eq):]
101 return cap[len(name_eq):]
26 return False
102 return False
27
103
28 def requirecap(self, name, purpose):
104 def requirecap(self, name, purpose):
29 '''raise an exception if the given capability is not present'''
105 '''raise an exception if the given capability is not present'''
30 if not self.capable(name):
106 if not self.capable(name):
31 raise error.CapabilityError(
107 raise error.CapabilityError(
32 _('cannot %s; remote repository does not '
108 _('cannot %s; remote repository does not '
33 'support the %r capability') % (purpose, name))
109 'support the %r capability') % (purpose, name))
34
110
35 def local(self):
111 def local(self):
36 '''return peer as a localrepo, or None'''
112 '''return peer as a localrepo, or None'''
37 return None
113 return None
38
114
39 def peer(self):
115 def peer(self):
40 return self
116 return self
41
117
42 def canpush(self):
118 def canpush(self):
43 return True
119 return True
44
120
45 def close(self):
121 def close(self):
46 pass
122 pass
@@ -1,859 +1,792
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, exchange
12 import peer, error, encoding, util, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 class remotebatch(peer.batcher):
62
63 class future(object):
64 '''placeholder for a value to be set later'''
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
68 self.value = value
69
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
72
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
76 '''
77 def __init__(self):
78 self.calls = []
79 def __getattr__(self, name):
80 def call(*args, **opts):
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
83 return resref
84 return call
85 def submit(self):
86 pass
87
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
90 def __init__(self, local):
91 batcher.__init__(self)
92 self.local = local
93 def submit(self):
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
96
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
62 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
63 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
64 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
65 _submitone(op, encargs)'''
102 batcher.__init__(self)
66 peer.batcher.__init__(self)
103 self.remote = remote
67 self.remote = remote
104 def submit(self):
68 def submit(self):
105 req, rsp = [], []
69 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
70 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
71 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
72 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
73 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
74 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
75 encargsorres, encresref = batchable.next()
112 if encresref:
76 if encresref:
113 req.append((name, encargsorres,))
77 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
78 rsp.append((batchable, encresref, resref,))
115 else:
79 else:
116 resref.set(encargsorres)
80 resref.set(encargsorres)
117 else:
81 else:
118 if req:
82 if req:
119 self._submitreq(req, rsp)
83 self._submitreq(req, rsp)
120 req, rsp = [], []
84 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
85 resref.set(mtd(*args, **opts))
122 if req:
86 if req:
123 self._submitreq(req, rsp)
87 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
88 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
89 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
90 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
91 batchable, encresref, resref = r
128 encresref.set(encres)
92 encresref.set(encres)
129 resref.set(batchable.next())
93 resref.set(batchable.next())
130
94
131 def batchable(f):
95 # Forward a couple of names from peer to make wireproto interactions
132 '''annotation for batchable methods
96 # slightly more sensible.
133
97 batchable = peer.batchable
134 Such methods must implement a coroutine as follows:
98 future = peer.future
135
136 @batchable
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
139 if not one:
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
144 encresref = future()
145 # Return encoded arguments and future:
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
149 yield decode(encresref.value)
150
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
155 '''
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
159 if not encresref:
160 return encargsorres # a local result in this case
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
164 setattr(plain, 'batchable', f)
165 return plain
166
99
167 # list of nodes encoding / decoding
100 # list of nodes encoding / decoding
168
101
169 def decodelist(l, sep=' '):
102 def decodelist(l, sep=' '):
170 if l:
103 if l:
171 return map(bin, l.split(sep))
104 return map(bin, l.split(sep))
172 return []
105 return []
173
106
174 def encodelist(l, sep=' '):
107 def encodelist(l, sep=' '):
175 try:
108 try:
176 return sep.join(map(hex, l))
109 return sep.join(map(hex, l))
177 except TypeError:
110 except TypeError:
178 raise
111 raise
179
112
180 # batched call argument encoding
113 # batched call argument encoding
181
114
182 def escapearg(plain):
115 def escapearg(plain):
183 return (plain
116 return (plain
184 .replace(':', ':c')
117 .replace(':', ':c')
185 .replace(',', ':o')
118 .replace(',', ':o')
186 .replace(';', ':s')
119 .replace(';', ':s')
187 .replace('=', ':e'))
120 .replace('=', ':e'))
188
121
189 def unescapearg(escaped):
122 def unescapearg(escaped):
190 return (escaped
123 return (escaped
191 .replace(':e', '=')
124 .replace(':e', '=')
192 .replace(':s', ';')
125 .replace(':s', ';')
193 .replace(':o', ',')
126 .replace(':o', ',')
194 .replace(':c', ':'))
127 .replace(':c', ':'))
195
128
196 # mapping of options accepted by getbundle and their types
129 # mapping of options accepted by getbundle and their types
197 #
130 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
131 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
132 # such options are properly processed in exchange.getbundle.
200 #
133 #
201 # supported types are:
134 # supported types are:
202 #
135 #
203 # :nodes: list of binary nodes
136 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
137 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
138 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
139 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
140 gboptsmap = {'heads': 'nodes',
208 'common': 'nodes',
141 'common': 'nodes',
209 'obsmarkers': 'boolean',
142 'obsmarkers': 'boolean',
210 'bundlecaps': 'scsv',
143 'bundlecaps': 'scsv',
211 'listkeys': 'csv',
144 'listkeys': 'csv',
212 'cg': 'boolean'}
145 'cg': 'boolean'}
213
146
214 # client side
147 # client side
215
148
216 class wirepeer(peer.peerrepository):
149 class wirepeer(peer.peerrepository):
217
150
218 def batch(self):
151 def batch(self):
219 return remotebatch(self)
152 return remotebatch(self)
220 def _submitbatch(self, req):
153 def _submitbatch(self, req):
221 cmds = []
154 cmds = []
222 for op, argsdict in req:
155 for op, argsdict in req:
223 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
224 for k, v in argsdict.iteritems())
157 for k, v in argsdict.iteritems())
225 cmds.append('%s %s' % (op, args))
158 cmds.append('%s %s' % (op, args))
226 rsp = self._call("batch", cmds=';'.join(cmds))
159 rsp = self._call("batch", cmds=';'.join(cmds))
227 return [unescapearg(r) for r in rsp.split(';')]
160 return [unescapearg(r) for r in rsp.split(';')]
228 def _submitone(self, op, args):
161 def _submitone(self, op, args):
229 return self._call(op, **args)
162 return self._call(op, **args)
230
163
231 @batchable
164 @batchable
232 def lookup(self, key):
165 def lookup(self, key):
233 self.requirecap('lookup', _('look up remote revision'))
166 self.requirecap('lookup', _('look up remote revision'))
234 f = future()
167 f = future()
235 yield {'key': encoding.fromlocal(key)}, f
168 yield {'key': encoding.fromlocal(key)}, f
236 d = f.value
169 d = f.value
237 success, data = d[:-1].split(" ", 1)
170 success, data = d[:-1].split(" ", 1)
238 if int(success):
171 if int(success):
239 yield bin(data)
172 yield bin(data)
240 self._abort(error.RepoError(data))
173 self._abort(error.RepoError(data))
241
174
242 @batchable
175 @batchable
243 def heads(self):
176 def heads(self):
244 f = future()
177 f = future()
245 yield {}, f
178 yield {}, f
246 d = f.value
179 d = f.value
247 try:
180 try:
248 yield decodelist(d[:-1])
181 yield decodelist(d[:-1])
249 except ValueError:
182 except ValueError:
250 self._abort(error.ResponseError(_("unexpected response:"), d))
183 self._abort(error.ResponseError(_("unexpected response:"), d))
251
184
252 @batchable
185 @batchable
253 def known(self, nodes):
186 def known(self, nodes):
254 f = future()
187 f = future()
255 yield {'nodes': encodelist(nodes)}, f
188 yield {'nodes': encodelist(nodes)}, f
256 d = f.value
189 d = f.value
257 try:
190 try:
258 yield [bool(int(b)) for b in d]
191 yield [bool(int(b)) for b in d]
259 except ValueError:
192 except ValueError:
260 self._abort(error.ResponseError(_("unexpected response:"), d))
193 self._abort(error.ResponseError(_("unexpected response:"), d))
261
194
262 @batchable
195 @batchable
263 def branchmap(self):
196 def branchmap(self):
264 f = future()
197 f = future()
265 yield {}, f
198 yield {}, f
266 d = f.value
199 d = f.value
267 try:
200 try:
268 branchmap = {}
201 branchmap = {}
269 for branchpart in d.splitlines():
202 for branchpart in d.splitlines():
270 branchname, branchheads = branchpart.split(' ', 1)
203 branchname, branchheads = branchpart.split(' ', 1)
271 branchname = encoding.tolocal(urllib.unquote(branchname))
204 branchname = encoding.tolocal(urllib.unquote(branchname))
272 branchheads = decodelist(branchheads)
205 branchheads = decodelist(branchheads)
273 branchmap[branchname] = branchheads
206 branchmap[branchname] = branchheads
274 yield branchmap
207 yield branchmap
275 except TypeError:
208 except TypeError:
276 self._abort(error.ResponseError(_("unexpected response:"), d))
209 self._abort(error.ResponseError(_("unexpected response:"), d))
277
210
278 def branches(self, nodes):
211 def branches(self, nodes):
279 n = encodelist(nodes)
212 n = encodelist(nodes)
280 d = self._call("branches", nodes=n)
213 d = self._call("branches", nodes=n)
281 try:
214 try:
282 br = [tuple(decodelist(b)) for b in d.splitlines()]
215 br = [tuple(decodelist(b)) for b in d.splitlines()]
283 return br
216 return br
284 except ValueError:
217 except ValueError:
285 self._abort(error.ResponseError(_("unexpected response:"), d))
218 self._abort(error.ResponseError(_("unexpected response:"), d))
286
219
287 def between(self, pairs):
220 def between(self, pairs):
288 batch = 8 # avoid giant requests
221 batch = 8 # avoid giant requests
289 r = []
222 r = []
290 for i in xrange(0, len(pairs), batch):
223 for i in xrange(0, len(pairs), batch):
291 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
292 d = self._call("between", pairs=n)
225 d = self._call("between", pairs=n)
293 try:
226 try:
294 r.extend(l and decodelist(l) or [] for l in d.splitlines())
227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
295 except ValueError:
228 except ValueError:
296 self._abort(error.ResponseError(_("unexpected response:"), d))
229 self._abort(error.ResponseError(_("unexpected response:"), d))
297 return r
230 return r
298
231
299 @batchable
232 @batchable
300 def pushkey(self, namespace, key, old, new):
233 def pushkey(self, namespace, key, old, new):
301 if not self.capable('pushkey'):
234 if not self.capable('pushkey'):
302 yield False, None
235 yield False, None
303 f = future()
236 f = future()
304 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
237 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
305 yield {'namespace': encoding.fromlocal(namespace),
238 yield {'namespace': encoding.fromlocal(namespace),
306 'key': encoding.fromlocal(key),
239 'key': encoding.fromlocal(key),
307 'old': encoding.fromlocal(old),
240 'old': encoding.fromlocal(old),
308 'new': encoding.fromlocal(new)}, f
241 'new': encoding.fromlocal(new)}, f
309 d = f.value
242 d = f.value
310 d, output = d.split('\n', 1)
243 d, output = d.split('\n', 1)
311 try:
244 try:
312 d = bool(int(d))
245 d = bool(int(d))
313 except ValueError:
246 except ValueError:
314 raise error.ResponseError(
247 raise error.ResponseError(
315 _('push failed (unexpected response):'), d)
248 _('push failed (unexpected response):'), d)
316 for l in output.splitlines(True):
249 for l in output.splitlines(True):
317 self.ui.status(_('remote: '), l)
250 self.ui.status(_('remote: '), l)
318 yield d
251 yield d
319
252
320 @batchable
253 @batchable
321 def listkeys(self, namespace):
254 def listkeys(self, namespace):
322 if not self.capable('pushkey'):
255 if not self.capable('pushkey'):
323 yield {}, None
256 yield {}, None
324 f = future()
257 f = future()
325 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
258 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
326 yield {'namespace': encoding.fromlocal(namespace)}, f
259 yield {'namespace': encoding.fromlocal(namespace)}, f
327 d = f.value
260 d = f.value
328 self.ui.debug('received listkey for "%s": %i bytes\n'
261 self.ui.debug('received listkey for "%s": %i bytes\n'
329 % (namespace, len(d)))
262 % (namespace, len(d)))
330 yield pushkeymod.decodekeys(d)
263 yield pushkeymod.decodekeys(d)
331
264
332 def stream_out(self):
265 def stream_out(self):
333 return self._callstream('stream_out')
266 return self._callstream('stream_out')
334
267
335 def changegroup(self, nodes, kind):
268 def changegroup(self, nodes, kind):
336 n = encodelist(nodes)
269 n = encodelist(nodes)
337 f = self._callcompressable("changegroup", roots=n)
270 f = self._callcompressable("changegroup", roots=n)
338 return changegroupmod.cg1unpacker(f, 'UN')
271 return changegroupmod.cg1unpacker(f, 'UN')
339
272
340 def changegroupsubset(self, bases, heads, kind):
273 def changegroupsubset(self, bases, heads, kind):
341 self.requirecap('changegroupsubset', _('look up remote changes'))
274 self.requirecap('changegroupsubset', _('look up remote changes'))
342 bases = encodelist(bases)
275 bases = encodelist(bases)
343 heads = encodelist(heads)
276 heads = encodelist(heads)
344 f = self._callcompressable("changegroupsubset",
277 f = self._callcompressable("changegroupsubset",
345 bases=bases, heads=heads)
278 bases=bases, heads=heads)
346 return changegroupmod.cg1unpacker(f, 'UN')
279 return changegroupmod.cg1unpacker(f, 'UN')
347
280
348 def getbundle(self, source, **kwargs):
281 def getbundle(self, source, **kwargs):
349 self.requirecap('getbundle', _('look up remote changes'))
282 self.requirecap('getbundle', _('look up remote changes'))
350 opts = {}
283 opts = {}
351 bundlecaps = kwargs.get('bundlecaps')
284 bundlecaps = kwargs.get('bundlecaps')
352 if bundlecaps is not None:
285 if bundlecaps is not None:
353 kwargs['bundlecaps'] = sorted(bundlecaps)
286 kwargs['bundlecaps'] = sorted(bundlecaps)
354 else:
287 else:
355 bundlecaps = () # kwargs could have it to None
288 bundlecaps = () # kwargs could have it to None
356 for key, value in kwargs.iteritems():
289 for key, value in kwargs.iteritems():
357 if value is None:
290 if value is None:
358 continue
291 continue
359 keytype = gboptsmap.get(key)
292 keytype = gboptsmap.get(key)
360 if keytype is None:
293 if keytype is None:
361 assert False, 'unexpected'
294 assert False, 'unexpected'
362 elif keytype == 'nodes':
295 elif keytype == 'nodes':
363 value = encodelist(value)
296 value = encodelist(value)
364 elif keytype in ('csv', 'scsv'):
297 elif keytype in ('csv', 'scsv'):
365 value = ','.join(value)
298 value = ','.join(value)
366 elif keytype == 'boolean':
299 elif keytype == 'boolean':
367 value = '%i' % bool(value)
300 value = '%i' % bool(value)
368 elif keytype != 'plain':
301 elif keytype != 'plain':
369 raise KeyError('unknown getbundle option type %s'
302 raise KeyError('unknown getbundle option type %s'
370 % keytype)
303 % keytype)
371 opts[key] = value
304 opts[key] = value
372 f = self._callcompressable("getbundle", **opts)
305 f = self._callcompressable("getbundle", **opts)
373 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 if any((cap.startswith('HG2') for cap in bundlecaps)):
374 return bundle2.getunbundler(self.ui, f)
307 return bundle2.getunbundler(self.ui, f)
375 else:
308 else:
376 return changegroupmod.cg1unpacker(f, 'UN')
309 return changegroupmod.cg1unpacker(f, 'UN')
377
310
378 def unbundle(self, cg, heads, source):
311 def unbundle(self, cg, heads, source):
379 '''Send cg (a readable file-like object representing the
312 '''Send cg (a readable file-like object representing the
380 changegroup to push, typically a chunkbuffer object) to the
313 changegroup to push, typically a chunkbuffer object) to the
381 remote server as a bundle.
314 remote server as a bundle.
382
315
383 When pushing a bundle10 stream, return an integer indicating the
316 When pushing a bundle10 stream, return an integer indicating the
384 result of the push (see localrepository.addchangegroup()).
317 result of the push (see localrepository.addchangegroup()).
385
318
386 When pushing a bundle20 stream, return a bundle20 stream.'''
319 When pushing a bundle20 stream, return a bundle20 stream.'''
387
320
388 if heads != ['force'] and self.capable('unbundlehash'):
321 if heads != ['force'] and self.capable('unbundlehash'):
389 heads = encodelist(['hashed',
322 heads = encodelist(['hashed',
390 util.sha1(''.join(sorted(heads))).digest()])
323 util.sha1(''.join(sorted(heads))).digest()])
391 else:
324 else:
392 heads = encodelist(heads)
325 heads = encodelist(heads)
393
326
394 if util.safehasattr(cg, 'deltaheader'):
327 if util.safehasattr(cg, 'deltaheader'):
395 # this a bundle10, do the old style call sequence
328 # this a bundle10, do the old style call sequence
396 ret, output = self._callpush("unbundle", cg, heads=heads)
329 ret, output = self._callpush("unbundle", cg, heads=heads)
397 if ret == "":
330 if ret == "":
398 raise error.ResponseError(
331 raise error.ResponseError(
399 _('push failed:'), output)
332 _('push failed:'), output)
400 try:
333 try:
401 ret = int(ret)
334 ret = int(ret)
402 except ValueError:
335 except ValueError:
403 raise error.ResponseError(
336 raise error.ResponseError(
404 _('push failed (unexpected response):'), ret)
337 _('push failed (unexpected response):'), ret)
405
338
406 for l in output.splitlines(True):
339 for l in output.splitlines(True):
407 self.ui.status(_('remote: '), l)
340 self.ui.status(_('remote: '), l)
408 else:
341 else:
409 # bundle2 push. Send a stream, fetch a stream.
342 # bundle2 push. Send a stream, fetch a stream.
410 stream = self._calltwowaystream('unbundle', cg, heads=heads)
343 stream = self._calltwowaystream('unbundle', cg, heads=heads)
411 ret = bundle2.getunbundler(self.ui, stream)
344 ret = bundle2.getunbundler(self.ui, stream)
412 return ret
345 return ret
413
346
414 def debugwireargs(self, one, two, three=None, four=None, five=None):
347 def debugwireargs(self, one, two, three=None, four=None, five=None):
415 # don't pass optional arguments left at their default value
348 # don't pass optional arguments left at their default value
416 opts = {}
349 opts = {}
417 if three is not None:
350 if three is not None:
418 opts['three'] = three
351 opts['three'] = three
419 if four is not None:
352 if four is not None:
420 opts['four'] = four
353 opts['four'] = four
421 return self._call('debugwireargs', one=one, two=two, **opts)
354 return self._call('debugwireargs', one=one, two=two, **opts)
422
355
423 def _call(self, cmd, **args):
356 def _call(self, cmd, **args):
424 """execute <cmd> on the server
357 """execute <cmd> on the server
425
358
426 The command is expected to return a simple string.
359 The command is expected to return a simple string.
427
360
428 returns the server reply as a string."""
361 returns the server reply as a string."""
429 raise NotImplementedError()
362 raise NotImplementedError()
430
363
431 def _callstream(self, cmd, **args):
364 def _callstream(self, cmd, **args):
432 """execute <cmd> on the server
365 """execute <cmd> on the server
433
366
434 The command is expected to return a stream.
367 The command is expected to return a stream.
435
368
436 returns the server reply as a file like object."""
369 returns the server reply as a file like object."""
437 raise NotImplementedError()
370 raise NotImplementedError()
438
371
439 def _callcompressable(self, cmd, **args):
372 def _callcompressable(self, cmd, **args):
440 """execute <cmd> on the server
373 """execute <cmd> on the server
441
374
442 The command is expected to return a stream.
375 The command is expected to return a stream.
443
376
444 The stream may have been compressed in some implementations. This
377 The stream may have been compressed in some implementations. This
445 function takes care of the decompression. This is the only difference
378 function takes care of the decompression. This is the only difference
446 with _callstream.
379 with _callstream.
447
380
448 returns the server reply as a file like object.
381 returns the server reply as a file like object.
449 """
382 """
450 raise NotImplementedError()
383 raise NotImplementedError()
451
384
452 def _callpush(self, cmd, fp, **args):
385 def _callpush(self, cmd, fp, **args):
453 """execute a <cmd> on server
386 """execute a <cmd> on server
454
387
455 The command is expected to be related to a push. Push has a special
388 The command is expected to be related to a push. Push has a special
456 return method.
389 return method.
457
390
458 returns the server reply as a (ret, output) tuple. ret is either
391 returns the server reply as a (ret, output) tuple. ret is either
459 empty (error) or a stringified int.
392 empty (error) or a stringified int.
460 """
393 """
461 raise NotImplementedError()
394 raise NotImplementedError()
462
395
463 def _calltwowaystream(self, cmd, fp, **args):
396 def _calltwowaystream(self, cmd, fp, **args):
464 """execute <cmd> on server
397 """execute <cmd> on server
465
398
466 The command will send a stream to the server and get a stream in reply.
399 The command will send a stream to the server and get a stream in reply.
467 """
400 """
468 raise NotImplementedError()
401 raise NotImplementedError()
469
402
470 def _abort(self, exception):
403 def _abort(self, exception):
471 """clearly abort the wire protocol connection and raise the exception
404 """clearly abort the wire protocol connection and raise the exception
472 """
405 """
473 raise NotImplementedError()
406 raise NotImplementedError()
474
407
475 # server side
408 # server side
476
409
477 # wire protocol command can either return a string or one of these classes.
410 # wire protocol command can either return a string or one of these classes.
478 class streamres(object):
411 class streamres(object):
479 """wireproto reply: binary stream
412 """wireproto reply: binary stream
480
413
481 The call was successful and the result is a stream.
414 The call was successful and the result is a stream.
482 Iterate on the `self.gen` attribute to retrieve chunks.
415 Iterate on the `self.gen` attribute to retrieve chunks.
483 """
416 """
484 def __init__(self, gen):
417 def __init__(self, gen):
485 self.gen = gen
418 self.gen = gen
486
419
487 class pushres(object):
420 class pushres(object):
488 """wireproto reply: success with simple integer return
421 """wireproto reply: success with simple integer return
489
422
490 The call was successful and returned an integer contained in `self.res`.
423 The call was successful and returned an integer contained in `self.res`.
491 """
424 """
492 def __init__(self, res):
425 def __init__(self, res):
493 self.res = res
426 self.res = res
494
427
495 class pusherr(object):
428 class pusherr(object):
496 """wireproto reply: failure
429 """wireproto reply: failure
497
430
498 The call failed. The `self.res` attribute contains the error message.
431 The call failed. The `self.res` attribute contains the error message.
499 """
432 """
500 def __init__(self, res):
433 def __init__(self, res):
501 self.res = res
434 self.res = res
502
435
503 class ooberror(object):
436 class ooberror(object):
504 """wireproto reply: failure of a batch of operation
437 """wireproto reply: failure of a batch of operation
505
438
506 Something failed during a batch call. The error message is stored in
439 Something failed during a batch call. The error message is stored in
507 `self.message`.
440 `self.message`.
508 """
441 """
509 def __init__(self, message):
442 def __init__(self, message):
510 self.message = message
443 self.message = message
511
444
512 def dispatch(repo, proto, command):
445 def dispatch(repo, proto, command):
513 repo = repo.filtered("served")
446 repo = repo.filtered("served")
514 func, spec = commands[command]
447 func, spec = commands[command]
515 args = proto.getargs(spec)
448 args = proto.getargs(spec)
516 return func(repo, proto, *args)
449 return func(repo, proto, *args)
517
450
518 def options(cmd, keys, others):
451 def options(cmd, keys, others):
519 opts = {}
452 opts = {}
520 for k in keys:
453 for k in keys:
521 if k in others:
454 if k in others:
522 opts[k] = others[k]
455 opts[k] = others[k]
523 del others[k]
456 del others[k]
524 if others:
457 if others:
525 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
458 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
526 % (cmd, ",".join(others)))
459 % (cmd, ",".join(others)))
527 return opts
460 return opts
528
461
529 # list of commands
462 # list of commands
530 commands = {}
463 commands = {}
531
464
532 def wireprotocommand(name, args=''):
465 def wireprotocommand(name, args=''):
533 """decorator for wire protocol command"""
466 """decorator for wire protocol command"""
534 def register(func):
467 def register(func):
535 commands[name] = (func, args)
468 commands[name] = (func, args)
536 return func
469 return func
537 return register
470 return register
538
471
539 @wireprotocommand('batch', 'cmds *')
472 @wireprotocommand('batch', 'cmds *')
540 def batch(repo, proto, cmds, others):
473 def batch(repo, proto, cmds, others):
541 repo = repo.filtered("served")
474 repo = repo.filtered("served")
542 res = []
475 res = []
543 for pair in cmds.split(';'):
476 for pair in cmds.split(';'):
544 op, args = pair.split(' ', 1)
477 op, args = pair.split(' ', 1)
545 vals = {}
478 vals = {}
546 for a in args.split(','):
479 for a in args.split(','):
547 if a:
480 if a:
548 n, v = a.split('=')
481 n, v = a.split('=')
549 vals[n] = unescapearg(v)
482 vals[n] = unescapearg(v)
550 func, spec = commands[op]
483 func, spec = commands[op]
551 if spec:
484 if spec:
552 keys = spec.split()
485 keys = spec.split()
553 data = {}
486 data = {}
554 for k in keys:
487 for k in keys:
555 if k == '*':
488 if k == '*':
556 star = {}
489 star = {}
557 for key in vals.keys():
490 for key in vals.keys():
558 if key not in keys:
491 if key not in keys:
559 star[key] = vals[key]
492 star[key] = vals[key]
560 data['*'] = star
493 data['*'] = star
561 else:
494 else:
562 data[k] = vals[k]
495 data[k] = vals[k]
563 result = func(repo, proto, *[data[k] for k in keys])
496 result = func(repo, proto, *[data[k] for k in keys])
564 else:
497 else:
565 result = func(repo, proto)
498 result = func(repo, proto)
566 if isinstance(result, ooberror):
499 if isinstance(result, ooberror):
567 return result
500 return result
568 res.append(escapearg(result))
501 res.append(escapearg(result))
569 return ';'.join(res)
502 return ';'.join(res)
570
503
571 @wireprotocommand('between', 'pairs')
504 @wireprotocommand('between', 'pairs')
572 def between(repo, proto, pairs):
505 def between(repo, proto, pairs):
573 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
506 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
574 r = []
507 r = []
575 for b in repo.between(pairs):
508 for b in repo.between(pairs):
576 r.append(encodelist(b) + "\n")
509 r.append(encodelist(b) + "\n")
577 return "".join(r)
510 return "".join(r)
578
511
579 @wireprotocommand('branchmap')
512 @wireprotocommand('branchmap')
580 def branchmap(repo, proto):
513 def branchmap(repo, proto):
581 branchmap = repo.branchmap()
514 branchmap = repo.branchmap()
582 heads = []
515 heads = []
583 for branch, nodes in branchmap.iteritems():
516 for branch, nodes in branchmap.iteritems():
584 branchname = urllib.quote(encoding.fromlocal(branch))
517 branchname = urllib.quote(encoding.fromlocal(branch))
585 branchnodes = encodelist(nodes)
518 branchnodes = encodelist(nodes)
586 heads.append('%s %s' % (branchname, branchnodes))
519 heads.append('%s %s' % (branchname, branchnodes))
587 return '\n'.join(heads)
520 return '\n'.join(heads)
588
521
589 @wireprotocommand('branches', 'nodes')
522 @wireprotocommand('branches', 'nodes')
590 def branches(repo, proto, nodes):
523 def branches(repo, proto, nodes):
591 nodes = decodelist(nodes)
524 nodes = decodelist(nodes)
592 r = []
525 r = []
593 for b in repo.branches(nodes):
526 for b in repo.branches(nodes):
594 r.append(encodelist(b) + "\n")
527 r.append(encodelist(b) + "\n")
595 return "".join(r)
528 return "".join(r)
596
529
597
530
598 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
531 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
599 'known', 'getbundle', 'unbundlehash', 'batch']
532 'known', 'getbundle', 'unbundlehash', 'batch']
600
533
601 def _capabilities(repo, proto):
534 def _capabilities(repo, proto):
602 """return a list of capabilities for a repo
535 """return a list of capabilities for a repo
603
536
604 This function exists to allow extensions to easily wrap capabilities
537 This function exists to allow extensions to easily wrap capabilities
605 computation
538 computation
606
539
607 - returns a lists: easy to alter
540 - returns a lists: easy to alter
608 - change done here will be propagated to both `capabilities` and `hello`
541 - change done here will be propagated to both `capabilities` and `hello`
609 command without any other action needed.
542 command without any other action needed.
610 """
543 """
611 # copy to prevent modification of the global list
544 # copy to prevent modification of the global list
612 caps = list(wireprotocaps)
545 caps = list(wireprotocaps)
613 if _allowstream(repo.ui):
546 if _allowstream(repo.ui):
614 if repo.ui.configbool('server', 'preferuncompressed', False):
547 if repo.ui.configbool('server', 'preferuncompressed', False):
615 caps.append('stream-preferred')
548 caps.append('stream-preferred')
616 requiredformats = repo.requirements & repo.supportedformats
549 requiredformats = repo.requirements & repo.supportedformats
617 # if our local revlogs are just revlogv1, add 'stream' cap
550 # if our local revlogs are just revlogv1, add 'stream' cap
618 if not requiredformats - set(('revlogv1',)):
551 if not requiredformats - set(('revlogv1',)):
619 caps.append('stream')
552 caps.append('stream')
620 # otherwise, add 'streamreqs' detailing our local revlog format
553 # otherwise, add 'streamreqs' detailing our local revlog format
621 else:
554 else:
622 caps.append('streamreqs=%s' % ','.join(requiredformats))
555 caps.append('streamreqs=%s' % ','.join(requiredformats))
623 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
556 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
624 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
557 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
625 caps.append('bundle2=' + urllib.quote(capsblob))
558 caps.append('bundle2=' + urllib.quote(capsblob))
626 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
559 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
627 caps.append(
560 caps.append(
628 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
561 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
629 return caps
562 return caps
630
563
631 # If you are writing an extension and consider wrapping this function. Wrap
564 # If you are writing an extension and consider wrapping this function. Wrap
632 # `_capabilities` instead.
565 # `_capabilities` instead.
633 @wireprotocommand('capabilities')
566 @wireprotocommand('capabilities')
634 def capabilities(repo, proto):
567 def capabilities(repo, proto):
635 return ' '.join(_capabilities(repo, proto))
568 return ' '.join(_capabilities(repo, proto))
636
569
637 @wireprotocommand('changegroup', 'roots')
570 @wireprotocommand('changegroup', 'roots')
638 def changegroup(repo, proto, roots):
571 def changegroup(repo, proto, roots):
639 nodes = decodelist(roots)
572 nodes = decodelist(roots)
640 cg = changegroupmod.changegroup(repo, nodes, 'serve')
573 cg = changegroupmod.changegroup(repo, nodes, 'serve')
641 return streamres(proto.groupchunks(cg))
574 return streamres(proto.groupchunks(cg))
642
575
643 @wireprotocommand('changegroupsubset', 'bases heads')
576 @wireprotocommand('changegroupsubset', 'bases heads')
644 def changegroupsubset(repo, proto, bases, heads):
577 def changegroupsubset(repo, proto, bases, heads):
645 bases = decodelist(bases)
578 bases = decodelist(bases)
646 heads = decodelist(heads)
579 heads = decodelist(heads)
647 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
580 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
648 return streamres(proto.groupchunks(cg))
581 return streamres(proto.groupchunks(cg))
649
582
650 @wireprotocommand('debugwireargs', 'one two *')
583 @wireprotocommand('debugwireargs', 'one two *')
651 def debugwireargs(repo, proto, one, two, others):
584 def debugwireargs(repo, proto, one, two, others):
652 # only accept optional args from the known set
585 # only accept optional args from the known set
653 opts = options('debugwireargs', ['three', 'four'], others)
586 opts = options('debugwireargs', ['three', 'four'], others)
654 return repo.debugwireargs(one, two, **opts)
587 return repo.debugwireargs(one, two, **opts)
655
588
656 # List of options accepted by getbundle.
589 # List of options accepted by getbundle.
657 #
590 #
658 # Meant to be extended by extensions. It is the extension's responsibility to
591 # Meant to be extended by extensions. It is the extension's responsibility to
659 # ensure such options are properly processed in exchange.getbundle.
592 # ensure such options are properly processed in exchange.getbundle.
660 gboptslist = ['heads', 'common', 'bundlecaps']
593 gboptslist = ['heads', 'common', 'bundlecaps']
661
594
662 @wireprotocommand('getbundle', '*')
595 @wireprotocommand('getbundle', '*')
663 def getbundle(repo, proto, others):
596 def getbundle(repo, proto, others):
664 opts = options('getbundle', gboptsmap.keys(), others)
597 opts = options('getbundle', gboptsmap.keys(), others)
665 for k, v in opts.iteritems():
598 for k, v in opts.iteritems():
666 keytype = gboptsmap[k]
599 keytype = gboptsmap[k]
667 if keytype == 'nodes':
600 if keytype == 'nodes':
668 opts[k] = decodelist(v)
601 opts[k] = decodelist(v)
669 elif keytype == 'csv':
602 elif keytype == 'csv':
670 opts[k] = list(v.split(','))
603 opts[k] = list(v.split(','))
671 elif keytype == 'scsv':
604 elif keytype == 'scsv':
672 opts[k] = set(v.split(','))
605 opts[k] = set(v.split(','))
673 elif keytype == 'boolean':
606 elif keytype == 'boolean':
674 opts[k] = bool(v)
607 opts[k] = bool(v)
675 elif keytype != 'plain':
608 elif keytype != 'plain':
676 raise KeyError('unknown getbundle option type %s'
609 raise KeyError('unknown getbundle option type %s'
677 % keytype)
610 % keytype)
678 cg = exchange.getbundle(repo, 'serve', **opts)
611 cg = exchange.getbundle(repo, 'serve', **opts)
679 return streamres(proto.groupchunks(cg))
612 return streamres(proto.groupchunks(cg))
680
613
681 @wireprotocommand('heads')
614 @wireprotocommand('heads')
682 def heads(repo, proto):
615 def heads(repo, proto):
683 h = repo.heads()
616 h = repo.heads()
684 return encodelist(h) + "\n"
617 return encodelist(h) + "\n"
685
618
686 @wireprotocommand('hello')
619 @wireprotocommand('hello')
687 def hello(repo, proto):
620 def hello(repo, proto):
688 '''the hello command returns a set of lines describing various
621 '''the hello command returns a set of lines describing various
689 interesting things about the server, in an RFC822-like format.
622 interesting things about the server, in an RFC822-like format.
690 Currently the only one defined is "capabilities", which
623 Currently the only one defined is "capabilities", which
691 consists of a line in the form:
624 consists of a line in the form:
692
625
693 capabilities: space separated list of tokens
626 capabilities: space separated list of tokens
694 '''
627 '''
695 return "capabilities: %s\n" % (capabilities(repo, proto))
628 return "capabilities: %s\n" % (capabilities(repo, proto))
696
629
697 @wireprotocommand('listkeys', 'namespace')
630 @wireprotocommand('listkeys', 'namespace')
698 def listkeys(repo, proto, namespace):
631 def listkeys(repo, proto, namespace):
699 d = repo.listkeys(encoding.tolocal(namespace)).items()
632 d = repo.listkeys(encoding.tolocal(namespace)).items()
700 return pushkeymod.encodekeys(d)
633 return pushkeymod.encodekeys(d)
701
634
702 @wireprotocommand('lookup', 'key')
635 @wireprotocommand('lookup', 'key')
703 def lookup(repo, proto, key):
636 def lookup(repo, proto, key):
704 try:
637 try:
705 k = encoding.tolocal(key)
638 k = encoding.tolocal(key)
706 c = repo[k]
639 c = repo[k]
707 r = c.hex()
640 r = c.hex()
708 success = 1
641 success = 1
709 except Exception as inst:
642 except Exception as inst:
710 r = str(inst)
643 r = str(inst)
711 success = 0
644 success = 0
712 return "%s %s\n" % (success, r)
645 return "%s %s\n" % (success, r)
713
646
714 @wireprotocommand('known', 'nodes *')
647 @wireprotocommand('known', 'nodes *')
715 def known(repo, proto, nodes, others):
648 def known(repo, proto, nodes, others):
716 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
649 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
717
650
718 @wireprotocommand('pushkey', 'namespace key old new')
651 @wireprotocommand('pushkey', 'namespace key old new')
719 def pushkey(repo, proto, namespace, key, old, new):
652 def pushkey(repo, proto, namespace, key, old, new):
720 # compatibility with pre-1.8 clients which were accidentally
653 # compatibility with pre-1.8 clients which were accidentally
721 # sending raw binary nodes rather than utf-8-encoded hex
654 # sending raw binary nodes rather than utf-8-encoded hex
722 if len(new) == 20 and new.encode('string-escape') != new:
655 if len(new) == 20 and new.encode('string-escape') != new:
723 # looks like it could be a binary node
656 # looks like it could be a binary node
724 try:
657 try:
725 new.decode('utf-8')
658 new.decode('utf-8')
726 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
659 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
727 except UnicodeDecodeError:
660 except UnicodeDecodeError:
728 pass # binary, leave unmodified
661 pass # binary, leave unmodified
729 else:
662 else:
730 new = encoding.tolocal(new) # normal path
663 new = encoding.tolocal(new) # normal path
731
664
732 if util.safehasattr(proto, 'restore'):
665 if util.safehasattr(proto, 'restore'):
733
666
734 proto.redirect()
667 proto.redirect()
735
668
736 try:
669 try:
737 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
670 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
738 encoding.tolocal(old), new) or False
671 encoding.tolocal(old), new) or False
739 except util.Abort:
672 except util.Abort:
740 r = False
673 r = False
741
674
742 output = proto.restore()
675 output = proto.restore()
743
676
744 return '%s\n%s' % (int(r), output)
677 return '%s\n%s' % (int(r), output)
745
678
746 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
679 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
747 encoding.tolocal(old), new)
680 encoding.tolocal(old), new)
748 return '%s\n' % int(r)
681 return '%s\n' % int(r)
749
682
750 def _allowstream(ui):
683 def _allowstream(ui):
751 return ui.configbool('server', 'uncompressed', True, untrusted=True)
684 return ui.configbool('server', 'uncompressed', True, untrusted=True)
752
685
753 @wireprotocommand('stream_out')
686 @wireprotocommand('stream_out')
754 def stream(repo, proto):
687 def stream(repo, proto):
755 '''If the server supports streaming clone, it advertises the "stream"
688 '''If the server supports streaming clone, it advertises the "stream"
756 capability with a value representing the version and flags of the repo
689 capability with a value representing the version and flags of the repo
757 it is serving. Client checks to see if it understands the format.
690 it is serving. Client checks to see if it understands the format.
758 '''
691 '''
759 if not _allowstream(repo.ui):
692 if not _allowstream(repo.ui):
760 return '1\n'
693 return '1\n'
761
694
762 def getstream(it):
695 def getstream(it):
763 yield '0\n'
696 yield '0\n'
764 for chunk in it:
697 for chunk in it:
765 yield chunk
698 yield chunk
766
699
767 try:
700 try:
768 # LockError may be raised before the first result is yielded. Don't
701 # LockError may be raised before the first result is yielded. Don't
769 # emit output until we're sure we got the lock successfully.
702 # emit output until we're sure we got the lock successfully.
770 it = exchange.generatestreamclone(repo)
703 it = exchange.generatestreamclone(repo)
771 return streamres(getstream(it))
704 return streamres(getstream(it))
772 except error.LockError:
705 except error.LockError:
773 return '2\n'
706 return '2\n'
774
707
775 @wireprotocommand('unbundle', 'heads')
708 @wireprotocommand('unbundle', 'heads')
776 def unbundle(repo, proto, heads):
709 def unbundle(repo, proto, heads):
777 their_heads = decodelist(heads)
710 their_heads = decodelist(heads)
778
711
779 try:
712 try:
780 proto.redirect()
713 proto.redirect()
781
714
782 exchange.check_heads(repo, their_heads, 'preparing changes')
715 exchange.check_heads(repo, their_heads, 'preparing changes')
783
716
784 # write bundle data to temporary file because it can be big
717 # write bundle data to temporary file because it can be big
785 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
718 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
786 fp = os.fdopen(fd, 'wb+')
719 fp = os.fdopen(fd, 'wb+')
787 r = 0
720 r = 0
788 try:
721 try:
789 proto.getfile(fp)
722 proto.getfile(fp)
790 fp.seek(0)
723 fp.seek(0)
791 gen = exchange.readbundle(repo.ui, fp, None)
724 gen = exchange.readbundle(repo.ui, fp, None)
792 r = exchange.unbundle(repo, gen, their_heads, 'serve',
725 r = exchange.unbundle(repo, gen, their_heads, 'serve',
793 proto._client())
726 proto._client())
794 if util.safehasattr(r, 'addpart'):
727 if util.safehasattr(r, 'addpart'):
795 # The return looks streamable, we are in the bundle2 case and
728 # The return looks streamable, we are in the bundle2 case and
796 # should return a stream.
729 # should return a stream.
797 return streamres(r.getchunks())
730 return streamres(r.getchunks())
798 return pushres(r)
731 return pushres(r)
799
732
800 finally:
733 finally:
801 fp.close()
734 fp.close()
802 os.unlink(tempname)
735 os.unlink(tempname)
803
736
804 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
737 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
805 # handle non-bundle2 case first
738 # handle non-bundle2 case first
806 if not getattr(exc, 'duringunbundle2', False):
739 if not getattr(exc, 'duringunbundle2', False):
807 try:
740 try:
808 raise
741 raise
809 except util.Abort:
742 except util.Abort:
810 # The old code we moved used sys.stderr directly.
743 # The old code we moved used sys.stderr directly.
811 # We did not change it to minimise code change.
744 # We did not change it to minimise code change.
812 # This need to be moved to something proper.
745 # This need to be moved to something proper.
813 # Feel free to do it.
746 # Feel free to do it.
814 sys.stderr.write("abort: %s\n" % exc)
747 sys.stderr.write("abort: %s\n" % exc)
815 return pushres(0)
748 return pushres(0)
816 except error.PushRaced:
749 except error.PushRaced:
817 return pusherr(str(exc))
750 return pusherr(str(exc))
818
751
819 bundler = bundle2.bundle20(repo.ui)
752 bundler = bundle2.bundle20(repo.ui)
820 for out in getattr(exc, '_bundle2salvagedoutput', ()):
753 for out in getattr(exc, '_bundle2salvagedoutput', ()):
821 bundler.addpart(out)
754 bundler.addpart(out)
822 try:
755 try:
823 try:
756 try:
824 raise
757 raise
825 except error.PushkeyFailed as exc:
758 except error.PushkeyFailed as exc:
826 # check client caps
759 # check client caps
827 remotecaps = getattr(exc, '_replycaps', None)
760 remotecaps = getattr(exc, '_replycaps', None)
828 if (remotecaps is not None
761 if (remotecaps is not None
829 and 'pushkey' not in remotecaps.get('error', ())):
762 and 'pushkey' not in remotecaps.get('error', ())):
830 # no support remote side, fallback to Abort handler.
763 # no support remote side, fallback to Abort handler.
831 raise
764 raise
832 part = bundler.newpart('error:pushkey')
765 part = bundler.newpart('error:pushkey')
833 part.addparam('in-reply-to', exc.partid)
766 part.addparam('in-reply-to', exc.partid)
834 if exc.namespace is not None:
767 if exc.namespace is not None:
835 part.addparam('namespace', exc.namespace, mandatory=False)
768 part.addparam('namespace', exc.namespace, mandatory=False)
836 if exc.key is not None:
769 if exc.key is not None:
837 part.addparam('key', exc.key, mandatory=False)
770 part.addparam('key', exc.key, mandatory=False)
838 if exc.new is not None:
771 if exc.new is not None:
839 part.addparam('new', exc.new, mandatory=False)
772 part.addparam('new', exc.new, mandatory=False)
840 if exc.old is not None:
773 if exc.old is not None:
841 part.addparam('old', exc.old, mandatory=False)
774 part.addparam('old', exc.old, mandatory=False)
842 if exc.ret is not None:
775 if exc.ret is not None:
843 part.addparam('ret', exc.ret, mandatory=False)
776 part.addparam('ret', exc.ret, mandatory=False)
844 except error.BundleValueError as exc:
777 except error.BundleValueError as exc:
845 errpart = bundler.newpart('error:unsupportedcontent')
778 errpart = bundler.newpart('error:unsupportedcontent')
846 if exc.parttype is not None:
779 if exc.parttype is not None:
847 errpart.addparam('parttype', exc.parttype)
780 errpart.addparam('parttype', exc.parttype)
848 if exc.params:
781 if exc.params:
849 errpart.addparam('params', '\0'.join(exc.params))
782 errpart.addparam('params', '\0'.join(exc.params))
850 except util.Abort as exc:
783 except util.Abort as exc:
851 manargs = [('message', str(exc))]
784 manargs = [('message', str(exc))]
852 advargs = []
785 advargs = []
853 if exc.hint is not None:
786 if exc.hint is not None:
854 advargs.append(('hint', exc.hint))
787 advargs.append(('hint', exc.hint))
855 bundler.addpart(bundle2.bundlepart('error:abort',
788 bundler.addpart(bundle2.bundlepart('error:abort',
856 manargs, advargs))
789 manargs, advargs))
857 except error.PushRaced as exc:
790 except error.PushRaced as exc:
858 bundler.newpart('error:pushraced', [('message', str(exc))])
791 bundler.newpart('error:pushraced', [('message', str(exc))])
859 return streamres(bundler.getchunks())
792 return streamres(bundler.getchunks())
@@ -1,175 +1,176
1 # test-batching.py - tests for transparent command batching
1 # test-batching.py - tests for transparent command batching
2 #
2 #
3 # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
3 # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from mercurial.wireproto import localbatch, remotebatch, batchable, future
8 from mercurial.peer import localbatch, batchable, future
9 from mercurial.wireproto import remotebatch
9
10
10 # equivalent of repo.repository
11 # equivalent of repo.repository
11 class thing(object):
12 class thing(object):
12 def hello(self):
13 def hello(self):
13 return "Ready."
14 return "Ready."
14
15
15 # equivalent of localrepo.localrepository
16 # equivalent of localrepo.localrepository
16 class localthing(thing):
17 class localthing(thing):
17 def foo(self, one, two=None):
18 def foo(self, one, two=None):
18 if one:
19 if one:
19 return "%s and %s" % (one, two,)
20 return "%s and %s" % (one, two,)
20 return "Nope"
21 return "Nope"
21 def bar(self, b, a):
22 def bar(self, b, a):
22 return "%s und %s" % (b, a,)
23 return "%s und %s" % (b, a,)
23 def greet(self, name=None):
24 def greet(self, name=None):
24 return "Hello, %s" % name
25 return "Hello, %s" % name
25 def batch(self):
26 def batch(self):
26 '''Support for local batching.'''
27 '''Support for local batching.'''
27 return localbatch(self)
28 return localbatch(self)
28
29
29 # usage of "thing" interface
30 # usage of "thing" interface
30 def use(it):
31 def use(it):
31
32
32 # Direct call to base method shared between client and server.
33 # Direct call to base method shared between client and server.
33 print it.hello()
34 print it.hello()
34
35
35 # Direct calls to proxied methods. They cause individual roundtrips.
36 # Direct calls to proxied methods. They cause individual roundtrips.
36 print it.foo("Un", two="Deux")
37 print it.foo("Un", two="Deux")
37 print it.bar("Eins", "Zwei")
38 print it.bar("Eins", "Zwei")
38
39
39 # Batched call to a couple of (possibly proxied) methods.
40 # Batched call to a couple of (possibly proxied) methods.
40 batch = it.batch()
41 batch = it.batch()
41 # The calls return futures to eventually hold results.
42 # The calls return futures to eventually hold results.
42 foo = batch.foo(one="One", two="Two")
43 foo = batch.foo(one="One", two="Two")
43 foo2 = batch.foo(None)
44 foo2 = batch.foo(None)
44 bar = batch.bar("Eins", "Zwei")
45 bar = batch.bar("Eins", "Zwei")
45 # We can call non-batchable proxy methods, but the break the current batch
46 # We can call non-batchable proxy methods, but the break the current batch
46 # request and cause additional roundtrips.
47 # request and cause additional roundtrips.
47 greet = batch.greet(name="John Smith")
48 greet = batch.greet(name="John Smith")
48 # We can also add local methods into the mix, but they break the batch too.
49 # We can also add local methods into the mix, but they break the batch too.
49 hello = batch.hello()
50 hello = batch.hello()
50 bar2 = batch.bar(b="Uno", a="Due")
51 bar2 = batch.bar(b="Uno", a="Due")
51 # Only now are all the calls executed in sequence, with as few roundtrips
52 # Only now are all the calls executed in sequence, with as few roundtrips
52 # as possible.
53 # as possible.
53 batch.submit()
54 batch.submit()
54 # After the call to submit, the futures actually contain values.
55 # After the call to submit, the futures actually contain values.
55 print foo.value
56 print foo.value
56 print foo2.value
57 print foo2.value
57 print bar.value
58 print bar.value
58 print greet.value
59 print greet.value
59 print hello.value
60 print hello.value
60 print bar2.value
61 print bar2.value
61
62
62 # local usage
63 # local usage
63 mylocal = localthing()
64 mylocal = localthing()
64 print
65 print
65 print "== Local"
66 print "== Local"
66 use(mylocal)
67 use(mylocal)
67
68
68 # demo remoting; mimicks what wireproto and HTTP/SSH do
69 # demo remoting; mimicks what wireproto and HTTP/SSH do
69
70
70 # shared
71 # shared
71
72
72 def escapearg(plain):
73 def escapearg(plain):
73 return (plain
74 return (plain
74 .replace(':', '::')
75 .replace(':', '::')
75 .replace(',', ':,')
76 .replace(',', ':,')
76 .replace(';', ':;')
77 .replace(';', ':;')
77 .replace('=', ':='))
78 .replace('=', ':='))
78 def unescapearg(escaped):
79 def unescapearg(escaped):
79 return (escaped
80 return (escaped
80 .replace(':=', '=')
81 .replace(':=', '=')
81 .replace(':;', ';')
82 .replace(':;', ';')
82 .replace(':,', ',')
83 .replace(':,', ',')
83 .replace('::', ':'))
84 .replace('::', ':'))
84
85
85 # server side
86 # server side
86
87
87 # equivalent of wireproto's global functions
88 # equivalent of wireproto's global functions
88 class server(object):
89 class server(object):
89 def __init__(self, local):
90 def __init__(self, local):
90 self.local = local
91 self.local = local
91 def _call(self, name, args):
92 def _call(self, name, args):
92 args = dict(arg.split('=', 1) for arg in args)
93 args = dict(arg.split('=', 1) for arg in args)
93 return getattr(self, name)(**args)
94 return getattr(self, name)(**args)
94 def perform(self, req):
95 def perform(self, req):
95 print "REQ:", req
96 print "REQ:", req
96 name, args = req.split('?', 1)
97 name, args = req.split('?', 1)
97 args = args.split('&')
98 args = args.split('&')
98 vals = dict(arg.split('=', 1) for arg in args)
99 vals = dict(arg.split('=', 1) for arg in args)
99 res = getattr(self, name)(**vals)
100 res = getattr(self, name)(**vals)
100 print " ->", res
101 print " ->", res
101 return res
102 return res
102 def batch(self, cmds):
103 def batch(self, cmds):
103 res = []
104 res = []
104 for pair in cmds.split(';'):
105 for pair in cmds.split(';'):
105 name, args = pair.split(':', 1)
106 name, args = pair.split(':', 1)
106 vals = {}
107 vals = {}
107 for a in args.split(','):
108 for a in args.split(','):
108 if a:
109 if a:
109 n, v = a.split('=')
110 n, v = a.split('=')
110 vals[n] = unescapearg(v)
111 vals[n] = unescapearg(v)
111 res.append(escapearg(getattr(self, name)(**vals)))
112 res.append(escapearg(getattr(self, name)(**vals)))
112 return ';'.join(res)
113 return ';'.join(res)
113 def foo(self, one, two):
114 def foo(self, one, two):
114 return mangle(self.local.foo(unmangle(one), unmangle(two)))
115 return mangle(self.local.foo(unmangle(one), unmangle(two)))
115 def bar(self, b, a):
116 def bar(self, b, a):
116 return mangle(self.local.bar(unmangle(b), unmangle(a)))
117 return mangle(self.local.bar(unmangle(b), unmangle(a)))
117 def greet(self, name):
118 def greet(self, name):
118 return mangle(self.local.greet(unmangle(name)))
119 return mangle(self.local.greet(unmangle(name)))
119 myserver = server(mylocal)
120 myserver = server(mylocal)
120
121
121 # local side
122 # local side
122
123
123 # equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
124 # equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
124 # here we just transform the strings a bit to check we're properly en-/decoding
125 # here we just transform the strings a bit to check we're properly en-/decoding
125 def mangle(s):
126 def mangle(s):
126 return ''.join(chr(ord(c) + 1) for c in s)
127 return ''.join(chr(ord(c) + 1) for c in s)
127 def unmangle(s):
128 def unmangle(s):
128 return ''.join(chr(ord(c) - 1) for c in s)
129 return ''.join(chr(ord(c) - 1) for c in s)
129
130
130 # equivalent of wireproto.wirerepository and something like http's wire format
131 # equivalent of wireproto.wirerepository and something like http's wire format
131 class remotething(thing):
132 class remotething(thing):
132 def __init__(self, server):
133 def __init__(self, server):
133 self.server = server
134 self.server = server
134 def _submitone(self, name, args):
135 def _submitone(self, name, args):
135 req = name + '?' + '&'.join(['%s=%s' % (n, v) for n, v in args])
136 req = name + '?' + '&'.join(['%s=%s' % (n, v) for n, v in args])
136 return self.server.perform(req)
137 return self.server.perform(req)
137 def _submitbatch(self, cmds):
138 def _submitbatch(self, cmds):
138 req = []
139 req = []
139 for name, args in cmds:
140 for name, args in cmds:
140 args = ','.join(n + '=' + escapearg(v) for n, v in args)
141 args = ','.join(n + '=' + escapearg(v) for n, v in args)
141 req.append(name + ':' + args)
142 req.append(name + ':' + args)
142 req = ';'.join(req)
143 req = ';'.join(req)
143 res = self._submitone('batch', [('cmds', req,)])
144 res = self._submitone('batch', [('cmds', req,)])
144 return res.split(';')
145 return res.split(';')
145
146
146 def batch(self):
147 def batch(self):
147 return remotebatch(self)
148 return remotebatch(self)
148
149
149 @batchable
150 @batchable
150 def foo(self, one, two=None):
151 def foo(self, one, two=None):
151 if not one:
152 if not one:
152 yield "Nope", None
153 yield "Nope", None
153 encargs = [('one', mangle(one),), ('two', mangle(two),)]
154 encargs = [('one', mangle(one),), ('two', mangle(two),)]
154 encresref = future()
155 encresref = future()
155 yield encargs, encresref
156 yield encargs, encresref
156 yield unmangle(encresref.value)
157 yield unmangle(encresref.value)
157
158
158 @batchable
159 @batchable
159 def bar(self, b, a):
160 def bar(self, b, a):
160 encresref = future()
161 encresref = future()
161 yield [('b', mangle(b),), ('a', mangle(a),)], encresref
162 yield [('b', mangle(b),), ('a', mangle(a),)], encresref
162 yield unmangle(encresref.value)
163 yield unmangle(encresref.value)
163
164
164 # greet is coded directly. It therefore does not support batching. If it
165 # greet is coded directly. It therefore does not support batching. If it
165 # does appear in a batch, the batch is split around greet, and the call to
166 # does appear in a batch, the batch is split around greet, and the call to
166 # greet is done in its own roundtrip.
167 # greet is done in its own roundtrip.
167 def greet(self, name=None):
168 def greet(self, name=None):
168 return unmangle(self._submitone('greet', [('name', mangle(name),)]))
169 return unmangle(self._submitone('greet', [('name', mangle(name),)]))
169
170
170 # demo remote usage
171 # demo remote usage
171
172
172 myproxy = remotething(myserver)
173 myproxy = remotething(myserver)
173 print
174 print
174 print "== Remote"
175 print "== Remote"
175 use(myproxy)
176 use(myproxy)
@@ -1,48 +1,52
1 from mercurial import wireproto
1 from mercurial import wireproto
2
2
3 class proto(object):
3 class proto(object):
4 def __init__(self, args):
4 def __init__(self, args):
5 self.args = args
5 self.args = args
6 def getargs(self, spec):
6 def getargs(self, spec):
7 args = self.args
7 args = self.args
8 args.setdefault('*', {})
8 args.setdefault('*', {})
9 names = spec.split()
9 names = spec.split()
10 return [args[n] for n in names]
10 return [args[n] for n in names]
11
11
12 class clientpeer(wireproto.wirepeer):
12 class clientpeer(wireproto.wirepeer):
13 def __init__(self, serverrepo):
13 def __init__(self, serverrepo):
14 self.serverrepo = serverrepo
14 self.serverrepo = serverrepo
15
16 def _capabilities(self):
17 return ['batch']
18
15 def _call(self, cmd, **args):
19 def _call(self, cmd, **args):
16 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
20 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
17
21
18 @wireproto.batchable
22 @wireproto.batchable
19 def greet(self, name):
23 def greet(self, name):
20 f = wireproto.future()
24 f = wireproto.future()
21 yield {'name': mangle(name)}, f
25 yield {'name': mangle(name)}, f
22 yield unmangle(f.value)
26 yield unmangle(f.value)
23
27
24 class serverrepo(object):
28 class serverrepo(object):
25 def greet(self, name):
29 def greet(self, name):
26 return "Hello, " + name
30 return "Hello, " + name
27
31
28 def filtered(self, name):
32 def filtered(self, name):
29 return self
33 return self
30
34
31 def mangle(s):
35 def mangle(s):
32 return ''.join(chr(ord(c) + 1) for c in s)
36 return ''.join(chr(ord(c) + 1) for c in s)
33 def unmangle(s):
37 def unmangle(s):
34 return ''.join(chr(ord(c) - 1) for c in s)
38 return ''.join(chr(ord(c) - 1) for c in s)
35
39
36 def greet(repo, proto, name):
40 def greet(repo, proto, name):
37 return mangle(repo.greet(unmangle(name)))
41 return mangle(repo.greet(unmangle(name)))
38
42
39 wireproto.commands['greet'] = (greet, 'name',)
43 wireproto.commands['greet'] = (greet, 'name',)
40
44
41 srv = serverrepo()
45 srv = serverrepo()
42 clt = clientpeer(srv)
46 clt = clientpeer(srv)
43
47
44 print clt.greet("Foobar")
48 print clt.greet("Foobar")
45 b = clt.batch()
49 b = clt.batch()
46 fs = [b.greet(s) for s in ["Fo, =;:<o", "Bar"]]
50 fs = [b.greet(s) for s in ["Fo, =;:<o", "Bar"]]
47 b.submit()
51 b.submit()
48 print [f.value for f in fs]
52 print [f.value for f in fs]
General Comments 0
You need to be logged in to leave comments. Login now