##// END OF EJS Templates
wireproto: make a number of commands batchable...
Peter Arrenbrecht -
r14623:e7c9fdbb default
parent child Browse files
Show More
@@ -1,586 +1,602 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import repo, error, encoding, util, store
12 import repo, error, encoding, util, store
13 import pushkey as pushkeymod
13 import pushkey as pushkeymod
14
14
15 # abstract batching support
15 # abstract batching support
16
16
17 class future(object):
17 class future(object):
18 '''placeholder for a value to be set later'''
18 '''placeholder for a value to be set later'''
19 def set(self, value):
19 def set(self, value):
20 if hasattr(self, 'value'):
20 if hasattr(self, 'value'):
21 raise error.RepoError("future is already set")
21 raise error.RepoError("future is already set")
22 self.value = value
22 self.value = value
23
23
24 class batcher(object):
24 class batcher(object):
25 '''base class for batches of commands submittable in a single request
25 '''base class for batches of commands submittable in a single request
26
26
27 All methods invoked on instances of this class are simply queued and return a
27 All methods invoked on instances of this class are simply queued and return a
28 a future for the result. Once you call submit(), all the queued calls are
28 a future for the result. Once you call submit(), all the queued calls are
29 performed and the results set in their respective futures.
29 performed and the results set in their respective futures.
30 '''
30 '''
31 def __init__(self):
31 def __init__(self):
32 self.calls = []
32 self.calls = []
33 def __getattr__(self, name):
33 def __getattr__(self, name):
34 def call(*args, **opts):
34 def call(*args, **opts):
35 resref = future()
35 resref = future()
36 self.calls.append((name, args, opts, resref,))
36 self.calls.append((name, args, opts, resref,))
37 return resref
37 return resref
38 return call
38 return call
39 def submit(self):
39 def submit(self):
40 pass
40 pass
41
41
42 class localbatch(batcher):
42 class localbatch(batcher):
43 '''performs the queued calls directly'''
43 '''performs the queued calls directly'''
44 def __init__(self, local):
44 def __init__(self, local):
45 batcher.__init__(self)
45 batcher.__init__(self)
46 self.local = local
46 self.local = local
47 def submit(self):
47 def submit(self):
48 for name, args, opts, resref in self.calls:
48 for name, args, opts, resref in self.calls:
49 resref.set(getattr(self.local, name)(*args, **opts))
49 resref.set(getattr(self.local, name)(*args, **opts))
50
50
51 class remotebatch(batcher):
51 class remotebatch(batcher):
52 '''batches the queued calls; uses as few roundtrips as possible'''
52 '''batches the queued calls; uses as few roundtrips as possible'''
53 def __init__(self, remote):
53 def __init__(self, remote):
54 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
54 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
55 batcher.__init__(self)
55 batcher.__init__(self)
56 self.remote = remote
56 self.remote = remote
57 def submit(self):
57 def submit(self):
58 req, rsp = [], []
58 req, rsp = [], []
59 for name, args, opts, resref in self.calls:
59 for name, args, opts, resref in self.calls:
60 mtd = getattr(self.remote, name)
60 mtd = getattr(self.remote, name)
61 if hasattr(mtd, 'batchable'):
61 if hasattr(mtd, 'batchable'):
62 batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts)
62 batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts)
63 encargsorres, encresref = batchable.next()
63 encargsorres, encresref = batchable.next()
64 if encresref:
64 if encresref:
65 req.append((name, encargsorres,))
65 req.append((name, encargsorres,))
66 rsp.append((batchable, encresref, resref,))
66 rsp.append((batchable, encresref, resref,))
67 else:
67 else:
68 resref.set(encargsorres)
68 resref.set(encargsorres)
69 else:
69 else:
70 if req:
70 if req:
71 self._submitreq(req, rsp)
71 self._submitreq(req, rsp)
72 req, rsp = [], []
72 req, rsp = [], []
73 resref.set(mtd(*args, **opts))
73 resref.set(mtd(*args, **opts))
74 if req:
74 if req:
75 self._submitreq(req, rsp)
75 self._submitreq(req, rsp)
76 def _submitreq(self, req, rsp):
76 def _submitreq(self, req, rsp):
77 encresults = self.remote._submitbatch(req)
77 encresults = self.remote._submitbatch(req)
78 for encres, r in zip(encresults, rsp):
78 for encres, r in zip(encresults, rsp):
79 batchable, encresref, resref = r
79 batchable, encresref, resref = r
80 encresref.set(encres)
80 encresref.set(encres)
81 resref.set(batchable.next())
81 resref.set(batchable.next())
82
82
83 def batchable(f):
83 def batchable(f):
84 '''annotation for batchable methods
84 '''annotation for batchable methods
85
85
86 Such methods must implement a coroutine as follows:
86 Such methods must implement a coroutine as follows:
87
87
88 @batchable
88 @batchable
89 def sample(self, one, two=None):
89 def sample(self, one, two=None):
90 # Handle locally computable results first:
90 # Handle locally computable results first:
91 if not one:
91 if not one:
92 yield "a local result", None
92 yield "a local result", None
93 # Build list of encoded arguments suitable for your wire protocol:
93 # Build list of encoded arguments suitable for your wire protocol:
94 encargs = [('one', encode(one),), ('two', encode(two),)]
94 encargs = [('one', encode(one),), ('two', encode(two),)]
95 # Create future for injection of encoded result:
95 # Create future for injection of encoded result:
96 encresref = future()
96 encresref = future()
97 # Return encoded arguments and future:
97 # Return encoded arguments and future:
98 yield encargs, encresref
98 yield encargs, encresref
99 # Assuming the future to be filled with the result from the batched request
99 # Assuming the future to be filled with the result from the batched request
100 # now. Decode it:
100 # now. Decode it:
101 yield decode(encresref.value)
101 yield decode(encresref.value)
102
102
103 The decorator returns a function which wraps this coroutine as a plain method,
103 The decorator returns a function which wraps this coroutine as a plain method,
104 but adds the original method as an attribute called "batchable", which is
104 but adds the original method as an attribute called "batchable", which is
105 used by remotebatch to split the call into separate encoding and decoding
105 used by remotebatch to split the call into separate encoding and decoding
106 phases.
106 phases.
107 '''
107 '''
108 def plain(*args, **opts):
108 def plain(*args, **opts):
109 batchable = f(*args, **opts)
109 batchable = f(*args, **opts)
110 encargsorres, encresref = batchable.next()
110 encargsorres, encresref = batchable.next()
111 if not encresref:
111 if not encresref:
112 return encargsorres # a local result in this case
112 return encargsorres # a local result in this case
113 self = args[0]
113 self = args[0]
114 encresref.set(self._submitone(f.func_name, encargsorres))
114 encresref.set(self._submitone(f.func_name, encargsorres))
115 return batchable.next()
115 return batchable.next()
116 setattr(plain, 'batchable', f)
116 setattr(plain, 'batchable', f)
117 return plain
117 return plain
118
118
119 # list of nodes encoding / decoding
119 # list of nodes encoding / decoding
120
120
121 def decodelist(l, sep=' '):
121 def decodelist(l, sep=' '):
122 if l:
122 if l:
123 return map(bin, l.split(sep))
123 return map(bin, l.split(sep))
124 return []
124 return []
125
125
126 def encodelist(l, sep=' '):
126 def encodelist(l, sep=' '):
127 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
128
128
129 # batched call argument encoding
129 # batched call argument encoding
130
130
131 def escapearg(plain):
131 def escapearg(plain):
132 return (plain
132 return (plain
133 .replace(':', '::')
133 .replace(':', '::')
134 .replace(',', ':,')
134 .replace(',', ':,')
135 .replace(';', ':;')
135 .replace(';', ':;')
136 .replace('=', ':='))
136 .replace('=', ':='))
137
137
138 def unescapearg(escaped):
138 def unescapearg(escaped):
139 return (escaped
139 return (escaped
140 .replace(':=', '=')
140 .replace(':=', '=')
141 .replace(':;', ';')
141 .replace(':;', ';')
142 .replace(':,', ',')
142 .replace(':,', ',')
143 .replace('::', ':'))
143 .replace('::', ':'))
144
144
145 # client side
145 # client side
146
146
147 def todict(**args):
147 def todict(**args):
148 return args
148 return args
149
149
150 class wirerepository(repo.repository):
150 class wirerepository(repo.repository):
151
151
152 def batch(self):
152 def batch(self):
153 return remotebatch(self)
153 return remotebatch(self)
154 def _submitbatch(self, req):
154 def _submitbatch(self, req):
155 cmds = []
155 cmds = []
156 for op, argsdict in req:
156 for op, argsdict in req:
157 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
157 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
158 cmds.append('%s %s' % (op, args))
158 cmds.append('%s %s' % (op, args))
159 rsp = self._call("batch", cmds=';'.join(cmds))
159 rsp = self._call("batch", cmds=';'.join(cmds))
160 return rsp.split(';')
160 return rsp.split(';')
161 def _submitone(self, op, args):
161 def _submitone(self, op, args):
162 return self._call(op, **args)
162 return self._call(op, **args)
163
163
164 @batchable
164 def lookup(self, key):
165 def lookup(self, key):
165 self.requirecap('lookup', _('look up remote revision'))
166 self.requirecap('lookup', _('look up remote revision'))
166 d = self._call("lookup", key=encoding.fromlocal(key))
167 f = future()
168 yield todict(key=encoding.fromlocal(key)), f
169 d = f.value
167 success, data = d[:-1].split(" ", 1)
170 success, data = d[:-1].split(" ", 1)
168 if int(success):
171 if int(success):
169 return bin(data)
172 yield bin(data)
170 self._abort(error.RepoError(data))
173 self._abort(error.RepoError(data))
171
174
175 @batchable
172 def heads(self):
176 def heads(self):
173 d = self._call("heads")
177 f = future()
178 yield {}, f
179 d = f.value
174 try:
180 try:
175 return decodelist(d[:-1])
181 yield decodelist(d[:-1])
176 except ValueError:
182 except ValueError:
177 self._abort(error.ResponseError(_("unexpected response:"), d))
183 self._abort(error.ResponseError(_("unexpected response:"), d))
178
184
185 @batchable
179 def known(self, nodes):
186 def known(self, nodes):
180 n = encodelist(nodes)
187 f = future()
181 d = self._call("known", nodes=n)
188 yield todict(nodes=encodelist(nodes)), f
189 d = f.value
182 try:
190 try:
183 return [bool(int(f)) for f in d]
191 yield [bool(int(f)) for f in d]
184 except ValueError:
192 except ValueError:
185 self._abort(error.ResponseError(_("unexpected response:"), d))
193 self._abort(error.ResponseError(_("unexpected response:"), d))
186
194
195 @batchable
187 def branchmap(self):
196 def branchmap(self):
188 d = self._call("branchmap")
197 f = future()
198 yield {}, f
199 d = f.value
189 try:
200 try:
190 branchmap = {}
201 branchmap = {}
191 for branchpart in d.splitlines():
202 for branchpart in d.splitlines():
192 branchname, branchheads = branchpart.split(' ', 1)
203 branchname, branchheads = branchpart.split(' ', 1)
193 branchname = encoding.tolocal(urllib.unquote(branchname))
204 branchname = encoding.tolocal(urllib.unquote(branchname))
194 branchheads = decodelist(branchheads)
205 branchheads = decodelist(branchheads)
195 branchmap[branchname] = branchheads
206 branchmap[branchname] = branchheads
196 return branchmap
207 yield branchmap
197 except TypeError:
208 except TypeError:
198 self._abort(error.ResponseError(_("unexpected response:"), d))
209 self._abort(error.ResponseError(_("unexpected response:"), d))
199
210
200 def branches(self, nodes):
211 def branches(self, nodes):
201 n = encodelist(nodes)
212 n = encodelist(nodes)
202 d = self._call("branches", nodes=n)
213 d = self._call("branches", nodes=n)
203 try:
214 try:
204 br = [tuple(decodelist(b)) for b in d.splitlines()]
215 br = [tuple(decodelist(b)) for b in d.splitlines()]
205 return br
216 return br
206 except ValueError:
217 except ValueError:
207 self._abort(error.ResponseError(_("unexpected response:"), d))
218 self._abort(error.ResponseError(_("unexpected response:"), d))
208
219
209 def between(self, pairs):
220 def between(self, pairs):
210 batch = 8 # avoid giant requests
221 batch = 8 # avoid giant requests
211 r = []
222 r = []
212 for i in xrange(0, len(pairs), batch):
223 for i in xrange(0, len(pairs), batch):
213 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
214 d = self._call("between", pairs=n)
225 d = self._call("between", pairs=n)
215 try:
226 try:
216 r.extend(l and decodelist(l) or [] for l in d.splitlines())
227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
217 except ValueError:
228 except ValueError:
218 self._abort(error.ResponseError(_("unexpected response:"), d))
229 self._abort(error.ResponseError(_("unexpected response:"), d))
219 return r
230 return r
220
231
232 @batchable
221 def pushkey(self, namespace, key, old, new):
233 def pushkey(self, namespace, key, old, new):
222 if not self.capable('pushkey'):
234 if not self.capable('pushkey'):
223 return False
235 yield False, None
224 d = self._call("pushkey",
236 f = future()
225 namespace=encoding.fromlocal(namespace),
237 yield todict(namespace=encoding.fromlocal(namespace),
226 key=encoding.fromlocal(key),
238 key=encoding.fromlocal(key),
227 old=encoding.fromlocal(old),
239 old=encoding.fromlocal(old),
228 new=encoding.fromlocal(new))
240 new=encoding.fromlocal(new)), f
241 d = f.value
229 try:
242 try:
230 d = bool(int(d))
243 d = bool(int(d))
231 except ValueError:
244 except ValueError:
232 raise error.ResponseError(
245 raise error.ResponseError(
233 _('push failed (unexpected response):'), d)
246 _('push failed (unexpected response):'), d)
234 return d
247 yield d
235
248
249 @batchable
236 def listkeys(self, namespace):
250 def listkeys(self, namespace):
237 if not self.capable('pushkey'):
251 if not self.capable('pushkey'):
238 return {}
252 yield {}, None
239 d = self._call("listkeys", namespace=encoding.fromlocal(namespace))
253 f = future()
254 yield todict(namespace=encoding.fromlocal(namespace)), f
255 d = f.value
240 r = {}
256 r = {}
241 for l in d.splitlines():
257 for l in d.splitlines():
242 k, v = l.split('\t')
258 k, v = l.split('\t')
243 r[encoding.tolocal(k)] = encoding.tolocal(v)
259 r[encoding.tolocal(k)] = encoding.tolocal(v)
244 return r
260 yield r
245
261
246 def stream_out(self):
262 def stream_out(self):
247 return self._callstream('stream_out')
263 return self._callstream('stream_out')
248
264
249 def changegroup(self, nodes, kind):
265 def changegroup(self, nodes, kind):
250 n = encodelist(nodes)
266 n = encodelist(nodes)
251 f = self._callstream("changegroup", roots=n)
267 f = self._callstream("changegroup", roots=n)
252 return changegroupmod.unbundle10(self._decompress(f), 'UN')
268 return changegroupmod.unbundle10(self._decompress(f), 'UN')
253
269
254 def changegroupsubset(self, bases, heads, kind):
270 def changegroupsubset(self, bases, heads, kind):
255 self.requirecap('changegroupsubset', _('look up remote changes'))
271 self.requirecap('changegroupsubset', _('look up remote changes'))
256 bases = encodelist(bases)
272 bases = encodelist(bases)
257 heads = encodelist(heads)
273 heads = encodelist(heads)
258 f = self._callstream("changegroupsubset",
274 f = self._callstream("changegroupsubset",
259 bases=bases, heads=heads)
275 bases=bases, heads=heads)
260 return changegroupmod.unbundle10(self._decompress(f), 'UN')
276 return changegroupmod.unbundle10(self._decompress(f), 'UN')
261
277
262 def getbundle(self, source, heads=None, common=None):
278 def getbundle(self, source, heads=None, common=None):
263 self.requirecap('getbundle', _('look up remote changes'))
279 self.requirecap('getbundle', _('look up remote changes'))
264 opts = {}
280 opts = {}
265 if heads is not None:
281 if heads is not None:
266 opts['heads'] = encodelist(heads)
282 opts['heads'] = encodelist(heads)
267 if common is not None:
283 if common is not None:
268 opts['common'] = encodelist(common)
284 opts['common'] = encodelist(common)
269 f = self._callstream("getbundle", **opts)
285 f = self._callstream("getbundle", **opts)
270 return changegroupmod.unbundle10(self._decompress(f), 'UN')
286 return changegroupmod.unbundle10(self._decompress(f), 'UN')
271
287
272 def unbundle(self, cg, heads, source):
288 def unbundle(self, cg, heads, source):
273 '''Send cg (a readable file-like object representing the
289 '''Send cg (a readable file-like object representing the
274 changegroup to push, typically a chunkbuffer object) to the
290 changegroup to push, typically a chunkbuffer object) to the
275 remote server as a bundle. Return an integer indicating the
291 remote server as a bundle. Return an integer indicating the
276 result of the push (see localrepository.addchangegroup()).'''
292 result of the push (see localrepository.addchangegroup()).'''
277
293
278 if heads != ['force'] and self.capable('unbundlehash'):
294 if heads != ['force'] and self.capable('unbundlehash'):
279 heads = encodelist(['hashed',
295 heads = encodelist(['hashed',
280 util.sha1(''.join(sorted(heads))).digest()])
296 util.sha1(''.join(sorted(heads))).digest()])
281 else:
297 else:
282 heads = encodelist(heads)
298 heads = encodelist(heads)
283
299
284 ret, output = self._callpush("unbundle", cg, heads=heads)
300 ret, output = self._callpush("unbundle", cg, heads=heads)
285 if ret == "":
301 if ret == "":
286 raise error.ResponseError(
302 raise error.ResponseError(
287 _('push failed:'), output)
303 _('push failed:'), output)
288 try:
304 try:
289 ret = int(ret)
305 ret = int(ret)
290 except ValueError:
306 except ValueError:
291 raise error.ResponseError(
307 raise error.ResponseError(
292 _('push failed (unexpected response):'), ret)
308 _('push failed (unexpected response):'), ret)
293
309
294 for l in output.splitlines(True):
310 for l in output.splitlines(True):
295 self.ui.status(_('remote: '), l)
311 self.ui.status(_('remote: '), l)
296 return ret
312 return ret
297
313
298 def debugwireargs(self, one, two, three=None, four=None, five=None):
314 def debugwireargs(self, one, two, three=None, four=None, five=None):
299 # don't pass optional arguments left at their default value
315 # don't pass optional arguments left at their default value
300 opts = {}
316 opts = {}
301 if three is not None:
317 if three is not None:
302 opts['three'] = three
318 opts['three'] = three
303 if four is not None:
319 if four is not None:
304 opts['four'] = four
320 opts['four'] = four
305 return self._call('debugwireargs', one=one, two=two, **opts)
321 return self._call('debugwireargs', one=one, two=two, **opts)
306
322
307 # server side
323 # server side
308
324
309 class streamres(object):
325 class streamres(object):
310 def __init__(self, gen):
326 def __init__(self, gen):
311 self.gen = gen
327 self.gen = gen
312
328
313 class pushres(object):
329 class pushres(object):
314 def __init__(self, res):
330 def __init__(self, res):
315 self.res = res
331 self.res = res
316
332
317 class pusherr(object):
333 class pusherr(object):
318 def __init__(self, res):
334 def __init__(self, res):
319 self.res = res
335 self.res = res
320
336
321 def dispatch(repo, proto, command):
337 def dispatch(repo, proto, command):
322 func, spec = commands[command]
338 func, spec = commands[command]
323 args = proto.getargs(spec)
339 args = proto.getargs(spec)
324 return func(repo, proto, *args)
340 return func(repo, proto, *args)
325
341
326 def options(cmd, keys, others):
342 def options(cmd, keys, others):
327 opts = {}
343 opts = {}
328 for k in keys:
344 for k in keys:
329 if k in others:
345 if k in others:
330 opts[k] = others[k]
346 opts[k] = others[k]
331 del others[k]
347 del others[k]
332 if others:
348 if others:
333 sys.stderr.write("abort: %s got unexpected arguments %s\n"
349 sys.stderr.write("abort: %s got unexpected arguments %s\n"
334 % (cmd, ",".join(others)))
350 % (cmd, ",".join(others)))
335 return opts
351 return opts
336
352
337 def batch(repo, proto, cmds, others):
353 def batch(repo, proto, cmds, others):
338 res = []
354 res = []
339 for pair in cmds.split(';'):
355 for pair in cmds.split(';'):
340 op, args = pair.split(' ', 1)
356 op, args = pair.split(' ', 1)
341 vals = {}
357 vals = {}
342 for a in args.split(','):
358 for a in args.split(','):
343 if a:
359 if a:
344 n, v = a.split('=')
360 n, v = a.split('=')
345 vals[n] = unescapearg(v)
361 vals[n] = unescapearg(v)
346 func, spec = commands[op]
362 func, spec = commands[op]
347 if spec:
363 if spec:
348 keys = spec.split()
364 keys = spec.split()
349 data = {}
365 data = {}
350 for k in keys:
366 for k in keys:
351 if k == '*':
367 if k == '*':
352 star = {}
368 star = {}
353 for key in vals.keys():
369 for key in vals.keys():
354 if key not in keys:
370 if key not in keys:
355 star[key] = vals[key]
371 star[key] = vals[key]
356 data['*'] = star
372 data['*'] = star
357 else:
373 else:
358 data[k] = vals[k]
374 data[k] = vals[k]
359 result = func(repo, proto, *[data[k] for k in keys])
375 result = func(repo, proto, *[data[k] for k in keys])
360 else:
376 else:
361 result = func(repo, proto)
377 result = func(repo, proto)
362 res.append(escapearg(result))
378 res.append(escapearg(result))
363 return ';'.join(res)
379 return ';'.join(res)
364
380
365 def between(repo, proto, pairs):
381 def between(repo, proto, pairs):
366 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
382 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
367 r = []
383 r = []
368 for b in repo.between(pairs):
384 for b in repo.between(pairs):
369 r.append(encodelist(b) + "\n")
385 r.append(encodelist(b) + "\n")
370 return "".join(r)
386 return "".join(r)
371
387
372 def branchmap(repo, proto):
388 def branchmap(repo, proto):
373 branchmap = repo.branchmap()
389 branchmap = repo.branchmap()
374 heads = []
390 heads = []
375 for branch, nodes in branchmap.iteritems():
391 for branch, nodes in branchmap.iteritems():
376 branchname = urllib.quote(encoding.fromlocal(branch))
392 branchname = urllib.quote(encoding.fromlocal(branch))
377 branchnodes = encodelist(nodes)
393 branchnodes = encodelist(nodes)
378 heads.append('%s %s' % (branchname, branchnodes))
394 heads.append('%s %s' % (branchname, branchnodes))
379 return '\n'.join(heads)
395 return '\n'.join(heads)
380
396
381 def branches(repo, proto, nodes):
397 def branches(repo, proto, nodes):
382 nodes = decodelist(nodes)
398 nodes = decodelist(nodes)
383 r = []
399 r = []
384 for b in repo.branches(nodes):
400 for b in repo.branches(nodes):
385 r.append(encodelist(b) + "\n")
401 r.append(encodelist(b) + "\n")
386 return "".join(r)
402 return "".join(r)
387
403
388 def capabilities(repo, proto):
404 def capabilities(repo, proto):
389 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
405 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
390 'unbundlehash batch').split()
406 'unbundlehash batch').split()
391 if _allowstream(repo.ui):
407 if _allowstream(repo.ui):
392 requiredformats = repo.requirements & repo.supportedformats
408 requiredformats = repo.requirements & repo.supportedformats
393 # if our local revlogs are just revlogv1, add 'stream' cap
409 # if our local revlogs are just revlogv1, add 'stream' cap
394 if not requiredformats - set(('revlogv1',)):
410 if not requiredformats - set(('revlogv1',)):
395 caps.append('stream')
411 caps.append('stream')
396 # otherwise, add 'streamreqs' detailing our local revlog format
412 # otherwise, add 'streamreqs' detailing our local revlog format
397 else:
413 else:
398 caps.append('streamreqs=%s' % ','.join(requiredformats))
414 caps.append('streamreqs=%s' % ','.join(requiredformats))
399 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
415 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
400 caps.append('httpheader=1024')
416 caps.append('httpheader=1024')
401 return ' '.join(caps)
417 return ' '.join(caps)
402
418
403 def changegroup(repo, proto, roots):
419 def changegroup(repo, proto, roots):
404 nodes = decodelist(roots)
420 nodes = decodelist(roots)
405 cg = repo.changegroup(nodes, 'serve')
421 cg = repo.changegroup(nodes, 'serve')
406 return streamres(proto.groupchunks(cg))
422 return streamres(proto.groupchunks(cg))
407
423
408 def changegroupsubset(repo, proto, bases, heads):
424 def changegroupsubset(repo, proto, bases, heads):
409 bases = decodelist(bases)
425 bases = decodelist(bases)
410 heads = decodelist(heads)
426 heads = decodelist(heads)
411 cg = repo.changegroupsubset(bases, heads, 'serve')
427 cg = repo.changegroupsubset(bases, heads, 'serve')
412 return streamres(proto.groupchunks(cg))
428 return streamres(proto.groupchunks(cg))
413
429
414 def debugwireargs(repo, proto, one, two, others):
430 def debugwireargs(repo, proto, one, two, others):
415 # only accept optional args from the known set
431 # only accept optional args from the known set
416 opts = options('debugwireargs', ['three', 'four'], others)
432 opts = options('debugwireargs', ['three', 'four'], others)
417 return repo.debugwireargs(one, two, **opts)
433 return repo.debugwireargs(one, two, **opts)
418
434
419 def getbundle(repo, proto, others):
435 def getbundle(repo, proto, others):
420 opts = options('getbundle', ['heads', 'common'], others)
436 opts = options('getbundle', ['heads', 'common'], others)
421 for k, v in opts.iteritems():
437 for k, v in opts.iteritems():
422 opts[k] = decodelist(v)
438 opts[k] = decodelist(v)
423 cg = repo.getbundle('serve', **opts)
439 cg = repo.getbundle('serve', **opts)
424 return streamres(proto.groupchunks(cg))
440 return streamres(proto.groupchunks(cg))
425
441
426 def heads(repo, proto):
442 def heads(repo, proto):
427 h = repo.heads()
443 h = repo.heads()
428 return encodelist(h) + "\n"
444 return encodelist(h) + "\n"
429
445
430 def hello(repo, proto):
446 def hello(repo, proto):
431 '''the hello command returns a set of lines describing various
447 '''the hello command returns a set of lines describing various
432 interesting things about the server, in an RFC822-like format.
448 interesting things about the server, in an RFC822-like format.
433 Currently the only one defined is "capabilities", which
449 Currently the only one defined is "capabilities", which
434 consists of a line in the form:
450 consists of a line in the form:
435
451
436 capabilities: space separated list of tokens
452 capabilities: space separated list of tokens
437 '''
453 '''
438 return "capabilities: %s\n" % (capabilities(repo, proto))
454 return "capabilities: %s\n" % (capabilities(repo, proto))
439
455
440 def listkeys(repo, proto, namespace):
456 def listkeys(repo, proto, namespace):
441 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
457 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
442 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
458 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
443 for k, v in d])
459 for k, v in d])
444 return t
460 return t
445
461
446 def lookup(repo, proto, key):
462 def lookup(repo, proto, key):
447 try:
463 try:
448 r = hex(repo.lookup(encoding.tolocal(key)))
464 r = hex(repo.lookup(encoding.tolocal(key)))
449 success = 1
465 success = 1
450 except Exception, inst:
466 except Exception, inst:
451 r = str(inst)
467 r = str(inst)
452 success = 0
468 success = 0
453 return "%s %s\n" % (success, r)
469 return "%s %s\n" % (success, r)
454
470
455 def known(repo, proto, nodes, others):
471 def known(repo, proto, nodes, others):
456 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
472 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
457
473
458 def pushkey(repo, proto, namespace, key, old, new):
474 def pushkey(repo, proto, namespace, key, old, new):
459 # compatibility with pre-1.8 clients which were accidentally
475 # compatibility with pre-1.8 clients which were accidentally
460 # sending raw binary nodes rather than utf-8-encoded hex
476 # sending raw binary nodes rather than utf-8-encoded hex
461 if len(new) == 20 and new.encode('string-escape') != new:
477 if len(new) == 20 and new.encode('string-escape') != new:
462 # looks like it could be a binary node
478 # looks like it could be a binary node
463 try:
479 try:
464 new.decode('utf-8')
480 new.decode('utf-8')
465 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
481 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
466 except UnicodeDecodeError:
482 except UnicodeDecodeError:
467 pass # binary, leave unmodified
483 pass # binary, leave unmodified
468 else:
484 else:
469 new = encoding.tolocal(new) # normal path
485 new = encoding.tolocal(new) # normal path
470
486
471 r = pushkeymod.push(repo,
487 r = pushkeymod.push(repo,
472 encoding.tolocal(namespace), encoding.tolocal(key),
488 encoding.tolocal(namespace), encoding.tolocal(key),
473 encoding.tolocal(old), new)
489 encoding.tolocal(old), new)
474 return '%s\n' % int(r)
490 return '%s\n' % int(r)
475
491
476 def _allowstream(ui):
492 def _allowstream(ui):
477 return ui.configbool('server', 'uncompressed', True, untrusted=True)
493 return ui.configbool('server', 'uncompressed', True, untrusted=True)
478
494
479 def stream(repo, proto):
495 def stream(repo, proto):
480 '''If the server supports streaming clone, it advertises the "stream"
496 '''If the server supports streaming clone, it advertises the "stream"
481 capability with a value representing the version and flags of the repo
497 capability with a value representing the version and flags of the repo
482 it is serving. Client checks to see if it understands the format.
498 it is serving. Client checks to see if it understands the format.
483
499
484 The format is simple: the server writes out a line with the amount
500 The format is simple: the server writes out a line with the amount
485 of files, then the total amount of bytes to be transfered (separated
501 of files, then the total amount of bytes to be transfered (separated
486 by a space). Then, for each file, the server first writes the filename
502 by a space). Then, for each file, the server first writes the filename
487 and filesize (separated by the null character), then the file contents.
503 and filesize (separated by the null character), then the file contents.
488 '''
504 '''
489
505
490 if not _allowstream(repo.ui):
506 if not _allowstream(repo.ui):
491 return '1\n'
507 return '1\n'
492
508
493 entries = []
509 entries = []
494 total_bytes = 0
510 total_bytes = 0
495 try:
511 try:
496 # get consistent snapshot of repo, lock during scan
512 # get consistent snapshot of repo, lock during scan
497 lock = repo.lock()
513 lock = repo.lock()
498 try:
514 try:
499 repo.ui.debug('scanning\n')
515 repo.ui.debug('scanning\n')
500 for name, ename, size in repo.store.walk():
516 for name, ename, size in repo.store.walk():
501 entries.append((name, size))
517 entries.append((name, size))
502 total_bytes += size
518 total_bytes += size
503 finally:
519 finally:
504 lock.release()
520 lock.release()
505 except error.LockError:
521 except error.LockError:
506 return '2\n' # error: 2
522 return '2\n' # error: 2
507
523
508 def streamer(repo, entries, total):
524 def streamer(repo, entries, total):
509 '''stream out all metadata files in repository.'''
525 '''stream out all metadata files in repository.'''
510 yield '0\n' # success
526 yield '0\n' # success
511 repo.ui.debug('%d files, %d bytes to transfer\n' %
527 repo.ui.debug('%d files, %d bytes to transfer\n' %
512 (len(entries), total_bytes))
528 (len(entries), total_bytes))
513 yield '%d %d\n' % (len(entries), total_bytes)
529 yield '%d %d\n' % (len(entries), total_bytes)
514 for name, size in entries:
530 for name, size in entries:
515 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
531 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
516 # partially encode name over the wire for backwards compat
532 # partially encode name over the wire for backwards compat
517 yield '%s\0%d\n' % (store.encodedir(name), size)
533 yield '%s\0%d\n' % (store.encodedir(name), size)
518 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
534 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
519 yield chunk
535 yield chunk
520
536
521 return streamres(streamer(repo, entries, total_bytes))
537 return streamres(streamer(repo, entries, total_bytes))
522
538
523 def unbundle(repo, proto, heads):
539 def unbundle(repo, proto, heads):
524 their_heads = decodelist(heads)
540 their_heads = decodelist(heads)
525
541
526 def check_heads():
542 def check_heads():
527 heads = repo.heads()
543 heads = repo.heads()
528 heads_hash = util.sha1(''.join(sorted(heads))).digest()
544 heads_hash = util.sha1(''.join(sorted(heads))).digest()
529 return (their_heads == ['force'] or their_heads == heads or
545 return (their_heads == ['force'] or their_heads == heads or
530 their_heads == ['hashed', heads_hash])
546 their_heads == ['hashed', heads_hash])
531
547
532 proto.redirect()
548 proto.redirect()
533
549
534 # fail early if possible
550 # fail early if possible
535 if not check_heads():
551 if not check_heads():
536 return pusherr('unsynced changes')
552 return pusherr('unsynced changes')
537
553
538 # write bundle data to temporary file because it can be big
554 # write bundle data to temporary file because it can be big
539 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
555 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
540 fp = os.fdopen(fd, 'wb+')
556 fp = os.fdopen(fd, 'wb+')
541 r = 0
557 r = 0
542 try:
558 try:
543 proto.getfile(fp)
559 proto.getfile(fp)
544 lock = repo.lock()
560 lock = repo.lock()
545 try:
561 try:
546 if not check_heads():
562 if not check_heads():
547 # someone else committed/pushed/unbundled while we
563 # someone else committed/pushed/unbundled while we
548 # were transferring data
564 # were transferring data
549 return pusherr('unsynced changes')
565 return pusherr('unsynced changes')
550
566
551 # push can proceed
567 # push can proceed
552 fp.seek(0)
568 fp.seek(0)
553 gen = changegroupmod.readbundle(fp, None)
569 gen = changegroupmod.readbundle(fp, None)
554
570
555 try:
571 try:
556 r = repo.addchangegroup(gen, 'serve', proto._client(),
572 r = repo.addchangegroup(gen, 'serve', proto._client(),
557 lock=lock)
573 lock=lock)
558 except util.Abort, inst:
574 except util.Abort, inst:
559 sys.stderr.write("abort: %s\n" % inst)
575 sys.stderr.write("abort: %s\n" % inst)
560 finally:
576 finally:
561 lock.release()
577 lock.release()
562 return pushres(r)
578 return pushres(r)
563
579
564 finally:
580 finally:
565 fp.close()
581 fp.close()
566 os.unlink(tempname)
582 os.unlink(tempname)
567
583
568 commands = {
584 commands = {
569 'batch': (batch, 'cmds *'),
585 'batch': (batch, 'cmds *'),
570 'between': (between, 'pairs'),
586 'between': (between, 'pairs'),
571 'branchmap': (branchmap, ''),
587 'branchmap': (branchmap, ''),
572 'branches': (branches, 'nodes'),
588 'branches': (branches, 'nodes'),
573 'capabilities': (capabilities, ''),
589 'capabilities': (capabilities, ''),
574 'changegroup': (changegroup, 'roots'),
590 'changegroup': (changegroup, 'roots'),
575 'changegroupsubset': (changegroupsubset, 'bases heads'),
591 'changegroupsubset': (changegroupsubset, 'bases heads'),
576 'debugwireargs': (debugwireargs, 'one two *'),
592 'debugwireargs': (debugwireargs, 'one two *'),
577 'getbundle': (getbundle, '*'),
593 'getbundle': (getbundle, '*'),
578 'heads': (heads, ''),
594 'heads': (heads, ''),
579 'hello': (hello, ''),
595 'hello': (hello, ''),
580 'known': (known, 'nodes *'),
596 'known': (known, 'nodes *'),
581 'listkeys': (listkeys, 'namespace'),
597 'listkeys': (listkeys, 'namespace'),
582 'lookup': (lookup, 'key'),
598 'lookup': (lookup, 'key'),
583 'pushkey': (pushkey, 'namespace key old new'),
599 'pushkey': (pushkey, 'namespace key old new'),
584 'stream_out': (stream, ''),
600 'stream_out': (stream, ''),
585 'unbundle': (unbundle, 'heads'),
601 'unbundle': (unbundle, 'heads'),
586 }
602 }
General Comments 0
You need to be logged in to leave comments. Login now