##// END OF EJS Templates
wireproto: handle other server output in pushkey...
Pierre-Yves David -
r15652:ca6accda default
parent child Browse files
Show More
@@ -1,606 +1,609 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import repo, error, encoding, util, store
12 import repo, error, encoding, util, store
13
13
14 # abstract batching support
14 # abstract batching support
15
15
16 class future(object):
16 class future(object):
17 '''placeholder for a value to be set later'''
17 '''placeholder for a value to be set later'''
18 def set(self, value):
18 def set(self, value):
19 if util.safehasattr(self, 'value'):
19 if util.safehasattr(self, 'value'):
20 raise error.RepoError("future is already set")
20 raise error.RepoError("future is already set")
21 self.value = value
21 self.value = value
22
22
23 class batcher(object):
23 class batcher(object):
24 '''base class for batches of commands submittable in a single request
24 '''base class for batches of commands submittable in a single request
25
25
26 All methods invoked on instances of this class are simply queued and return a
26 All methods invoked on instances of this class are simply queued and return a
27 a future for the result. Once you call submit(), all the queued calls are
27 a future for the result. Once you call submit(), all the queued calls are
28 performed and the results set in their respective futures.
28 performed and the results set in their respective futures.
29 '''
29 '''
30 def __init__(self):
30 def __init__(self):
31 self.calls = []
31 self.calls = []
32 def __getattr__(self, name):
32 def __getattr__(self, name):
33 def call(*args, **opts):
33 def call(*args, **opts):
34 resref = future()
34 resref = future()
35 self.calls.append((name, args, opts, resref,))
35 self.calls.append((name, args, opts, resref,))
36 return resref
36 return resref
37 return call
37 return call
38 def submit(self):
38 def submit(self):
39 pass
39 pass
40
40
41 class localbatch(batcher):
41 class localbatch(batcher):
42 '''performs the queued calls directly'''
42 '''performs the queued calls directly'''
43 def __init__(self, local):
43 def __init__(self, local):
44 batcher.__init__(self)
44 batcher.__init__(self)
45 self.local = local
45 self.local = local
46 def submit(self):
46 def submit(self):
47 for name, args, opts, resref in self.calls:
47 for name, args, opts, resref in self.calls:
48 resref.set(getattr(self.local, name)(*args, **opts))
48 resref.set(getattr(self.local, name)(*args, **opts))
49
49
50 class remotebatch(batcher):
50 class remotebatch(batcher):
51 '''batches the queued calls; uses as few roundtrips as possible'''
51 '''batches the queued calls; uses as few roundtrips as possible'''
52 def __init__(self, remote):
52 def __init__(self, remote):
53 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
53 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
54 batcher.__init__(self)
54 batcher.__init__(self)
55 self.remote = remote
55 self.remote = remote
56 def submit(self):
56 def submit(self):
57 req, rsp = [], []
57 req, rsp = [], []
58 for name, args, opts, resref in self.calls:
58 for name, args, opts, resref in self.calls:
59 mtd = getattr(self.remote, name)
59 mtd = getattr(self.remote, name)
60 batchablefn = getattr(mtd, 'batchable', None)
60 batchablefn = getattr(mtd, 'batchable', None)
61 if batchablefn is not None:
61 if batchablefn is not None:
62 batchable = batchablefn(mtd.im_self, *args, **opts)
62 batchable = batchablefn(mtd.im_self, *args, **opts)
63 encargsorres, encresref = batchable.next()
63 encargsorres, encresref = batchable.next()
64 if encresref:
64 if encresref:
65 req.append((name, encargsorres,))
65 req.append((name, encargsorres,))
66 rsp.append((batchable, encresref, resref,))
66 rsp.append((batchable, encresref, resref,))
67 else:
67 else:
68 resref.set(encargsorres)
68 resref.set(encargsorres)
69 else:
69 else:
70 if req:
70 if req:
71 self._submitreq(req, rsp)
71 self._submitreq(req, rsp)
72 req, rsp = [], []
72 req, rsp = [], []
73 resref.set(mtd(*args, **opts))
73 resref.set(mtd(*args, **opts))
74 if req:
74 if req:
75 self._submitreq(req, rsp)
75 self._submitreq(req, rsp)
76 def _submitreq(self, req, rsp):
76 def _submitreq(self, req, rsp):
77 encresults = self.remote._submitbatch(req)
77 encresults = self.remote._submitbatch(req)
78 for encres, r in zip(encresults, rsp):
78 for encres, r in zip(encresults, rsp):
79 batchable, encresref, resref = r
79 batchable, encresref, resref = r
80 encresref.set(encres)
80 encresref.set(encres)
81 resref.set(batchable.next())
81 resref.set(batchable.next())
82
82
83 def batchable(f):
83 def batchable(f):
84 '''annotation for batchable methods
84 '''annotation for batchable methods
85
85
86 Such methods must implement a coroutine as follows:
86 Such methods must implement a coroutine as follows:
87
87
88 @batchable
88 @batchable
89 def sample(self, one, two=None):
89 def sample(self, one, two=None):
90 # Handle locally computable results first:
90 # Handle locally computable results first:
91 if not one:
91 if not one:
92 yield "a local result", None
92 yield "a local result", None
93 # Build list of encoded arguments suitable for your wire protocol:
93 # Build list of encoded arguments suitable for your wire protocol:
94 encargs = [('one', encode(one),), ('two', encode(two),)]
94 encargs = [('one', encode(one),), ('two', encode(two),)]
95 # Create future for injection of encoded result:
95 # Create future for injection of encoded result:
96 encresref = future()
96 encresref = future()
97 # Return encoded arguments and future:
97 # Return encoded arguments and future:
98 yield encargs, encresref
98 yield encargs, encresref
99 # Assuming the future to be filled with the result from the batched request
99 # Assuming the future to be filled with the result from the batched request
100 # now. Decode it:
100 # now. Decode it:
101 yield decode(encresref.value)
101 yield decode(encresref.value)
102
102
103 The decorator returns a function which wraps this coroutine as a plain method,
103 The decorator returns a function which wraps this coroutine as a plain method,
104 but adds the original method as an attribute called "batchable", which is
104 but adds the original method as an attribute called "batchable", which is
105 used by remotebatch to split the call into separate encoding and decoding
105 used by remotebatch to split the call into separate encoding and decoding
106 phases.
106 phases.
107 '''
107 '''
108 def plain(*args, **opts):
108 def plain(*args, **opts):
109 batchable = f(*args, **opts)
109 batchable = f(*args, **opts)
110 encargsorres, encresref = batchable.next()
110 encargsorres, encresref = batchable.next()
111 if not encresref:
111 if not encresref:
112 return encargsorres # a local result in this case
112 return encargsorres # a local result in this case
113 self = args[0]
113 self = args[0]
114 encresref.set(self._submitone(f.func_name, encargsorres))
114 encresref.set(self._submitone(f.func_name, encargsorres))
115 return batchable.next()
115 return batchable.next()
116 setattr(plain, 'batchable', f)
116 setattr(plain, 'batchable', f)
117 return plain
117 return plain
118
118
119 # list of nodes encoding / decoding
119 # list of nodes encoding / decoding
120
120
121 def decodelist(l, sep=' '):
121 def decodelist(l, sep=' '):
122 if l:
122 if l:
123 return map(bin, l.split(sep))
123 return map(bin, l.split(sep))
124 return []
124 return []
125
125
126 def encodelist(l, sep=' '):
126 def encodelist(l, sep=' '):
127 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
128
128
129 # batched call argument encoding
129 # batched call argument encoding
130
130
131 def escapearg(plain):
131 def escapearg(plain):
132 return (plain
132 return (plain
133 .replace(':', '::')
133 .replace(':', '::')
134 .replace(',', ':,')
134 .replace(',', ':,')
135 .replace(';', ':;')
135 .replace(';', ':;')
136 .replace('=', ':='))
136 .replace('=', ':='))
137
137
138 def unescapearg(escaped):
138 def unescapearg(escaped):
139 return (escaped
139 return (escaped
140 .replace(':=', '=')
140 .replace(':=', '=')
141 .replace(':;', ';')
141 .replace(':;', ';')
142 .replace(':,', ',')
142 .replace(':,', ',')
143 .replace('::', ':'))
143 .replace('::', ':'))
144
144
145 # client side
145 # client side
146
146
147 def todict(**args):
147 def todict(**args):
148 return args
148 return args
149
149
150 class wirerepository(repo.repository):
150 class wirerepository(repo.repository):
151
151
152 def batch(self):
152 def batch(self):
153 return remotebatch(self)
153 return remotebatch(self)
154 def _submitbatch(self, req):
154 def _submitbatch(self, req):
155 cmds = []
155 cmds = []
156 for op, argsdict in req:
156 for op, argsdict in req:
157 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
157 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
158 cmds.append('%s %s' % (op, args))
158 cmds.append('%s %s' % (op, args))
159 rsp = self._call("batch", cmds=';'.join(cmds))
159 rsp = self._call("batch", cmds=';'.join(cmds))
160 return rsp.split(';')
160 return rsp.split(';')
161 def _submitone(self, op, args):
161 def _submitone(self, op, args):
162 return self._call(op, **args)
162 return self._call(op, **args)
163
163
164 @batchable
164 @batchable
165 def lookup(self, key):
165 def lookup(self, key):
166 self.requirecap('lookup', _('look up remote revision'))
166 self.requirecap('lookup', _('look up remote revision'))
167 f = future()
167 f = future()
168 yield todict(key=encoding.fromlocal(key)), f
168 yield todict(key=encoding.fromlocal(key)), f
169 d = f.value
169 d = f.value
170 success, data = d[:-1].split(" ", 1)
170 success, data = d[:-1].split(" ", 1)
171 if int(success):
171 if int(success):
172 yield bin(data)
172 yield bin(data)
173 self._abort(error.RepoError(data))
173 self._abort(error.RepoError(data))
174
174
175 @batchable
175 @batchable
176 def heads(self):
176 def heads(self):
177 f = future()
177 f = future()
178 yield {}, f
178 yield {}, f
179 d = f.value
179 d = f.value
180 try:
180 try:
181 yield decodelist(d[:-1])
181 yield decodelist(d[:-1])
182 except ValueError:
182 except ValueError:
183 self._abort(error.ResponseError(_("unexpected response:"), d))
183 self._abort(error.ResponseError(_("unexpected response:"), d))
184
184
185 @batchable
185 @batchable
186 def known(self, nodes):
186 def known(self, nodes):
187 f = future()
187 f = future()
188 yield todict(nodes=encodelist(nodes)), f
188 yield todict(nodes=encodelist(nodes)), f
189 d = f.value
189 d = f.value
190 try:
190 try:
191 yield [bool(int(f)) for f in d]
191 yield [bool(int(f)) for f in d]
192 except ValueError:
192 except ValueError:
193 self._abort(error.ResponseError(_("unexpected response:"), d))
193 self._abort(error.ResponseError(_("unexpected response:"), d))
194
194
195 @batchable
195 @batchable
196 def branchmap(self):
196 def branchmap(self):
197 f = future()
197 f = future()
198 yield {}, f
198 yield {}, f
199 d = f.value
199 d = f.value
200 try:
200 try:
201 branchmap = {}
201 branchmap = {}
202 for branchpart in d.splitlines():
202 for branchpart in d.splitlines():
203 branchname, branchheads = branchpart.split(' ', 1)
203 branchname, branchheads = branchpart.split(' ', 1)
204 branchname = encoding.tolocal(urllib.unquote(branchname))
204 branchname = encoding.tolocal(urllib.unquote(branchname))
205 branchheads = decodelist(branchheads)
205 branchheads = decodelist(branchheads)
206 branchmap[branchname] = branchheads
206 branchmap[branchname] = branchheads
207 yield branchmap
207 yield branchmap
208 except TypeError:
208 except TypeError:
209 self._abort(error.ResponseError(_("unexpected response:"), d))
209 self._abort(error.ResponseError(_("unexpected response:"), d))
210
210
211 def branches(self, nodes):
211 def branches(self, nodes):
212 n = encodelist(nodes)
212 n = encodelist(nodes)
213 d = self._call("branches", nodes=n)
213 d = self._call("branches", nodes=n)
214 try:
214 try:
215 br = [tuple(decodelist(b)) for b in d.splitlines()]
215 br = [tuple(decodelist(b)) for b in d.splitlines()]
216 return br
216 return br
217 except ValueError:
217 except ValueError:
218 self._abort(error.ResponseError(_("unexpected response:"), d))
218 self._abort(error.ResponseError(_("unexpected response:"), d))
219
219
220 def between(self, pairs):
220 def between(self, pairs):
221 batch = 8 # avoid giant requests
221 batch = 8 # avoid giant requests
222 r = []
222 r = []
223 for i in xrange(0, len(pairs), batch):
223 for i in xrange(0, len(pairs), batch):
224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
225 d = self._call("between", pairs=n)
225 d = self._call("between", pairs=n)
226 try:
226 try:
227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
228 except ValueError:
228 except ValueError:
229 self._abort(error.ResponseError(_("unexpected response:"), d))
229 self._abort(error.ResponseError(_("unexpected response:"), d))
230 return r
230 return r
231
231
232 @batchable
232 @batchable
233 def pushkey(self, namespace, key, old, new):
233 def pushkey(self, namespace, key, old, new):
234 if not self.capable('pushkey'):
234 if not self.capable('pushkey'):
235 yield False, None
235 yield False, None
236 f = future()
236 f = future()
237 yield todict(namespace=encoding.fromlocal(namespace),
237 yield todict(namespace=encoding.fromlocal(namespace),
238 key=encoding.fromlocal(key),
238 key=encoding.fromlocal(key),
239 old=encoding.fromlocal(old),
239 old=encoding.fromlocal(old),
240 new=encoding.fromlocal(new)), f
240 new=encoding.fromlocal(new)), f
241 d = f.value
241 d = f.value
242 d, output = d.split('\n', 1)
242 try:
243 try:
243 d = bool(int(d))
244 d = bool(int(d))
244 except ValueError:
245 except ValueError:
245 raise error.ResponseError(
246 raise error.ResponseError(
246 _('push failed (unexpected response):'), d)
247 _('push failed (unexpected response):'), d)
248 for l in output.splitlines(True):
249 self.ui.status(_('remote: '), l)
247 yield d
250 yield d
248
251
249 @batchable
252 @batchable
250 def listkeys(self, namespace):
253 def listkeys(self, namespace):
251 if not self.capable('pushkey'):
254 if not self.capable('pushkey'):
252 yield {}, None
255 yield {}, None
253 f = future()
256 f = future()
254 yield todict(namespace=encoding.fromlocal(namespace)), f
257 yield todict(namespace=encoding.fromlocal(namespace)), f
255 d = f.value
258 d = f.value
256 r = {}
259 r = {}
257 for l in d.splitlines():
260 for l in d.splitlines():
258 k, v = l.split('\t')
261 k, v = l.split('\t')
259 r[encoding.tolocal(k)] = encoding.tolocal(v)
262 r[encoding.tolocal(k)] = encoding.tolocal(v)
260 yield r
263 yield r
261
264
262 def stream_out(self):
265 def stream_out(self):
263 return self._callstream('stream_out')
266 return self._callstream('stream_out')
264
267
265 def changegroup(self, nodes, kind):
268 def changegroup(self, nodes, kind):
266 n = encodelist(nodes)
269 n = encodelist(nodes)
267 f = self._callstream("changegroup", roots=n)
270 f = self._callstream("changegroup", roots=n)
268 return changegroupmod.unbundle10(self._decompress(f), 'UN')
271 return changegroupmod.unbundle10(self._decompress(f), 'UN')
269
272
270 def changegroupsubset(self, bases, heads, kind):
273 def changegroupsubset(self, bases, heads, kind):
271 self.requirecap('changegroupsubset', _('look up remote changes'))
274 self.requirecap('changegroupsubset', _('look up remote changes'))
272 bases = encodelist(bases)
275 bases = encodelist(bases)
273 heads = encodelist(heads)
276 heads = encodelist(heads)
274 f = self._callstream("changegroupsubset",
277 f = self._callstream("changegroupsubset",
275 bases=bases, heads=heads)
278 bases=bases, heads=heads)
276 return changegroupmod.unbundle10(self._decompress(f), 'UN')
279 return changegroupmod.unbundle10(self._decompress(f), 'UN')
277
280
278 def getbundle(self, source, heads=None, common=None):
281 def getbundle(self, source, heads=None, common=None):
279 self.requirecap('getbundle', _('look up remote changes'))
282 self.requirecap('getbundle', _('look up remote changes'))
280 opts = {}
283 opts = {}
281 if heads is not None:
284 if heads is not None:
282 opts['heads'] = encodelist(heads)
285 opts['heads'] = encodelist(heads)
283 if common is not None:
286 if common is not None:
284 opts['common'] = encodelist(common)
287 opts['common'] = encodelist(common)
285 f = self._callstream("getbundle", **opts)
288 f = self._callstream("getbundle", **opts)
286 return changegroupmod.unbundle10(self._decompress(f), 'UN')
289 return changegroupmod.unbundle10(self._decompress(f), 'UN')
287
290
288 def unbundle(self, cg, heads, source):
291 def unbundle(self, cg, heads, source):
289 '''Send cg (a readable file-like object representing the
292 '''Send cg (a readable file-like object representing the
290 changegroup to push, typically a chunkbuffer object) to the
293 changegroup to push, typically a chunkbuffer object) to the
291 remote server as a bundle. Return an integer indicating the
294 remote server as a bundle. Return an integer indicating the
292 result of the push (see localrepository.addchangegroup()).'''
295 result of the push (see localrepository.addchangegroup()).'''
293
296
294 if heads != ['force'] and self.capable('unbundlehash'):
297 if heads != ['force'] and self.capable('unbundlehash'):
295 heads = encodelist(['hashed',
298 heads = encodelist(['hashed',
296 util.sha1(''.join(sorted(heads))).digest()])
299 util.sha1(''.join(sorted(heads))).digest()])
297 else:
300 else:
298 heads = encodelist(heads)
301 heads = encodelist(heads)
299
302
300 ret, output = self._callpush("unbundle", cg, heads=heads)
303 ret, output = self._callpush("unbundle", cg, heads=heads)
301 if ret == "":
304 if ret == "":
302 raise error.ResponseError(
305 raise error.ResponseError(
303 _('push failed:'), output)
306 _('push failed:'), output)
304 try:
307 try:
305 ret = int(ret)
308 ret = int(ret)
306 except ValueError:
309 except ValueError:
307 raise error.ResponseError(
310 raise error.ResponseError(
308 _('push failed (unexpected response):'), ret)
311 _('push failed (unexpected response):'), ret)
309
312
310 for l in output.splitlines(True):
313 for l in output.splitlines(True):
311 self.ui.status(_('remote: '), l)
314 self.ui.status(_('remote: '), l)
312 return ret
315 return ret
313
316
314 def debugwireargs(self, one, two, three=None, four=None, five=None):
317 def debugwireargs(self, one, two, three=None, four=None, five=None):
315 # don't pass optional arguments left at their default value
318 # don't pass optional arguments left at their default value
316 opts = {}
319 opts = {}
317 if three is not None:
320 if three is not None:
318 opts['three'] = three
321 opts['three'] = three
319 if four is not None:
322 if four is not None:
320 opts['four'] = four
323 opts['four'] = four
321 return self._call('debugwireargs', one=one, two=two, **opts)
324 return self._call('debugwireargs', one=one, two=two, **opts)
322
325
323 # server side
326 # server side
324
327
325 class streamres(object):
328 class streamres(object):
326 def __init__(self, gen):
329 def __init__(self, gen):
327 self.gen = gen
330 self.gen = gen
328
331
329 class pushres(object):
332 class pushres(object):
330 def __init__(self, res):
333 def __init__(self, res):
331 self.res = res
334 self.res = res
332
335
333 class pusherr(object):
336 class pusherr(object):
334 def __init__(self, res):
337 def __init__(self, res):
335 self.res = res
338 self.res = res
336
339
337 class ooberror(object):
340 class ooberror(object):
338 def __init__(self, message):
341 def __init__(self, message):
339 self.message = message
342 self.message = message
340
343
341 def dispatch(repo, proto, command):
344 def dispatch(repo, proto, command):
342 func, spec = commands[command]
345 func, spec = commands[command]
343 args = proto.getargs(spec)
346 args = proto.getargs(spec)
344 return func(repo, proto, *args)
347 return func(repo, proto, *args)
345
348
346 def options(cmd, keys, others):
349 def options(cmd, keys, others):
347 opts = {}
350 opts = {}
348 for k in keys:
351 for k in keys:
349 if k in others:
352 if k in others:
350 opts[k] = others[k]
353 opts[k] = others[k]
351 del others[k]
354 del others[k]
352 if others:
355 if others:
353 sys.stderr.write("abort: %s got unexpected arguments %s\n"
356 sys.stderr.write("abort: %s got unexpected arguments %s\n"
354 % (cmd, ",".join(others)))
357 % (cmd, ",".join(others)))
355 return opts
358 return opts
356
359
357 def batch(repo, proto, cmds, others):
360 def batch(repo, proto, cmds, others):
358 res = []
361 res = []
359 for pair in cmds.split(';'):
362 for pair in cmds.split(';'):
360 op, args = pair.split(' ', 1)
363 op, args = pair.split(' ', 1)
361 vals = {}
364 vals = {}
362 for a in args.split(','):
365 for a in args.split(','):
363 if a:
366 if a:
364 n, v = a.split('=')
367 n, v = a.split('=')
365 vals[n] = unescapearg(v)
368 vals[n] = unescapearg(v)
366 func, spec = commands[op]
369 func, spec = commands[op]
367 if spec:
370 if spec:
368 keys = spec.split()
371 keys = spec.split()
369 data = {}
372 data = {}
370 for k in keys:
373 for k in keys:
371 if k == '*':
374 if k == '*':
372 star = {}
375 star = {}
373 for key in vals.keys():
376 for key in vals.keys():
374 if key not in keys:
377 if key not in keys:
375 star[key] = vals[key]
378 star[key] = vals[key]
376 data['*'] = star
379 data['*'] = star
377 else:
380 else:
378 data[k] = vals[k]
381 data[k] = vals[k]
379 result = func(repo, proto, *[data[k] for k in keys])
382 result = func(repo, proto, *[data[k] for k in keys])
380 else:
383 else:
381 result = func(repo, proto)
384 result = func(repo, proto)
382 if isinstance(result, ooberror):
385 if isinstance(result, ooberror):
383 return result
386 return result
384 res.append(escapearg(result))
387 res.append(escapearg(result))
385 return ';'.join(res)
388 return ';'.join(res)
386
389
387 def between(repo, proto, pairs):
390 def between(repo, proto, pairs):
388 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
391 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
389 r = []
392 r = []
390 for b in repo.between(pairs):
393 for b in repo.between(pairs):
391 r.append(encodelist(b) + "\n")
394 r.append(encodelist(b) + "\n")
392 return "".join(r)
395 return "".join(r)
393
396
394 def branchmap(repo, proto):
397 def branchmap(repo, proto):
395 branchmap = repo.branchmap()
398 branchmap = repo.branchmap()
396 heads = []
399 heads = []
397 for branch, nodes in branchmap.iteritems():
400 for branch, nodes in branchmap.iteritems():
398 branchname = urllib.quote(encoding.fromlocal(branch))
401 branchname = urllib.quote(encoding.fromlocal(branch))
399 branchnodes = encodelist(nodes)
402 branchnodes = encodelist(nodes)
400 heads.append('%s %s' % (branchname, branchnodes))
403 heads.append('%s %s' % (branchname, branchnodes))
401 return '\n'.join(heads)
404 return '\n'.join(heads)
402
405
403 def branches(repo, proto, nodes):
406 def branches(repo, proto, nodes):
404 nodes = decodelist(nodes)
407 nodes = decodelist(nodes)
405 r = []
408 r = []
406 for b in repo.branches(nodes):
409 for b in repo.branches(nodes):
407 r.append(encodelist(b) + "\n")
410 r.append(encodelist(b) + "\n")
408 return "".join(r)
411 return "".join(r)
409
412
410 def capabilities(repo, proto):
413 def capabilities(repo, proto):
411 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
414 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
412 'unbundlehash batch').split()
415 'unbundlehash batch').split()
413 if _allowstream(repo.ui):
416 if _allowstream(repo.ui):
414 requiredformats = repo.requirements & repo.supportedformats
417 requiredformats = repo.requirements & repo.supportedformats
415 # if our local revlogs are just revlogv1, add 'stream' cap
418 # if our local revlogs are just revlogv1, add 'stream' cap
416 if not requiredformats - set(('revlogv1',)):
419 if not requiredformats - set(('revlogv1',)):
417 caps.append('stream')
420 caps.append('stream')
418 # otherwise, add 'streamreqs' detailing our local revlog format
421 # otherwise, add 'streamreqs' detailing our local revlog format
419 else:
422 else:
420 caps.append('streamreqs=%s' % ','.join(requiredformats))
423 caps.append('streamreqs=%s' % ','.join(requiredformats))
421 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
424 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
422 caps.append('httpheader=1024')
425 caps.append('httpheader=1024')
423 return ' '.join(caps)
426 return ' '.join(caps)
424
427
425 def changegroup(repo, proto, roots):
428 def changegroup(repo, proto, roots):
426 nodes = decodelist(roots)
429 nodes = decodelist(roots)
427 cg = repo.changegroup(nodes, 'serve')
430 cg = repo.changegroup(nodes, 'serve')
428 return streamres(proto.groupchunks(cg))
431 return streamres(proto.groupchunks(cg))
429
432
430 def changegroupsubset(repo, proto, bases, heads):
433 def changegroupsubset(repo, proto, bases, heads):
431 bases = decodelist(bases)
434 bases = decodelist(bases)
432 heads = decodelist(heads)
435 heads = decodelist(heads)
433 cg = repo.changegroupsubset(bases, heads, 'serve')
436 cg = repo.changegroupsubset(bases, heads, 'serve')
434 return streamres(proto.groupchunks(cg))
437 return streamres(proto.groupchunks(cg))
435
438
436 def debugwireargs(repo, proto, one, two, others):
439 def debugwireargs(repo, proto, one, two, others):
437 # only accept optional args from the known set
440 # only accept optional args from the known set
438 opts = options('debugwireargs', ['three', 'four'], others)
441 opts = options('debugwireargs', ['three', 'four'], others)
439 return repo.debugwireargs(one, two, **opts)
442 return repo.debugwireargs(one, two, **opts)
440
443
441 def getbundle(repo, proto, others):
444 def getbundle(repo, proto, others):
442 opts = options('getbundle', ['heads', 'common'], others)
445 opts = options('getbundle', ['heads', 'common'], others)
443 for k, v in opts.iteritems():
446 for k, v in opts.iteritems():
444 opts[k] = decodelist(v)
447 opts[k] = decodelist(v)
445 cg = repo.getbundle('serve', **opts)
448 cg = repo.getbundle('serve', **opts)
446 return streamres(proto.groupchunks(cg))
449 return streamres(proto.groupchunks(cg))
447
450
448 def heads(repo, proto):
451 def heads(repo, proto):
449 h = repo.heads()
452 h = repo.heads()
450 return encodelist(h) + "\n"
453 return encodelist(h) + "\n"
451
454
452 def hello(repo, proto):
455 def hello(repo, proto):
453 '''the hello command returns a set of lines describing various
456 '''the hello command returns a set of lines describing various
454 interesting things about the server, in an RFC822-like format.
457 interesting things about the server, in an RFC822-like format.
455 Currently the only one defined is "capabilities", which
458 Currently the only one defined is "capabilities", which
456 consists of a line in the form:
459 consists of a line in the form:
457
460
458 capabilities: space separated list of tokens
461 capabilities: space separated list of tokens
459 '''
462 '''
460 return "capabilities: %s\n" % (capabilities(repo, proto))
463 return "capabilities: %s\n" % (capabilities(repo, proto))
461
464
462 def listkeys(repo, proto, namespace):
465 def listkeys(repo, proto, namespace):
463 d = repo.listkeys(encoding.tolocal(namespace)).items()
466 d = repo.listkeys(encoding.tolocal(namespace)).items()
464 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
467 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
465 for k, v in d])
468 for k, v in d])
466 return t
469 return t
467
470
468 def lookup(repo, proto, key):
471 def lookup(repo, proto, key):
469 try:
472 try:
470 r = hex(repo.lookup(encoding.tolocal(key)))
473 r = hex(repo.lookup(encoding.tolocal(key)))
471 success = 1
474 success = 1
472 except Exception, inst:
475 except Exception, inst:
473 r = str(inst)
476 r = str(inst)
474 success = 0
477 success = 0
475 return "%s %s\n" % (success, r)
478 return "%s %s\n" % (success, r)
476
479
477 def known(repo, proto, nodes, others):
480 def known(repo, proto, nodes, others):
478 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
481 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
479
482
480 def pushkey(repo, proto, namespace, key, old, new):
483 def pushkey(repo, proto, namespace, key, old, new):
481 # compatibility with pre-1.8 clients which were accidentally
484 # compatibility with pre-1.8 clients which were accidentally
482 # sending raw binary nodes rather than utf-8-encoded hex
485 # sending raw binary nodes rather than utf-8-encoded hex
483 if len(new) == 20 and new.encode('string-escape') != new:
486 if len(new) == 20 and new.encode('string-escape') != new:
484 # looks like it could be a binary node
487 # looks like it could be a binary node
485 try:
488 try:
486 new.decode('utf-8')
489 new.decode('utf-8')
487 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
490 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
488 except UnicodeDecodeError:
491 except UnicodeDecodeError:
489 pass # binary, leave unmodified
492 pass # binary, leave unmodified
490 else:
493 else:
491 new = encoding.tolocal(new) # normal path
494 new = encoding.tolocal(new) # normal path
492
495
493 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
496 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
494 encoding.tolocal(old), new)
497 encoding.tolocal(old), new)
495 return '%s\n' % int(r)
498 return '%s\n' % int(r)
496
499
497 def _allowstream(ui):
500 def _allowstream(ui):
498 return ui.configbool('server', 'uncompressed', True, untrusted=True)
501 return ui.configbool('server', 'uncompressed', True, untrusted=True)
499
502
500 def stream(repo, proto):
503 def stream(repo, proto):
501 '''If the server supports streaming clone, it advertises the "stream"
504 '''If the server supports streaming clone, it advertises the "stream"
502 capability with a value representing the version and flags of the repo
505 capability with a value representing the version and flags of the repo
503 it is serving. Client checks to see if it understands the format.
506 it is serving. Client checks to see if it understands the format.
504
507
505 The format is simple: the server writes out a line with the amount
508 The format is simple: the server writes out a line with the amount
506 of files, then the total amount of bytes to be transfered (separated
509 of files, then the total amount of bytes to be transfered (separated
507 by a space). Then, for each file, the server first writes the filename
510 by a space). Then, for each file, the server first writes the filename
508 and filesize (separated by the null character), then the file contents.
511 and filesize (separated by the null character), then the file contents.
509 '''
512 '''
510
513
511 if not _allowstream(repo.ui):
514 if not _allowstream(repo.ui):
512 return '1\n'
515 return '1\n'
513
516
514 entries = []
517 entries = []
515 total_bytes = 0
518 total_bytes = 0
516 try:
519 try:
517 # get consistent snapshot of repo, lock during scan
520 # get consistent snapshot of repo, lock during scan
518 lock = repo.lock()
521 lock = repo.lock()
519 try:
522 try:
520 repo.ui.debug('scanning\n')
523 repo.ui.debug('scanning\n')
521 for name, ename, size in repo.store.walk():
524 for name, ename, size in repo.store.walk():
522 entries.append((name, size))
525 entries.append((name, size))
523 total_bytes += size
526 total_bytes += size
524 finally:
527 finally:
525 lock.release()
528 lock.release()
526 except error.LockError:
529 except error.LockError:
527 return '2\n' # error: 2
530 return '2\n' # error: 2
528
531
529 def streamer(repo, entries, total):
532 def streamer(repo, entries, total):
530 '''stream out all metadata files in repository.'''
533 '''stream out all metadata files in repository.'''
531 yield '0\n' # success
534 yield '0\n' # success
532 repo.ui.debug('%d files, %d bytes to transfer\n' %
535 repo.ui.debug('%d files, %d bytes to transfer\n' %
533 (len(entries), total_bytes))
536 (len(entries), total_bytes))
534 yield '%d %d\n' % (len(entries), total_bytes)
537 yield '%d %d\n' % (len(entries), total_bytes)
535 for name, size in entries:
538 for name, size in entries:
536 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
539 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
537 # partially encode name over the wire for backwards compat
540 # partially encode name over the wire for backwards compat
538 yield '%s\0%d\n' % (store.encodedir(name), size)
541 yield '%s\0%d\n' % (store.encodedir(name), size)
539 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
542 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
540 yield chunk
543 yield chunk
541
544
542 return streamres(streamer(repo, entries, total_bytes))
545 return streamres(streamer(repo, entries, total_bytes))
543
546
544 def unbundle(repo, proto, heads):
547 def unbundle(repo, proto, heads):
545 their_heads = decodelist(heads)
548 their_heads = decodelist(heads)
546
549
547 def check_heads():
550 def check_heads():
548 heads = repo.heads()
551 heads = repo.heads()
549 heads_hash = util.sha1(''.join(sorted(heads))).digest()
552 heads_hash = util.sha1(''.join(sorted(heads))).digest()
550 return (their_heads == ['force'] or their_heads == heads or
553 return (their_heads == ['force'] or their_heads == heads or
551 their_heads == ['hashed', heads_hash])
554 their_heads == ['hashed', heads_hash])
552
555
553 proto.redirect()
556 proto.redirect()
554
557
555 # fail early if possible
558 # fail early if possible
556 if not check_heads():
559 if not check_heads():
557 return pusherr('unsynced changes')
560 return pusherr('unsynced changes')
558
561
559 # write bundle data to temporary file because it can be big
562 # write bundle data to temporary file because it can be big
560 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
563 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
561 fp = os.fdopen(fd, 'wb+')
564 fp = os.fdopen(fd, 'wb+')
562 r = 0
565 r = 0
563 try:
566 try:
564 proto.getfile(fp)
567 proto.getfile(fp)
565 lock = repo.lock()
568 lock = repo.lock()
566 try:
569 try:
567 if not check_heads():
570 if not check_heads():
568 # someone else committed/pushed/unbundled while we
571 # someone else committed/pushed/unbundled while we
569 # were transferring data
572 # were transferring data
570 return pusherr('unsynced changes')
573 return pusherr('unsynced changes')
571
574
572 # push can proceed
575 # push can proceed
573 fp.seek(0)
576 fp.seek(0)
574 gen = changegroupmod.readbundle(fp, None)
577 gen = changegroupmod.readbundle(fp, None)
575
578
576 try:
579 try:
577 r = repo.addchangegroup(gen, 'serve', proto._client())
580 r = repo.addchangegroup(gen, 'serve', proto._client())
578 except util.Abort, inst:
581 except util.Abort, inst:
579 sys.stderr.write("abort: %s\n" % inst)
582 sys.stderr.write("abort: %s\n" % inst)
580 finally:
583 finally:
581 lock.release()
584 lock.release()
582 return pushres(r)
585 return pushres(r)
583
586
584 finally:
587 finally:
585 fp.close()
588 fp.close()
586 os.unlink(tempname)
589 os.unlink(tempname)
587
590
588 commands = {
591 commands = {
589 'batch': (batch, 'cmds *'),
592 'batch': (batch, 'cmds *'),
590 'between': (between, 'pairs'),
593 'between': (between, 'pairs'),
591 'branchmap': (branchmap, ''),
594 'branchmap': (branchmap, ''),
592 'branches': (branches, 'nodes'),
595 'branches': (branches, 'nodes'),
593 'capabilities': (capabilities, ''),
596 'capabilities': (capabilities, ''),
594 'changegroup': (changegroup, 'roots'),
597 'changegroup': (changegroup, 'roots'),
595 'changegroupsubset': (changegroupsubset, 'bases heads'),
598 'changegroupsubset': (changegroupsubset, 'bases heads'),
596 'debugwireargs': (debugwireargs, 'one two *'),
599 'debugwireargs': (debugwireargs, 'one two *'),
597 'getbundle': (getbundle, '*'),
600 'getbundle': (getbundle, '*'),
598 'heads': (heads, ''),
601 'heads': (heads, ''),
599 'hello': (hello, ''),
602 'hello': (hello, ''),
600 'known': (known, 'nodes *'),
603 'known': (known, 'nodes *'),
601 'listkeys': (listkeys, 'namespace'),
604 'listkeys': (listkeys, 'namespace'),
602 'lookup': (lookup, 'key'),
605 'lookup': (lookup, 'key'),
603 'pushkey': (pushkey, 'namespace key old new'),
606 'pushkey': (pushkey, 'namespace key old new'),
604 'stream_out': (stream, ''),
607 'stream_out': (stream, ''),
605 'unbundle': (unbundle, 'heads'),
608 'unbundle': (unbundle, 'heads'),
606 }
609 }
General Comments 0
You need to be logged in to leave comments. Login now