wireproto: move wireproto capabilities computation in a subfunction...
Pierre-Yves David
r20775:982f13be default
@@ -1,665 +1,680 @@
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store
12 import peer, error, encoding, util, store
13
13
14 # abstract batching support
14 # abstract batching support
15
15
16 class future(object):
16 class future(object):
17 '''placeholder for a value to be set later'''
17 '''placeholder for a value to be set later'''
18 def set(self, value):
18 def set(self, value):
19 if util.safehasattr(self, 'value'):
19 if util.safehasattr(self, 'value'):
20 raise error.RepoError("future is already set")
20 raise error.RepoError("future is already set")
21 self.value = value
21 self.value = value
22
22
23 class batcher(object):
23 class batcher(object):
24 '''base class for batches of commands submittable in a single request
24 '''base class for batches of commands submittable in a single request
25
25
26 All methods invoked on instances of this class are simply queued and
26 All methods invoked on instances of this class are simply queued and
27 return a future for the result. Once you call submit(), all the queued
27 return a future for the result. Once you call submit(), all the queued
28 calls are performed and the results set in their respective futures.
28 calls are performed and the results set in their respective futures.
29 '''
29 '''
30 def __init__(self):
30 def __init__(self):
31 self.calls = []
31 self.calls = []
32 def __getattr__(self, name):
32 def __getattr__(self, name):
33 def call(*args, **opts):
33 def call(*args, **opts):
34 resref = future()
34 resref = future()
35 self.calls.append((name, args, opts, resref,))
35 self.calls.append((name, args, opts, resref,))
36 return resref
36 return resref
37 return call
37 return call
38 def submit(self):
38 def submit(self):
39 pass
39 pass
40
40
41 class localbatch(batcher):
41 class localbatch(batcher):
42 '''performs the queued calls directly'''
42 '''performs the queued calls directly'''
43 def __init__(self, local):
43 def __init__(self, local):
44 batcher.__init__(self)
44 batcher.__init__(self)
45 self.local = local
45 self.local = local
46 def submit(self):
46 def submit(self):
47 for name, args, opts, resref in self.calls:
47 for name, args, opts, resref in self.calls:
48 resref.set(getattr(self.local, name)(*args, **opts))
48 resref.set(getattr(self.local, name)(*args, **opts))
49
49
50 class remotebatch(batcher):
50 class remotebatch(batcher):
51 '''batches the queued calls; uses as few roundtrips as possible'''
51 '''batches the queued calls; uses as few roundtrips as possible'''
52 def __init__(self, remote):
52 def __init__(self, remote):
53 '''remote must support _submitbatch(encbatch) and
53 '''remote must support _submitbatch(encbatch) and
54 _submitone(op, encargs)'''
54 _submitone(op, encargs)'''
55 batcher.__init__(self)
55 batcher.__init__(self)
56 self.remote = remote
56 self.remote = remote
57 def submit(self):
57 def submit(self):
58 req, rsp = [], []
58 req, rsp = [], []
59 for name, args, opts, resref in self.calls:
59 for name, args, opts, resref in self.calls:
60 mtd = getattr(self.remote, name)
60 mtd = getattr(self.remote, name)
61 batchablefn = getattr(mtd, 'batchable', None)
61 batchablefn = getattr(mtd, 'batchable', None)
62 if batchablefn is not None:
62 if batchablefn is not None:
63 batchable = batchablefn(mtd.im_self, *args, **opts)
63 batchable = batchablefn(mtd.im_self, *args, **opts)
64 encargsorres, encresref = batchable.next()
64 encargsorres, encresref = batchable.next()
65 if encresref:
65 if encresref:
66 req.append((name, encargsorres,))
66 req.append((name, encargsorres,))
67 rsp.append((batchable, encresref, resref,))
67 rsp.append((batchable, encresref, resref,))
68 else:
68 else:
69 resref.set(encargsorres)
69 resref.set(encargsorres)
70 else:
70 else:
71 if req:
71 if req:
72 self._submitreq(req, rsp)
72 self._submitreq(req, rsp)
73 req, rsp = [], []
73 req, rsp = [], []
74 resref.set(mtd(*args, **opts))
74 resref.set(mtd(*args, **opts))
75 if req:
75 if req:
76 self._submitreq(req, rsp)
76 self._submitreq(req, rsp)
77 def _submitreq(self, req, rsp):
77 def _submitreq(self, req, rsp):
78 encresults = self.remote._submitbatch(req)
78 encresults = self.remote._submitbatch(req)
79 for encres, r in zip(encresults, rsp):
79 for encres, r in zip(encresults, rsp):
80 batchable, encresref, resref = r
80 batchable, encresref, resref = r
81 encresref.set(encres)
81 encresref.set(encres)
82 resref.set(batchable.next())
82 resref.set(batchable.next())
83
83
84 def batchable(f):
84 def batchable(f):
85 '''annotation for batchable methods
85 '''annotation for batchable methods
86
86
87 Such methods must implement a coroutine as follows:
87 Such methods must implement a coroutine as follows:
88
88
89 @batchable
89 @batchable
90 def sample(self, one, two=None):
90 def sample(self, one, two=None):
91 # Handle locally computable results first:
91 # Handle locally computable results first:
92 if not one:
92 if not one:
93 yield "a local result", None
93 yield "a local result", None
94 # Build list of encoded arguments suitable for your wire protocol:
94 # Build list of encoded arguments suitable for your wire protocol:
95 encargs = [('one', encode(one),), ('two', encode(two),)]
95 encargs = [('one', encode(one),), ('two', encode(two),)]
96 # Create future for injection of encoded result:
96 # Create future for injection of encoded result:
97 encresref = future()
97 encresref = future()
98 # Return encoded arguments and future:
98 # Return encoded arguments and future:
99 yield encargs, encresref
99 yield encargs, encresref
100 # Assuming the future to be filled with the result from the batched
100 # Assuming the future to be filled with the result from the batched
101 # request now. Decode it:
101 # request now. Decode it:
102 yield decode(encresref.value)
102 yield decode(encresref.value)
103
103
104 The decorator returns a function which wraps this coroutine as a plain
104 The decorator returns a function which wraps this coroutine as a plain
105 method, but adds the original method as an attribute called "batchable",
105 method, but adds the original method as an attribute called "batchable",
106 which is used by remotebatch to split the call into separate encoding and
106 which is used by remotebatch to split the call into separate encoding and
107 decoding phases.
107 decoding phases.
108 '''
108 '''
109 def plain(*args, **opts):
109 def plain(*args, **opts):
110 batchable = f(*args, **opts)
110 batchable = f(*args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if not encresref:
112 if not encresref:
113 return encargsorres # a local result in this case
113 return encargsorres # a local result in this case
114 self = args[0]
114 self = args[0]
115 encresref.set(self._submitone(f.func_name, encargsorres))
115 encresref.set(self._submitone(f.func_name, encargsorres))
116 return batchable.next()
116 return batchable.next()
117 setattr(plain, 'batchable', f)
117 setattr(plain, 'batchable', f)
118 return plain
118 return plain
119
119
120 # list of nodes encoding / decoding
120 # list of nodes encoding / decoding
121
121
122 def decodelist(l, sep=' '):
122 def decodelist(l, sep=' '):
123 if l:
123 if l:
124 return map(bin, l.split(sep))
124 return map(bin, l.split(sep))
125 return []
125 return []
126
126
127 def encodelist(l, sep=' '):
127 def encodelist(l, sep=' '):
128 return sep.join(map(hex, l))
128 return sep.join(map(hex, l))
129
129
130 # batched call argument encoding
130 # batched call argument encoding
131
131
132 def escapearg(plain):
132 def escapearg(plain):
133 return (plain
133 return (plain
134 .replace(':', '::')
134 .replace(':', '::')
135 .replace(',', ':,')
135 .replace(',', ':,')
136 .replace(';', ':;')
136 .replace(';', ':;')
137 .replace('=', ':='))
137 .replace('=', ':='))
138
138
139 def unescapearg(escaped):
139 def unescapearg(escaped):
140 return (escaped
140 return (escaped
141 .replace(':=', '=')
141 .replace(':=', '=')
142 .replace(':;', ';')
142 .replace(':;', ';')
143 .replace(':,', ',')
143 .replace(':,', ',')
144 .replace('::', ':'))
144 .replace('::', ':'))
145
145
146 # client side
146 # client side
147
147
148 class wirepeer(peer.peerrepository):
148 class wirepeer(peer.peerrepository):
149
149
150 def batch(self):
150 def batch(self):
151 return remotebatch(self)
151 return remotebatch(self)
152 def _submitbatch(self, req):
152 def _submitbatch(self, req):
153 cmds = []
153 cmds = []
154 for op, argsdict in req:
154 for op, argsdict in req:
155 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
155 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
156 cmds.append('%s %s' % (op, args))
156 cmds.append('%s %s' % (op, args))
157 rsp = self._call("batch", cmds=';'.join(cmds))
157 rsp = self._call("batch", cmds=';'.join(cmds))
158 return rsp.split(';')
158 return rsp.split(';')
159 def _submitone(self, op, args):
159 def _submitone(self, op, args):
160 return self._call(op, **args)
160 return self._call(op, **args)
161
161
162 @batchable
162 @batchable
163 def lookup(self, key):
163 def lookup(self, key):
164 self.requirecap('lookup', _('look up remote revision'))
164 self.requirecap('lookup', _('look up remote revision'))
165 f = future()
165 f = future()
166 yield {'key': encoding.fromlocal(key)}, f
166 yield {'key': encoding.fromlocal(key)}, f
167 d = f.value
167 d = f.value
168 success, data = d[:-1].split(" ", 1)
168 success, data = d[:-1].split(" ", 1)
169 if int(success):
169 if int(success):
170 yield bin(data)
170 yield bin(data)
171 self._abort(error.RepoError(data))
171 self._abort(error.RepoError(data))
172
172
173 @batchable
173 @batchable
174 def heads(self):
174 def heads(self):
175 f = future()
175 f = future()
176 yield {}, f
176 yield {}, f
177 d = f.value
177 d = f.value
178 try:
178 try:
179 yield decodelist(d[:-1])
179 yield decodelist(d[:-1])
180 except ValueError:
180 except ValueError:
181 self._abort(error.ResponseError(_("unexpected response:"), d))
181 self._abort(error.ResponseError(_("unexpected response:"), d))
182
182
183 @batchable
183 @batchable
184 def known(self, nodes):
184 def known(self, nodes):
185 f = future()
185 f = future()
186 yield {'nodes': encodelist(nodes)}, f
186 yield {'nodes': encodelist(nodes)}, f
187 d = f.value
187 d = f.value
188 try:
188 try:
189 yield [bool(int(f)) for f in d]
189 yield [bool(int(f)) for f in d]
190 except ValueError:
190 except ValueError:
191 self._abort(error.ResponseError(_("unexpected response:"), d))
191 self._abort(error.ResponseError(_("unexpected response:"), d))
192
192
193 @batchable
193 @batchable
194 def branchmap(self):
194 def branchmap(self):
195 f = future()
195 f = future()
196 yield {}, f
196 yield {}, f
197 d = f.value
197 d = f.value
198 try:
198 try:
199 branchmap = {}
199 branchmap = {}
200 for branchpart in d.splitlines():
200 for branchpart in d.splitlines():
201 branchname, branchheads = branchpart.split(' ', 1)
201 branchname, branchheads = branchpart.split(' ', 1)
202 branchname = encoding.tolocal(urllib.unquote(branchname))
202 branchname = encoding.tolocal(urllib.unquote(branchname))
203 branchheads = decodelist(branchheads)
203 branchheads = decodelist(branchheads)
204 branchmap[branchname] = branchheads
204 branchmap[branchname] = branchheads
205 yield branchmap
205 yield branchmap
206 except TypeError:
206 except TypeError:
207 self._abort(error.ResponseError(_("unexpected response:"), d))
207 self._abort(error.ResponseError(_("unexpected response:"), d))
208
208
209 def branches(self, nodes):
209 def branches(self, nodes):
210 n = encodelist(nodes)
210 n = encodelist(nodes)
211 d = self._call("branches", nodes=n)
211 d = self._call("branches", nodes=n)
212 try:
212 try:
213 br = [tuple(decodelist(b)) for b in d.splitlines()]
213 br = [tuple(decodelist(b)) for b in d.splitlines()]
214 return br
214 return br
215 except ValueError:
215 except ValueError:
216 self._abort(error.ResponseError(_("unexpected response:"), d))
216 self._abort(error.ResponseError(_("unexpected response:"), d))
217
217
218 def between(self, pairs):
218 def between(self, pairs):
219 batch = 8 # avoid giant requests
219 batch = 8 # avoid giant requests
220 r = []
220 r = []
221 for i in xrange(0, len(pairs), batch):
221 for i in xrange(0, len(pairs), batch):
222 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
222 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
223 d = self._call("between", pairs=n)
223 d = self._call("between", pairs=n)
224 try:
224 try:
225 r.extend(l and decodelist(l) or [] for l in d.splitlines())
225 r.extend(l and decodelist(l) or [] for l in d.splitlines())
226 except ValueError:
226 except ValueError:
227 self._abort(error.ResponseError(_("unexpected response:"), d))
227 self._abort(error.ResponseError(_("unexpected response:"), d))
228 return r
228 return r
229
229
230 @batchable
230 @batchable
231 def pushkey(self, namespace, key, old, new):
231 def pushkey(self, namespace, key, old, new):
232 if not self.capable('pushkey'):
232 if not self.capable('pushkey'):
233 yield False, None
233 yield False, None
234 f = future()
234 f = future()
235 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
235 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
236 yield {'namespace': encoding.fromlocal(namespace),
236 yield {'namespace': encoding.fromlocal(namespace),
237 'key': encoding.fromlocal(key),
237 'key': encoding.fromlocal(key),
238 'old': encoding.fromlocal(old),
238 'old': encoding.fromlocal(old),
239 'new': encoding.fromlocal(new)}, f
239 'new': encoding.fromlocal(new)}, f
240 d = f.value
240 d = f.value
241 d, output = d.split('\n', 1)
241 d, output = d.split('\n', 1)
242 try:
242 try:
243 d = bool(int(d))
243 d = bool(int(d))
244 except ValueError:
244 except ValueError:
245 raise error.ResponseError(
245 raise error.ResponseError(
246 _('push failed (unexpected response):'), d)
246 _('push failed (unexpected response):'), d)
247 for l in output.splitlines(True):
247 for l in output.splitlines(True):
248 self.ui.status(_('remote: '), l)
248 self.ui.status(_('remote: '), l)
249 yield d
249 yield d
250
250
251 @batchable
251 @batchable
252 def listkeys(self, namespace):
252 def listkeys(self, namespace):
253 if not self.capable('pushkey'):
253 if not self.capable('pushkey'):
254 yield {}, None
254 yield {}, None
255 f = future()
255 f = future()
256 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
256 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
257 yield {'namespace': encoding.fromlocal(namespace)}, f
257 yield {'namespace': encoding.fromlocal(namespace)}, f
258 d = f.value
258 d = f.value
259 r = {}
259 r = {}
260 for l in d.splitlines():
260 for l in d.splitlines():
261 k, v = l.split('\t')
261 k, v = l.split('\t')
262 r[encoding.tolocal(k)] = encoding.tolocal(v)
262 r[encoding.tolocal(k)] = encoding.tolocal(v)
263 yield r
263 yield r
264
264
265 def stream_out(self):
265 def stream_out(self):
266 return self._callstream('stream_out')
266 return self._callstream('stream_out')
267
267
268 def changegroup(self, nodes, kind):
268 def changegroup(self, nodes, kind):
269 n = encodelist(nodes)
269 n = encodelist(nodes)
270 f = self._callstream("changegroup", roots=n)
270 f = self._callstream("changegroup", roots=n)
271 return changegroupmod.unbundle10(self._decompress(f), 'UN')
271 return changegroupmod.unbundle10(self._decompress(f), 'UN')
272
272
273 def changegroupsubset(self, bases, heads, kind):
273 def changegroupsubset(self, bases, heads, kind):
274 self.requirecap('changegroupsubset', _('look up remote changes'))
274 self.requirecap('changegroupsubset', _('look up remote changes'))
275 bases = encodelist(bases)
275 bases = encodelist(bases)
276 heads = encodelist(heads)
276 heads = encodelist(heads)
277 f = self._callstream("changegroupsubset",
277 f = self._callstream("changegroupsubset",
278 bases=bases, heads=heads)
278 bases=bases, heads=heads)
279 return changegroupmod.unbundle10(self._decompress(f), 'UN')
279 return changegroupmod.unbundle10(self._decompress(f), 'UN')
280
280
281 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
281 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
282 self.requirecap('getbundle', _('look up remote changes'))
282 self.requirecap('getbundle', _('look up remote changes'))
283 opts = {}
283 opts = {}
284 if heads is not None:
284 if heads is not None:
285 opts['heads'] = encodelist(heads)
285 opts['heads'] = encodelist(heads)
286 if common is not None:
286 if common is not None:
287 opts['common'] = encodelist(common)
287 opts['common'] = encodelist(common)
288 if bundlecaps is not None:
288 if bundlecaps is not None:
289 opts['bundlecaps'] = ','.join(bundlecaps)
289 opts['bundlecaps'] = ','.join(bundlecaps)
290 f = self._callstream("getbundle", **opts)
290 f = self._callstream("getbundle", **opts)
291 return changegroupmod.unbundle10(self._decompress(f), 'UN')
291 return changegroupmod.unbundle10(self._decompress(f), 'UN')
292
292
293 def unbundle(self, cg, heads, source):
293 def unbundle(self, cg, heads, source):
294 '''Send cg (a readable file-like object representing the
294 '''Send cg (a readable file-like object representing the
295 changegroup to push, typically a chunkbuffer object) to the
295 changegroup to push, typically a chunkbuffer object) to the
296 remote server as a bundle. Return an integer indicating the
296 remote server as a bundle. Return an integer indicating the
297 result of the push (see localrepository.addchangegroup()).'''
297 result of the push (see localrepository.addchangegroup()).'''
298
298
299 if heads != ['force'] and self.capable('unbundlehash'):
299 if heads != ['force'] and self.capable('unbundlehash'):
300 heads = encodelist(['hashed',
300 heads = encodelist(['hashed',
301 util.sha1(''.join(sorted(heads))).digest()])
301 util.sha1(''.join(sorted(heads))).digest()])
302 else:
302 else:
303 heads = encodelist(heads)
303 heads = encodelist(heads)
304
304
305 ret, output = self._callpush("unbundle", cg, heads=heads)
305 ret, output = self._callpush("unbundle", cg, heads=heads)
306 if ret == "":
306 if ret == "":
307 raise error.ResponseError(
307 raise error.ResponseError(
308 _('push failed:'), output)
308 _('push failed:'), output)
309 try:
309 try:
310 ret = int(ret)
310 ret = int(ret)
311 except ValueError:
311 except ValueError:
312 raise error.ResponseError(
312 raise error.ResponseError(
313 _('push failed (unexpected response):'), ret)
313 _('push failed (unexpected response):'), ret)
314
314
315 for l in output.splitlines(True):
315 for l in output.splitlines(True):
316 self.ui.status(_('remote: '), l)
316 self.ui.status(_('remote: '), l)
317 return ret
317 return ret
318
318
319 def debugwireargs(self, one, two, three=None, four=None, five=None):
319 def debugwireargs(self, one, two, three=None, four=None, five=None):
320 # don't pass optional arguments left at their default value
320 # don't pass optional arguments left at their default value
321 opts = {}
321 opts = {}
322 if three is not None:
322 if three is not None:
323 opts['three'] = three
323 opts['three'] = three
324 if four is not None:
324 if four is not None:
325 opts['four'] = four
325 opts['four'] = four
326 return self._call('debugwireargs', one=one, two=two, **opts)
326 return self._call('debugwireargs', one=one, two=two, **opts)
327
327
328 # server side
328 # server side
329
329
330 class streamres(object):
330 class streamres(object):
331 def __init__(self, gen):
331 def __init__(self, gen):
332 self.gen = gen
332 self.gen = gen
333
333
334 class pushres(object):
334 class pushres(object):
335 def __init__(self, res):
335 def __init__(self, res):
336 self.res = res
336 self.res = res
337
337
338 class pusherr(object):
338 class pusherr(object):
339 def __init__(self, res):
339 def __init__(self, res):
340 self.res = res
340 self.res = res
341
341
342 class ooberror(object):
342 class ooberror(object):
343 def __init__(self, message):
343 def __init__(self, message):
344 self.message = message
344 self.message = message
345
345
346 def dispatch(repo, proto, command):
346 def dispatch(repo, proto, command):
347 repo = repo.filtered("served")
347 repo = repo.filtered("served")
348 func, spec = commands[command]
348 func, spec = commands[command]
349 args = proto.getargs(spec)
349 args = proto.getargs(spec)
350 return func(repo, proto, *args)
350 return func(repo, proto, *args)
351
351
352 def options(cmd, keys, others):
352 def options(cmd, keys, others):
353 opts = {}
353 opts = {}
354 for k in keys:
354 for k in keys:
355 if k in others:
355 if k in others:
356 opts[k] = others[k]
356 opts[k] = others[k]
357 del others[k]
357 del others[k]
358 if others:
358 if others:
359 sys.stderr.write("abort: %s got unexpected arguments %s\n"
359 sys.stderr.write("abort: %s got unexpected arguments %s\n"
360 % (cmd, ",".join(others)))
360 % (cmd, ",".join(others)))
361 return opts
361 return opts
362
362
363 def batch(repo, proto, cmds, others):
363 def batch(repo, proto, cmds, others):
364 repo = repo.filtered("served")
364 repo = repo.filtered("served")
365 res = []
365 res = []
366 for pair in cmds.split(';'):
366 for pair in cmds.split(';'):
367 op, args = pair.split(' ', 1)
367 op, args = pair.split(' ', 1)
368 vals = {}
368 vals = {}
369 for a in args.split(','):
369 for a in args.split(','):
370 if a:
370 if a:
371 n, v = a.split('=')
371 n, v = a.split('=')
372 vals[n] = unescapearg(v)
372 vals[n] = unescapearg(v)
373 func, spec = commands[op]
373 func, spec = commands[op]
374 if spec:
374 if spec:
375 keys = spec.split()
375 keys = spec.split()
376 data = {}
376 data = {}
377 for k in keys:
377 for k in keys:
378 if k == '*':
378 if k == '*':
379 star = {}
379 star = {}
380 for key in vals.keys():
380 for key in vals.keys():
381 if key not in keys:
381 if key not in keys:
382 star[key] = vals[key]
382 star[key] = vals[key]
383 data['*'] = star
383 data['*'] = star
384 else:
384 else:
385 data[k] = vals[k]
385 data[k] = vals[k]
386 result = func(repo, proto, *[data[k] for k in keys])
386 result = func(repo, proto, *[data[k] for k in keys])
387 else:
387 else:
388 result = func(repo, proto)
388 result = func(repo, proto)
389 if isinstance(result, ooberror):
389 if isinstance(result, ooberror):
390 return result
390 return result
391 res.append(escapearg(result))
391 res.append(escapearg(result))
392 return ';'.join(res)
392 return ';'.join(res)
393
393
394 def between(repo, proto, pairs):
394 def between(repo, proto, pairs):
395 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
395 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
396 r = []
396 r = []
397 for b in repo.between(pairs):
397 for b in repo.between(pairs):
398 r.append(encodelist(b) + "\n")
398 r.append(encodelist(b) + "\n")
399 return "".join(r)
399 return "".join(r)
400
400
401 def branchmap(repo, proto):
401 def branchmap(repo, proto):
402 branchmap = repo.branchmap()
402 branchmap = repo.branchmap()
403 heads = []
403 heads = []
404 for branch, nodes in branchmap.iteritems():
404 for branch, nodes in branchmap.iteritems():
405 branchname = urllib.quote(encoding.fromlocal(branch))
405 branchname = urllib.quote(encoding.fromlocal(branch))
406 branchnodes = encodelist(nodes)
406 branchnodes = encodelist(nodes)
407 heads.append('%s %s' % (branchname, branchnodes))
407 heads.append('%s %s' % (branchname, branchnodes))
408 return '\n'.join(heads)
408 return '\n'.join(heads)
409
409
410 def branches(repo, proto, nodes):
410 def branches(repo, proto, nodes):
411 nodes = decodelist(nodes)
411 nodes = decodelist(nodes)
412 r = []
412 r = []
413 for b in repo.branches(nodes):
413 for b in repo.branches(nodes):
414 r.append(encodelist(b) + "\n")
414 r.append(encodelist(b) + "\n")
415 return "".join(r)
415 return "".join(r)
416
416
417
417
418 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
418 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
419 'known', 'getbundle', 'unbundlehash', 'batch']
419 'known', 'getbundle', 'unbundlehash', 'batch']
420 def capabilities(repo, proto):
420
421 def _capabilities(repo, proto):
422 """return a list of capabilities for a repo
423
424 This function exists to allow extensions to easily wrap capabilities
425 computation
426
427 - returns a list: easy to alter
428 - changes made here will be propagated to both the `capabilities` and `hello`
429 commands without any other action needed.
430 """
421 # copy to prevent modification of the global list
431 # copy to prevent modification of the global list
422 caps = list(wireprotocaps)
432 caps = list(wireprotocaps)
423 if _allowstream(repo.ui):
433 if _allowstream(repo.ui):
424 if repo.ui.configbool('server', 'preferuncompressed', False):
434 if repo.ui.configbool('server', 'preferuncompressed', False):
425 caps.append('stream-preferred')
435 caps.append('stream-preferred')
426 requiredformats = repo.requirements & repo.supportedformats
436 requiredformats = repo.requirements & repo.supportedformats
427 # if our local revlogs are just revlogv1, add 'stream' cap
437 # if our local revlogs are just revlogv1, add 'stream' cap
428 if not requiredformats - set(('revlogv1',)):
438 if not requiredformats - set(('revlogv1',)):
429 caps.append('stream')
439 caps.append('stream')
430 # otherwise, add 'streamreqs' detailing our local revlog format
440 # otherwise, add 'streamreqs' detailing our local revlog format
431 else:
441 else:
432 caps.append('streamreqs=%s' % ','.join(requiredformats))
442 caps.append('streamreqs=%s' % ','.join(requiredformats))
433 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
443 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
434 caps.append('httpheader=1024')
444 caps.append('httpheader=1024')
435 return ' '.join(caps)
445 return caps
446
447 # If you are writing an extension and consider wrapping this function, wrap
448 # `_capabilities` instead.
449 def capabilities(repo, proto):
450 return ' '.join(_capabilities(repo, proto))
436
451
437 def changegroup(repo, proto, roots):
452 def changegroup(repo, proto, roots):
438 nodes = decodelist(roots)
453 nodes = decodelist(roots)
439 cg = repo.changegroup(nodes, 'serve')
454 cg = repo.changegroup(nodes, 'serve')
440 return streamres(proto.groupchunks(cg))
455 return streamres(proto.groupchunks(cg))
441
456
442 def changegroupsubset(repo, proto, bases, heads):
457 def changegroupsubset(repo, proto, bases, heads):
443 bases = decodelist(bases)
458 bases = decodelist(bases)
444 heads = decodelist(heads)
459 heads = decodelist(heads)
445 cg = repo.changegroupsubset(bases, heads, 'serve')
460 cg = repo.changegroupsubset(bases, heads, 'serve')
446 return streamres(proto.groupchunks(cg))
461 return streamres(proto.groupchunks(cg))
447
462
448 def debugwireargs(repo, proto, one, two, others):
463 def debugwireargs(repo, proto, one, two, others):
449 # only accept optional args from the known set
464 # only accept optional args from the known set
450 opts = options('debugwireargs', ['three', 'four'], others)
465 opts = options('debugwireargs', ['three', 'four'], others)
451 return repo.debugwireargs(one, two, **opts)
466 return repo.debugwireargs(one, two, **opts)
452
467
453 def getbundle(repo, proto, others):
468 def getbundle(repo, proto, others):
454 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
469 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
455 for k, v in opts.iteritems():
470 for k, v in opts.iteritems():
456 if k in ('heads', 'common'):
471 if k in ('heads', 'common'):
457 opts[k] = decodelist(v)
472 opts[k] = decodelist(v)
458 elif k == 'bundlecaps':
473 elif k == 'bundlecaps':
459 opts[k] = set(v.split(','))
474 opts[k] = set(v.split(','))
460 cg = repo.getbundle('serve', **opts)
475 cg = repo.getbundle('serve', **opts)
461 return streamres(proto.groupchunks(cg))
476 return streamres(proto.groupchunks(cg))
462
477
463 def heads(repo, proto):
478 def heads(repo, proto):
464 h = repo.heads()
479 h = repo.heads()
465 return encodelist(h) + "\n"
480 return encodelist(h) + "\n"
466
481
467 def hello(repo, proto):
482 def hello(repo, proto):
468 '''the hello command returns a set of lines describing various
483 '''the hello command returns a set of lines describing various
469 interesting things about the server, in an RFC822-like format.
484 interesting things about the server, in an RFC822-like format.
470 Currently the only one defined is "capabilities", which
485 Currently the only one defined is "capabilities", which
471 consists of a line in the form:
486 consists of a line in the form:
472
487
473 capabilities: space separated list of tokens
488 capabilities: space separated list of tokens
474 '''
489 '''
475 return "capabilities: %s\n" % (capabilities(repo, proto))
490 return "capabilities: %s\n" % (capabilities(repo, proto))
476
491
477 def listkeys(repo, proto, namespace):
492 def listkeys(repo, proto, namespace):
478 d = repo.listkeys(encoding.tolocal(namespace)).items()
493 d = repo.listkeys(encoding.tolocal(namespace)).items()
479 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
494 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
480 for k, v in d])
495 for k, v in d])
481 return t
496 return t
482
497
483 def lookup(repo, proto, key):
498 def lookup(repo, proto, key):
484 try:
499 try:
485 k = encoding.tolocal(key)
500 k = encoding.tolocal(key)
486 c = repo[k]
501 c = repo[k]
487 r = c.hex()
502 r = c.hex()
488 success = 1
503 success = 1
489 except Exception, inst:
504 except Exception, inst:
490 r = str(inst)
505 r = str(inst)
491 success = 0
506 success = 0
492 return "%s %s\n" % (success, r)
507 return "%s %s\n" % (success, r)
493
508
494 def known(repo, proto, nodes, others):
509 def known(repo, proto, nodes, others):
495 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
510 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
496
511
497 def pushkey(repo, proto, namespace, key, old, new):
512 def pushkey(repo, proto, namespace, key, old, new):
498 # compatibility with pre-1.8 clients which were accidentally
513 # compatibility with pre-1.8 clients which were accidentally
499 # sending raw binary nodes rather than utf-8-encoded hex
514 # sending raw binary nodes rather than utf-8-encoded hex
500 if len(new) == 20 and new.encode('string-escape') != new:
515 if len(new) == 20 and new.encode('string-escape') != new:
501 # looks like it could be a binary node
516 # looks like it could be a binary node
502 try:
517 try:
503 new.decode('utf-8')
518 new.decode('utf-8')
504 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
519 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
505 except UnicodeDecodeError:
520 except UnicodeDecodeError:
506 pass # binary, leave unmodified
521 pass # binary, leave unmodified
507 else:
522 else:
508 new = encoding.tolocal(new) # normal path
523 new = encoding.tolocal(new) # normal path
509
524
510 if util.safehasattr(proto, 'restore'):
525 if util.safehasattr(proto, 'restore'):
511
526
512 proto.redirect()
527 proto.redirect()
513
528
514 try:
529 try:
515 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
530 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
516 encoding.tolocal(old), new) or False
531 encoding.tolocal(old), new) or False
517 except util.Abort:
532 except util.Abort:
518 r = False
533 r = False
519
534
520 output = proto.restore()
535 output = proto.restore()
521
536
522 return '%s\n%s' % (int(r), output)
537 return '%s\n%s' % (int(r), output)
523
538
524 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
539 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
525 encoding.tolocal(old), new)
540 encoding.tolocal(old), new)
526 return '%s\n' % int(r)
541 return '%s\n' % int(r)
527
542
528 def _allowstream(ui):
543 def _allowstream(ui):
529 return ui.configbool('server', 'uncompressed', True, untrusted=True)
544 return ui.configbool('server', 'uncompressed', True, untrusted=True)
530
545
531 def _walkstreamfiles(repo):
546 def _walkstreamfiles(repo):
532 # this is its own function so extensions can override it
547 # this is its own function so extensions can override it
533 return repo.store.walk()
548 return repo.store.walk()
534
549
535 def stream(repo, proto):
550 def stream(repo, proto):
536 '''If the server supports streaming clone, it advertises the "stream"
551 '''If the server supports streaming clone, it advertises the "stream"
537 capability with a value representing the version and flags of the repo
552 capability with a value representing the version and flags of the repo
538 it is serving. Client checks to see if it understands the format.
553 it is serving. Client checks to see if it understands the format.
539
554
540 The format is simple: the server writes out a line with the number
555 The format is simple: the server writes out a line with the number
541 of files, then the total number of bytes to be transferred (separated
556 of files, then the total number of bytes to be transferred (separated
542 by a space). Then, for each file, the server first writes the filename
557 by a space). Then, for each file, the server first writes the filename
543 and filesize (separated by the null character), then the file contents.
558 and filesize (separated by the null character), then the file contents.
544 '''
559 '''
545
560
546 if not _allowstream(repo.ui):
561 if not _allowstream(repo.ui):
547 return '1\n'
562 return '1\n'
548
563
549 entries = []
564 entries = []
550 total_bytes = 0
565 total_bytes = 0
551 try:
566 try:
552 # get consistent snapshot of repo, lock during scan
567 # get consistent snapshot of repo, lock during scan
553 lock = repo.lock()
568 lock = repo.lock()
554 try:
569 try:
555 repo.ui.debug('scanning\n')
570 repo.ui.debug('scanning\n')
556 for name, ename, size in _walkstreamfiles(repo):
571 for name, ename, size in _walkstreamfiles(repo):
557 if size:
572 if size:
558 entries.append((name, size))
573 entries.append((name, size))
559 total_bytes += size
574 total_bytes += size
560 finally:
575 finally:
561 lock.release()
576 lock.release()
562 except error.LockError:
577 except error.LockError:
563 return '2\n' # error: 2
578 return '2\n' # error: 2
564
579
565 def streamer(repo, entries, total):
580 def streamer(repo, entries, total):
566 '''stream out all metadata files in repository.'''
581 '''stream out all metadata files in repository.'''
567 yield '0\n' # success
582 yield '0\n' # success
568 repo.ui.debug('%d files, %d bytes to transfer\n' %
583 repo.ui.debug('%d files, %d bytes to transfer\n' %
569 (len(entries), total_bytes))
584 (len(entries), total_bytes))
570 yield '%d %d\n' % (len(entries), total_bytes)
585 yield '%d %d\n' % (len(entries), total_bytes)
571
586
572 sopener = repo.sopener
587 sopener = repo.sopener
573 oldaudit = sopener.mustaudit
588 oldaudit = sopener.mustaudit
574 debugflag = repo.ui.debugflag
589 debugflag = repo.ui.debugflag
575 sopener.mustaudit = False
590 sopener.mustaudit = False
576
591
577 try:
592 try:
578 for name, size in entries:
593 for name, size in entries:
579 if debugflag:
594 if debugflag:
580 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
595 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
581 # partially encode name over the wire for backwards compat
596 # partially encode name over the wire for backwards compat
582 yield '%s\0%d\n' % (store.encodedir(name), size)
597 yield '%s\0%d\n' % (store.encodedir(name), size)
583 if size <= 65536:
598 if size <= 65536:
584 fp = sopener(name)
599 fp = sopener(name)
585 try:
600 try:
586 data = fp.read(size)
601 data = fp.read(size)
587 finally:
602 finally:
588 fp.close()
603 fp.close()
589 yield data
604 yield data
590 else:
605 else:
591 for chunk in util.filechunkiter(sopener(name), limit=size):
606 for chunk in util.filechunkiter(sopener(name), limit=size):
592 yield chunk
607 yield chunk
593 # replace with "finally:" when support for python 2.4 has been dropped
608 # replace with "finally:" when support for python 2.4 has been dropped
594 except Exception:
609 except Exception:
595 sopener.mustaudit = oldaudit
610 sopener.mustaudit = oldaudit
596 raise
611 raise
597 sopener.mustaudit = oldaudit
612 sopener.mustaudit = oldaudit
598
613
599 return streamres(streamer(repo, entries, total_bytes))
614 return streamres(streamer(repo, entries, total_bytes))
600
615
601 def unbundle(repo, proto, heads):
616 def unbundle(repo, proto, heads):
602 their_heads = decodelist(heads)
617 their_heads = decodelist(heads)
603
618
604 def check_heads():
619 def check_heads():
605 heads = repo.heads()
620 heads = repo.heads()
606 heads_hash = util.sha1(''.join(sorted(heads))).digest()
621 heads_hash = util.sha1(''.join(sorted(heads))).digest()
607 return (their_heads == ['force'] or their_heads == heads or
622 return (their_heads == ['force'] or their_heads == heads or
608 their_heads == ['hashed', heads_hash])
623 their_heads == ['hashed', heads_hash])
609
624
610 proto.redirect()
625 proto.redirect()
611
626
612 # fail early if possible
627 # fail early if possible
613 if not check_heads():
628 if not check_heads():
614 return pusherr('repository changed while preparing changes - '
629 return pusherr('repository changed while preparing changes - '
615 'please try again')
630 'please try again')
616
631
617 # write bundle data to temporary file because it can be big
632 # write bundle data to temporary file because it can be big
618 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
633 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
619 fp = os.fdopen(fd, 'wb+')
634 fp = os.fdopen(fd, 'wb+')
620 r = 0
635 r = 0
621 try:
636 try:
622 proto.getfile(fp)
637 proto.getfile(fp)
623 lock = repo.lock()
638 lock = repo.lock()
624 try:
639 try:
625 if not check_heads():
640 if not check_heads():
626 # someone else committed/pushed/unbundled while we
641 # someone else committed/pushed/unbundled while we
627 # were transferring data
642 # were transferring data
628 return pusherr('repository changed while uploading changes - '
643 return pusherr('repository changed while uploading changes - '
629 'please try again')
644 'please try again')
630
645
631 # push can proceed
646 # push can proceed
632 fp.seek(0)
647 fp.seek(0)
633 gen = changegroupmod.readbundle(fp, None)
648 gen = changegroupmod.readbundle(fp, None)
634
649
635 try:
650 try:
636 r = repo.addchangegroup(gen, 'serve', proto._client())
651 r = repo.addchangegroup(gen, 'serve', proto._client())
637 except util.Abort, inst:
652 except util.Abort, inst:
638 sys.stderr.write("abort: %s\n" % inst)
653 sys.stderr.write("abort: %s\n" % inst)
639 finally:
654 finally:
640 lock.release()
655 lock.release()
641 return pushres(r)
656 return pushres(r)
642
657
643 finally:
658 finally:
644 fp.close()
659 fp.close()
645 os.unlink(tempname)
660 os.unlink(tempname)
646
661
647 commands = {
662 commands = {
648 'batch': (batch, 'cmds *'),
663 'batch': (batch, 'cmds *'),
649 'between': (between, 'pairs'),
664 'between': (between, 'pairs'),
650 'branchmap': (branchmap, ''),
665 'branchmap': (branchmap, ''),
651 'branches': (branches, 'nodes'),
666 'branches': (branches, 'nodes'),
652 'capabilities': (capabilities, ''),
667 'capabilities': (capabilities, ''),
653 'changegroup': (changegroup, 'roots'),
668 'changegroup': (changegroup, 'roots'),
654 'changegroupsubset': (changegroupsubset, 'bases heads'),
669 'changegroupsubset': (changegroupsubset, 'bases heads'),
655 'debugwireargs': (debugwireargs, 'one two *'),
670 'debugwireargs': (debugwireargs, 'one two *'),
656 'getbundle': (getbundle, '*'),
671 'getbundle': (getbundle, '*'),
657 'heads': (heads, ''),
672 'heads': (heads, ''),
658 'hello': (hello, ''),
673 'hello': (hello, ''),
659 'known': (known, 'nodes *'),
674 'known': (known, 'nodes *'),
660 'listkeys': (listkeys, 'namespace'),
675 'listkeys': (listkeys, 'namespace'),
661 'lookup': (lookup, 'key'),
676 'lookup': (lookup, 'key'),
662 'pushkey': (pushkey, 'namespace key old new'),
677 'pushkey': (pushkey, 'namespace key old new'),
663 'stream_out': (stream, ''),
678 'stream_out': (stream, ''),
664 'unbundle': (unbundle, 'heads'),
679 'unbundle': (unbundle, 'heads'),
665 }
680 }
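
The new `_capabilities` helper is what makes this change useful to extension authors: a capability can now be added by appending to the returned list, instead of wrapping `capabilities` and re-splitting its space-joined string. Below is a minimal sketch of such an extension, assuming Mercurial's `extensions.wrapfunction` API of that era; the wrapper name and the 'myfeature' capability token are made up for illustration and are not part of this change.

# illustrative extension sketch (not part of this changeset)
from mercurial import extensions, wireproto

def _mycapabilities(orig, repo, proto):
    # call the original computation, then extend the list it returns
    caps = orig(repo, proto)
    caps.append('myfeature')  # hypothetical capability token
    return caps

def extsetup(ui):
    # wrap the list-returning helper, not the string-joining wire command
    extensions.wrapfunction(wireproto, '_capabilities', _mycapabilities)

Because both the `capabilities` wire command and `hello` go through `_capabilities`, the extra token would show up in both responses with no further work.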