##// END OF EJS Templates
wireproto: use safehasattr or getattr instead of hasattr
Augie Fackler -
r14970:592e45b7 default
parent child Browse files
Show More
@@ -1,602 +1,603 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import repo, error, encoding, util, store
12 import repo, error, encoding, util, store
13 import pushkey as pushkeymod
13 import pushkey as pushkeymod
14
14
15 # abstract batching support
15 # abstract batching support
16
16
17 class future(object):
17 class future(object):
18 '''placeholder for a value to be set later'''
18 '''placeholder for a value to be set later'''
19 def set(self, value):
19 def set(self, value):
20 if hasattr(self, 'value'):
20 if util.safehasattr(self, 'value'):
21 raise error.RepoError("future is already set")
21 raise error.RepoError("future is already set")
22 self.value = value
22 self.value = value
23
23
24 class batcher(object):
24 class batcher(object):
25 '''base class for batches of commands submittable in a single request
25 '''base class for batches of commands submittable in a single request
26
26
27 All methods invoked on instances of this class are simply queued and return a
27 All methods invoked on instances of this class are simply queued and return a
28 a future for the result. Once you call submit(), all the queued calls are
28 a future for the result. Once you call submit(), all the queued calls are
29 performed and the results set in their respective futures.
29 performed and the results set in their respective futures.
30 '''
30 '''
31 def __init__(self):
31 def __init__(self):
32 self.calls = []
32 self.calls = []
33 def __getattr__(self, name):
33 def __getattr__(self, name):
34 def call(*args, **opts):
34 def call(*args, **opts):
35 resref = future()
35 resref = future()
36 self.calls.append((name, args, opts, resref,))
36 self.calls.append((name, args, opts, resref,))
37 return resref
37 return resref
38 return call
38 return call
39 def submit(self):
39 def submit(self):
40 pass
40 pass
41
41
42 class localbatch(batcher):
42 class localbatch(batcher):
43 '''performs the queued calls directly'''
43 '''performs the queued calls directly'''
44 def __init__(self, local):
44 def __init__(self, local):
45 batcher.__init__(self)
45 batcher.__init__(self)
46 self.local = local
46 self.local = local
47 def submit(self):
47 def submit(self):
48 for name, args, opts, resref in self.calls:
48 for name, args, opts, resref in self.calls:
49 resref.set(getattr(self.local, name)(*args, **opts))
49 resref.set(getattr(self.local, name)(*args, **opts))
50
50
51 class remotebatch(batcher):
51 class remotebatch(batcher):
52 '''batches the queued calls; uses as few roundtrips as possible'''
52 '''batches the queued calls; uses as few roundtrips as possible'''
53 def __init__(self, remote):
53 def __init__(self, remote):
54 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
54 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
55 batcher.__init__(self)
55 batcher.__init__(self)
56 self.remote = remote
56 self.remote = remote
57 def submit(self):
57 def submit(self):
58 req, rsp = [], []
58 req, rsp = [], []
59 for name, args, opts, resref in self.calls:
59 for name, args, opts, resref in self.calls:
60 mtd = getattr(self.remote, name)
60 mtd = getattr(self.remote, name)
61 if hasattr(mtd, 'batchable'):
61 batchablefn = getattr(mtd, 'batchable', None)
62 batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts)
62 if batchablefn is not None:
63 batchable = batchablefn(mtd.im_self, *args, **opts)
63 encargsorres, encresref = batchable.next()
64 encargsorres, encresref = batchable.next()
64 if encresref:
65 if encresref:
65 req.append((name, encargsorres,))
66 req.append((name, encargsorres,))
66 rsp.append((batchable, encresref, resref,))
67 rsp.append((batchable, encresref, resref,))
67 else:
68 else:
68 resref.set(encargsorres)
69 resref.set(encargsorres)
69 else:
70 else:
70 if req:
71 if req:
71 self._submitreq(req, rsp)
72 self._submitreq(req, rsp)
72 req, rsp = [], []
73 req, rsp = [], []
73 resref.set(mtd(*args, **opts))
74 resref.set(mtd(*args, **opts))
74 if req:
75 if req:
75 self._submitreq(req, rsp)
76 self._submitreq(req, rsp)
76 def _submitreq(self, req, rsp):
77 def _submitreq(self, req, rsp):
77 encresults = self.remote._submitbatch(req)
78 encresults = self.remote._submitbatch(req)
78 for encres, r in zip(encresults, rsp):
79 for encres, r in zip(encresults, rsp):
79 batchable, encresref, resref = r
80 batchable, encresref, resref = r
80 encresref.set(encres)
81 encresref.set(encres)
81 resref.set(batchable.next())
82 resref.set(batchable.next())
82
83
83 def batchable(f):
84 def batchable(f):
84 '''annotation for batchable methods
85 '''annotation for batchable methods
85
86
86 Such methods must implement a coroutine as follows:
87 Such methods must implement a coroutine as follows:
87
88
88 @batchable
89 @batchable
89 def sample(self, one, two=None):
90 def sample(self, one, two=None):
90 # Handle locally computable results first:
91 # Handle locally computable results first:
91 if not one:
92 if not one:
92 yield "a local result", None
93 yield "a local result", None
93 # Build list of encoded arguments suitable for your wire protocol:
94 # Build list of encoded arguments suitable for your wire protocol:
94 encargs = [('one', encode(one),), ('two', encode(two),)]
95 encargs = [('one', encode(one),), ('two', encode(two),)]
95 # Create future for injection of encoded result:
96 # Create future for injection of encoded result:
96 encresref = future()
97 encresref = future()
97 # Return encoded arguments and future:
98 # Return encoded arguments and future:
98 yield encargs, encresref
99 yield encargs, encresref
99 # Assuming the future to be filled with the result from the batched request
100 # Assuming the future to be filled with the result from the batched request
100 # now. Decode it:
101 # now. Decode it:
101 yield decode(encresref.value)
102 yield decode(encresref.value)
102
103
103 The decorator returns a function which wraps this coroutine as a plain method,
104 The decorator returns a function which wraps this coroutine as a plain method,
104 but adds the original method as an attribute called "batchable", which is
105 but adds the original method as an attribute called "batchable", which is
105 used by remotebatch to split the call into separate encoding and decoding
106 used by remotebatch to split the call into separate encoding and decoding
106 phases.
107 phases.
107 '''
108 '''
108 def plain(*args, **opts):
109 def plain(*args, **opts):
109 batchable = f(*args, **opts)
110 batchable = f(*args, **opts)
110 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
111 if not encresref:
112 if not encresref:
112 return encargsorres # a local result in this case
113 return encargsorres # a local result in this case
113 self = args[0]
114 self = args[0]
114 encresref.set(self._submitone(f.func_name, encargsorres))
115 encresref.set(self._submitone(f.func_name, encargsorres))
115 return batchable.next()
116 return batchable.next()
116 setattr(plain, 'batchable', f)
117 setattr(plain, 'batchable', f)
117 return plain
118 return plain
118
119
119 # list of nodes encoding / decoding
120 # list of nodes encoding / decoding
120
121
121 def decodelist(l, sep=' '):
122 def decodelist(l, sep=' '):
122 if l:
123 if l:
123 return map(bin, l.split(sep))
124 return map(bin, l.split(sep))
124 return []
125 return []
125
126
126 def encodelist(l, sep=' '):
127 def encodelist(l, sep=' '):
127 return sep.join(map(hex, l))
128 return sep.join(map(hex, l))
128
129
129 # batched call argument encoding
130 # batched call argument encoding
130
131
131 def escapearg(plain):
132 def escapearg(plain):
132 return (plain
133 return (plain
133 .replace(':', '::')
134 .replace(':', '::')
134 .replace(',', ':,')
135 .replace(',', ':,')
135 .replace(';', ':;')
136 .replace(';', ':;')
136 .replace('=', ':='))
137 .replace('=', ':='))
137
138
138 def unescapearg(escaped):
139 def unescapearg(escaped):
139 return (escaped
140 return (escaped
140 .replace(':=', '=')
141 .replace(':=', '=')
141 .replace(':;', ';')
142 .replace(':;', ';')
142 .replace(':,', ',')
143 .replace(':,', ',')
143 .replace('::', ':'))
144 .replace('::', ':'))
144
145
145 # client side
146 # client side
146
147
147 def todict(**args):
148 def todict(**args):
148 return args
149 return args
149
150
150 class wirerepository(repo.repository):
151 class wirerepository(repo.repository):
151
152
152 def batch(self):
153 def batch(self):
153 return remotebatch(self)
154 return remotebatch(self)
154 def _submitbatch(self, req):
155 def _submitbatch(self, req):
155 cmds = []
156 cmds = []
156 for op, argsdict in req:
157 for op, argsdict in req:
157 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
158 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
158 cmds.append('%s %s' % (op, args))
159 cmds.append('%s %s' % (op, args))
159 rsp = self._call("batch", cmds=';'.join(cmds))
160 rsp = self._call("batch", cmds=';'.join(cmds))
160 return rsp.split(';')
161 return rsp.split(';')
161 def _submitone(self, op, args):
162 def _submitone(self, op, args):
162 return self._call(op, **args)
163 return self._call(op, **args)
163
164
164 @batchable
165 @batchable
165 def lookup(self, key):
166 def lookup(self, key):
166 self.requirecap('lookup', _('look up remote revision'))
167 self.requirecap('lookup', _('look up remote revision'))
167 f = future()
168 f = future()
168 yield todict(key=encoding.fromlocal(key)), f
169 yield todict(key=encoding.fromlocal(key)), f
169 d = f.value
170 d = f.value
170 success, data = d[:-1].split(" ", 1)
171 success, data = d[:-1].split(" ", 1)
171 if int(success):
172 if int(success):
172 yield bin(data)
173 yield bin(data)
173 self._abort(error.RepoError(data))
174 self._abort(error.RepoError(data))
174
175
175 @batchable
176 @batchable
176 def heads(self):
177 def heads(self):
177 f = future()
178 f = future()
178 yield {}, f
179 yield {}, f
179 d = f.value
180 d = f.value
180 try:
181 try:
181 yield decodelist(d[:-1])
182 yield decodelist(d[:-1])
182 except ValueError:
183 except ValueError:
183 self._abort(error.ResponseError(_("unexpected response:"), d))
184 self._abort(error.ResponseError(_("unexpected response:"), d))
184
185
185 @batchable
186 @batchable
186 def known(self, nodes):
187 def known(self, nodes):
187 f = future()
188 f = future()
188 yield todict(nodes=encodelist(nodes)), f
189 yield todict(nodes=encodelist(nodes)), f
189 d = f.value
190 d = f.value
190 try:
191 try:
191 yield [bool(int(f)) for f in d]
192 yield [bool(int(f)) for f in d]
192 except ValueError:
193 except ValueError:
193 self._abort(error.ResponseError(_("unexpected response:"), d))
194 self._abort(error.ResponseError(_("unexpected response:"), d))
194
195
195 @batchable
196 @batchable
196 def branchmap(self):
197 def branchmap(self):
197 f = future()
198 f = future()
198 yield {}, f
199 yield {}, f
199 d = f.value
200 d = f.value
200 try:
201 try:
201 branchmap = {}
202 branchmap = {}
202 for branchpart in d.splitlines():
203 for branchpart in d.splitlines():
203 branchname, branchheads = branchpart.split(' ', 1)
204 branchname, branchheads = branchpart.split(' ', 1)
204 branchname = encoding.tolocal(urllib.unquote(branchname))
205 branchname = encoding.tolocal(urllib.unquote(branchname))
205 branchheads = decodelist(branchheads)
206 branchheads = decodelist(branchheads)
206 branchmap[branchname] = branchheads
207 branchmap[branchname] = branchheads
207 yield branchmap
208 yield branchmap
208 except TypeError:
209 except TypeError:
209 self._abort(error.ResponseError(_("unexpected response:"), d))
210 self._abort(error.ResponseError(_("unexpected response:"), d))
210
211
211 def branches(self, nodes):
212 def branches(self, nodes):
212 n = encodelist(nodes)
213 n = encodelist(nodes)
213 d = self._call("branches", nodes=n)
214 d = self._call("branches", nodes=n)
214 try:
215 try:
215 br = [tuple(decodelist(b)) for b in d.splitlines()]
216 br = [tuple(decodelist(b)) for b in d.splitlines()]
216 return br
217 return br
217 except ValueError:
218 except ValueError:
218 self._abort(error.ResponseError(_("unexpected response:"), d))
219 self._abort(error.ResponseError(_("unexpected response:"), d))
219
220
220 def between(self, pairs):
221 def between(self, pairs):
221 batch = 8 # avoid giant requests
222 batch = 8 # avoid giant requests
222 r = []
223 r = []
223 for i in xrange(0, len(pairs), batch):
224 for i in xrange(0, len(pairs), batch):
224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
225 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
225 d = self._call("between", pairs=n)
226 d = self._call("between", pairs=n)
226 try:
227 try:
227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
228 r.extend(l and decodelist(l) or [] for l in d.splitlines())
228 except ValueError:
229 except ValueError:
229 self._abort(error.ResponseError(_("unexpected response:"), d))
230 self._abort(error.ResponseError(_("unexpected response:"), d))
230 return r
231 return r
231
232
232 @batchable
233 @batchable
233 def pushkey(self, namespace, key, old, new):
234 def pushkey(self, namespace, key, old, new):
234 if not self.capable('pushkey'):
235 if not self.capable('pushkey'):
235 yield False, None
236 yield False, None
236 f = future()
237 f = future()
237 yield todict(namespace=encoding.fromlocal(namespace),
238 yield todict(namespace=encoding.fromlocal(namespace),
238 key=encoding.fromlocal(key),
239 key=encoding.fromlocal(key),
239 old=encoding.fromlocal(old),
240 old=encoding.fromlocal(old),
240 new=encoding.fromlocal(new)), f
241 new=encoding.fromlocal(new)), f
241 d = f.value
242 d = f.value
242 try:
243 try:
243 d = bool(int(d))
244 d = bool(int(d))
244 except ValueError:
245 except ValueError:
245 raise error.ResponseError(
246 raise error.ResponseError(
246 _('push failed (unexpected response):'), d)
247 _('push failed (unexpected response):'), d)
247 yield d
248 yield d
248
249
249 @batchable
250 @batchable
250 def listkeys(self, namespace):
251 def listkeys(self, namespace):
251 if not self.capable('pushkey'):
252 if not self.capable('pushkey'):
252 yield {}, None
253 yield {}, None
253 f = future()
254 f = future()
254 yield todict(namespace=encoding.fromlocal(namespace)), f
255 yield todict(namespace=encoding.fromlocal(namespace)), f
255 d = f.value
256 d = f.value
256 r = {}
257 r = {}
257 for l in d.splitlines():
258 for l in d.splitlines():
258 k, v = l.split('\t')
259 k, v = l.split('\t')
259 r[encoding.tolocal(k)] = encoding.tolocal(v)
260 r[encoding.tolocal(k)] = encoding.tolocal(v)
260 yield r
261 yield r
261
262
262 def stream_out(self):
263 def stream_out(self):
263 return self._callstream('stream_out')
264 return self._callstream('stream_out')
264
265
265 def changegroup(self, nodes, kind):
266 def changegroup(self, nodes, kind):
266 n = encodelist(nodes)
267 n = encodelist(nodes)
267 f = self._callstream("changegroup", roots=n)
268 f = self._callstream("changegroup", roots=n)
268 return changegroupmod.unbundle10(self._decompress(f), 'UN')
269 return changegroupmod.unbundle10(self._decompress(f), 'UN')
269
270
270 def changegroupsubset(self, bases, heads, kind):
271 def changegroupsubset(self, bases, heads, kind):
271 self.requirecap('changegroupsubset', _('look up remote changes'))
272 self.requirecap('changegroupsubset', _('look up remote changes'))
272 bases = encodelist(bases)
273 bases = encodelist(bases)
273 heads = encodelist(heads)
274 heads = encodelist(heads)
274 f = self._callstream("changegroupsubset",
275 f = self._callstream("changegroupsubset",
275 bases=bases, heads=heads)
276 bases=bases, heads=heads)
276 return changegroupmod.unbundle10(self._decompress(f), 'UN')
277 return changegroupmod.unbundle10(self._decompress(f), 'UN')
277
278
278 def getbundle(self, source, heads=None, common=None):
279 def getbundle(self, source, heads=None, common=None):
279 self.requirecap('getbundle', _('look up remote changes'))
280 self.requirecap('getbundle', _('look up remote changes'))
280 opts = {}
281 opts = {}
281 if heads is not None:
282 if heads is not None:
282 opts['heads'] = encodelist(heads)
283 opts['heads'] = encodelist(heads)
283 if common is not None:
284 if common is not None:
284 opts['common'] = encodelist(common)
285 opts['common'] = encodelist(common)
285 f = self._callstream("getbundle", **opts)
286 f = self._callstream("getbundle", **opts)
286 return changegroupmod.unbundle10(self._decompress(f), 'UN')
287 return changegroupmod.unbundle10(self._decompress(f), 'UN')
287
288
288 def unbundle(self, cg, heads, source):
289 def unbundle(self, cg, heads, source):
289 '''Send cg (a readable file-like object representing the
290 '''Send cg (a readable file-like object representing the
290 changegroup to push, typically a chunkbuffer object) to the
291 changegroup to push, typically a chunkbuffer object) to the
291 remote server as a bundle. Return an integer indicating the
292 remote server as a bundle. Return an integer indicating the
292 result of the push (see localrepository.addchangegroup()).'''
293 result of the push (see localrepository.addchangegroup()).'''
293
294
294 if heads != ['force'] and self.capable('unbundlehash'):
295 if heads != ['force'] and self.capable('unbundlehash'):
295 heads = encodelist(['hashed',
296 heads = encodelist(['hashed',
296 util.sha1(''.join(sorted(heads))).digest()])
297 util.sha1(''.join(sorted(heads))).digest()])
297 else:
298 else:
298 heads = encodelist(heads)
299 heads = encodelist(heads)
299
300
300 ret, output = self._callpush("unbundle", cg, heads=heads)
301 ret, output = self._callpush("unbundle", cg, heads=heads)
301 if ret == "":
302 if ret == "":
302 raise error.ResponseError(
303 raise error.ResponseError(
303 _('push failed:'), output)
304 _('push failed:'), output)
304 try:
305 try:
305 ret = int(ret)
306 ret = int(ret)
306 except ValueError:
307 except ValueError:
307 raise error.ResponseError(
308 raise error.ResponseError(
308 _('push failed (unexpected response):'), ret)
309 _('push failed (unexpected response):'), ret)
309
310
310 for l in output.splitlines(True):
311 for l in output.splitlines(True):
311 self.ui.status(_('remote: '), l)
312 self.ui.status(_('remote: '), l)
312 return ret
313 return ret
313
314
314 def debugwireargs(self, one, two, three=None, four=None, five=None):
315 def debugwireargs(self, one, two, three=None, four=None, five=None):
315 # don't pass optional arguments left at their default value
316 # don't pass optional arguments left at their default value
316 opts = {}
317 opts = {}
317 if three is not None:
318 if three is not None:
318 opts['three'] = three
319 opts['three'] = three
319 if four is not None:
320 if four is not None:
320 opts['four'] = four
321 opts['four'] = four
321 return self._call('debugwireargs', one=one, two=two, **opts)
322 return self._call('debugwireargs', one=one, two=two, **opts)
322
323
323 # server side
324 # server side
324
325
325 class streamres(object):
326 class streamres(object):
326 def __init__(self, gen):
327 def __init__(self, gen):
327 self.gen = gen
328 self.gen = gen
328
329
329 class pushres(object):
330 class pushres(object):
330 def __init__(self, res):
331 def __init__(self, res):
331 self.res = res
332 self.res = res
332
333
333 class pusherr(object):
334 class pusherr(object):
334 def __init__(self, res):
335 def __init__(self, res):
335 self.res = res
336 self.res = res
336
337
337 def dispatch(repo, proto, command):
338 def dispatch(repo, proto, command):
338 func, spec = commands[command]
339 func, spec = commands[command]
339 args = proto.getargs(spec)
340 args = proto.getargs(spec)
340 return func(repo, proto, *args)
341 return func(repo, proto, *args)
341
342
342 def options(cmd, keys, others):
343 def options(cmd, keys, others):
343 opts = {}
344 opts = {}
344 for k in keys:
345 for k in keys:
345 if k in others:
346 if k in others:
346 opts[k] = others[k]
347 opts[k] = others[k]
347 del others[k]
348 del others[k]
348 if others:
349 if others:
349 sys.stderr.write("abort: %s got unexpected arguments %s\n"
350 sys.stderr.write("abort: %s got unexpected arguments %s\n"
350 % (cmd, ",".join(others)))
351 % (cmd, ",".join(others)))
351 return opts
352 return opts
352
353
353 def batch(repo, proto, cmds, others):
354 def batch(repo, proto, cmds, others):
354 res = []
355 res = []
355 for pair in cmds.split(';'):
356 for pair in cmds.split(';'):
356 op, args = pair.split(' ', 1)
357 op, args = pair.split(' ', 1)
357 vals = {}
358 vals = {}
358 for a in args.split(','):
359 for a in args.split(','):
359 if a:
360 if a:
360 n, v = a.split('=')
361 n, v = a.split('=')
361 vals[n] = unescapearg(v)
362 vals[n] = unescapearg(v)
362 func, spec = commands[op]
363 func, spec = commands[op]
363 if spec:
364 if spec:
364 keys = spec.split()
365 keys = spec.split()
365 data = {}
366 data = {}
366 for k in keys:
367 for k in keys:
367 if k == '*':
368 if k == '*':
368 star = {}
369 star = {}
369 for key in vals.keys():
370 for key in vals.keys():
370 if key not in keys:
371 if key not in keys:
371 star[key] = vals[key]
372 star[key] = vals[key]
372 data['*'] = star
373 data['*'] = star
373 else:
374 else:
374 data[k] = vals[k]
375 data[k] = vals[k]
375 result = func(repo, proto, *[data[k] for k in keys])
376 result = func(repo, proto, *[data[k] for k in keys])
376 else:
377 else:
377 result = func(repo, proto)
378 result = func(repo, proto)
378 res.append(escapearg(result))
379 res.append(escapearg(result))
379 return ';'.join(res)
380 return ';'.join(res)
380
381
381 def between(repo, proto, pairs):
382 def between(repo, proto, pairs):
382 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
383 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
383 r = []
384 r = []
384 for b in repo.between(pairs):
385 for b in repo.between(pairs):
385 r.append(encodelist(b) + "\n")
386 r.append(encodelist(b) + "\n")
386 return "".join(r)
387 return "".join(r)
387
388
388 def branchmap(repo, proto):
389 def branchmap(repo, proto):
389 branchmap = repo.branchmap()
390 branchmap = repo.branchmap()
390 heads = []
391 heads = []
391 for branch, nodes in branchmap.iteritems():
392 for branch, nodes in branchmap.iteritems():
392 branchname = urllib.quote(encoding.fromlocal(branch))
393 branchname = urllib.quote(encoding.fromlocal(branch))
393 branchnodes = encodelist(nodes)
394 branchnodes = encodelist(nodes)
394 heads.append('%s %s' % (branchname, branchnodes))
395 heads.append('%s %s' % (branchname, branchnodes))
395 return '\n'.join(heads)
396 return '\n'.join(heads)
396
397
397 def branches(repo, proto, nodes):
398 def branches(repo, proto, nodes):
398 nodes = decodelist(nodes)
399 nodes = decodelist(nodes)
399 r = []
400 r = []
400 for b in repo.branches(nodes):
401 for b in repo.branches(nodes):
401 r.append(encodelist(b) + "\n")
402 r.append(encodelist(b) + "\n")
402 return "".join(r)
403 return "".join(r)
403
404
404 def capabilities(repo, proto):
405 def capabilities(repo, proto):
405 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
406 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
406 'unbundlehash batch').split()
407 'unbundlehash batch').split()
407 if _allowstream(repo.ui):
408 if _allowstream(repo.ui):
408 requiredformats = repo.requirements & repo.supportedformats
409 requiredformats = repo.requirements & repo.supportedformats
409 # if our local revlogs are just revlogv1, add 'stream' cap
410 # if our local revlogs are just revlogv1, add 'stream' cap
410 if not requiredformats - set(('revlogv1',)):
411 if not requiredformats - set(('revlogv1',)):
411 caps.append('stream')
412 caps.append('stream')
412 # otherwise, add 'streamreqs' detailing our local revlog format
413 # otherwise, add 'streamreqs' detailing our local revlog format
413 else:
414 else:
414 caps.append('streamreqs=%s' % ','.join(requiredformats))
415 caps.append('streamreqs=%s' % ','.join(requiredformats))
415 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
416 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
416 caps.append('httpheader=1024')
417 caps.append('httpheader=1024')
417 return ' '.join(caps)
418 return ' '.join(caps)
418
419
419 def changegroup(repo, proto, roots):
420 def changegroup(repo, proto, roots):
420 nodes = decodelist(roots)
421 nodes = decodelist(roots)
421 cg = repo.changegroup(nodes, 'serve')
422 cg = repo.changegroup(nodes, 'serve')
422 return streamres(proto.groupchunks(cg))
423 return streamres(proto.groupchunks(cg))
423
424
424 def changegroupsubset(repo, proto, bases, heads):
425 def changegroupsubset(repo, proto, bases, heads):
425 bases = decodelist(bases)
426 bases = decodelist(bases)
426 heads = decodelist(heads)
427 heads = decodelist(heads)
427 cg = repo.changegroupsubset(bases, heads, 'serve')
428 cg = repo.changegroupsubset(bases, heads, 'serve')
428 return streamres(proto.groupchunks(cg))
429 return streamres(proto.groupchunks(cg))
429
430
430 def debugwireargs(repo, proto, one, two, others):
431 def debugwireargs(repo, proto, one, two, others):
431 # only accept optional args from the known set
432 # only accept optional args from the known set
432 opts = options('debugwireargs', ['three', 'four'], others)
433 opts = options('debugwireargs', ['three', 'four'], others)
433 return repo.debugwireargs(one, two, **opts)
434 return repo.debugwireargs(one, two, **opts)
434
435
435 def getbundle(repo, proto, others):
436 def getbundle(repo, proto, others):
436 opts = options('getbundle', ['heads', 'common'], others)
437 opts = options('getbundle', ['heads', 'common'], others)
437 for k, v in opts.iteritems():
438 for k, v in opts.iteritems():
438 opts[k] = decodelist(v)
439 opts[k] = decodelist(v)
439 cg = repo.getbundle('serve', **opts)
440 cg = repo.getbundle('serve', **opts)
440 return streamres(proto.groupchunks(cg))
441 return streamres(proto.groupchunks(cg))
441
442
442 def heads(repo, proto):
443 def heads(repo, proto):
443 h = repo.heads()
444 h = repo.heads()
444 return encodelist(h) + "\n"
445 return encodelist(h) + "\n"
445
446
446 def hello(repo, proto):
447 def hello(repo, proto):
447 '''the hello command returns a set of lines describing various
448 '''the hello command returns a set of lines describing various
448 interesting things about the server, in an RFC822-like format.
449 interesting things about the server, in an RFC822-like format.
449 Currently the only one defined is "capabilities", which
450 Currently the only one defined is "capabilities", which
450 consists of a line in the form:
451 consists of a line in the form:
451
452
452 capabilities: space separated list of tokens
453 capabilities: space separated list of tokens
453 '''
454 '''
454 return "capabilities: %s\n" % (capabilities(repo, proto))
455 return "capabilities: %s\n" % (capabilities(repo, proto))
455
456
456 def listkeys(repo, proto, namespace):
457 def listkeys(repo, proto, namespace):
457 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
458 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
458 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
459 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
459 for k, v in d])
460 for k, v in d])
460 return t
461 return t
461
462
462 def lookup(repo, proto, key):
463 def lookup(repo, proto, key):
463 try:
464 try:
464 r = hex(repo.lookup(encoding.tolocal(key)))
465 r = hex(repo.lookup(encoding.tolocal(key)))
465 success = 1
466 success = 1
466 except Exception, inst:
467 except Exception, inst:
467 r = str(inst)
468 r = str(inst)
468 success = 0
469 success = 0
469 return "%s %s\n" % (success, r)
470 return "%s %s\n" % (success, r)
470
471
471 def known(repo, proto, nodes, others):
472 def known(repo, proto, nodes, others):
472 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
473 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
473
474
474 def pushkey(repo, proto, namespace, key, old, new):
475 def pushkey(repo, proto, namespace, key, old, new):
475 # compatibility with pre-1.8 clients which were accidentally
476 # compatibility with pre-1.8 clients which were accidentally
476 # sending raw binary nodes rather than utf-8-encoded hex
477 # sending raw binary nodes rather than utf-8-encoded hex
477 if len(new) == 20 and new.encode('string-escape') != new:
478 if len(new) == 20 and new.encode('string-escape') != new:
478 # looks like it could be a binary node
479 # looks like it could be a binary node
479 try:
480 try:
480 new.decode('utf-8')
481 new.decode('utf-8')
481 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
482 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
482 except UnicodeDecodeError:
483 except UnicodeDecodeError:
483 pass # binary, leave unmodified
484 pass # binary, leave unmodified
484 else:
485 else:
485 new = encoding.tolocal(new) # normal path
486 new = encoding.tolocal(new) # normal path
486
487
487 r = pushkeymod.push(repo,
488 r = pushkeymod.push(repo,
488 encoding.tolocal(namespace), encoding.tolocal(key),
489 encoding.tolocal(namespace), encoding.tolocal(key),
489 encoding.tolocal(old), new)
490 encoding.tolocal(old), new)
490 return '%s\n' % int(r)
491 return '%s\n' % int(r)
491
492
492 def _allowstream(ui):
493 def _allowstream(ui):
493 return ui.configbool('server', 'uncompressed', True, untrusted=True)
494 return ui.configbool('server', 'uncompressed', True, untrusted=True)
494
495
495 def stream(repo, proto):
496 def stream(repo, proto):
496 '''If the server supports streaming clone, it advertises the "stream"
497 '''If the server supports streaming clone, it advertises the "stream"
497 capability with a value representing the version and flags of the repo
498 capability with a value representing the version and flags of the repo
498 it is serving. Client checks to see if it understands the format.
499 it is serving. Client checks to see if it understands the format.
499
500
500 The format is simple: the server writes out a line with the amount
501 The format is simple: the server writes out a line with the amount
501 of files, then the total amount of bytes to be transfered (separated
502 of files, then the total amount of bytes to be transfered (separated
502 by a space). Then, for each file, the server first writes the filename
503 by a space). Then, for each file, the server first writes the filename
503 and filesize (separated by the null character), then the file contents.
504 and filesize (separated by the null character), then the file contents.
504 '''
505 '''
505
506
506 if not _allowstream(repo.ui):
507 if not _allowstream(repo.ui):
507 return '1\n'
508 return '1\n'
508
509
509 entries = []
510 entries = []
510 total_bytes = 0
511 total_bytes = 0
511 try:
512 try:
512 # get consistent snapshot of repo, lock during scan
513 # get consistent snapshot of repo, lock during scan
513 lock = repo.lock()
514 lock = repo.lock()
514 try:
515 try:
515 repo.ui.debug('scanning\n')
516 repo.ui.debug('scanning\n')
516 for name, ename, size in repo.store.walk():
517 for name, ename, size in repo.store.walk():
517 entries.append((name, size))
518 entries.append((name, size))
518 total_bytes += size
519 total_bytes += size
519 finally:
520 finally:
520 lock.release()
521 lock.release()
521 except error.LockError:
522 except error.LockError:
522 return '2\n' # error: 2
523 return '2\n' # error: 2
523
524
524 def streamer(repo, entries, total):
525 def streamer(repo, entries, total):
525 '''stream out all metadata files in repository.'''
526 '''stream out all metadata files in repository.'''
526 yield '0\n' # success
527 yield '0\n' # success
527 repo.ui.debug('%d files, %d bytes to transfer\n' %
528 repo.ui.debug('%d files, %d bytes to transfer\n' %
528 (len(entries), total_bytes))
529 (len(entries), total_bytes))
529 yield '%d %d\n' % (len(entries), total_bytes)
530 yield '%d %d\n' % (len(entries), total_bytes)
530 for name, size in entries:
531 for name, size in entries:
531 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
532 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
532 # partially encode name over the wire for backwards compat
533 # partially encode name over the wire for backwards compat
533 yield '%s\0%d\n' % (store.encodedir(name), size)
534 yield '%s\0%d\n' % (store.encodedir(name), size)
534 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
535 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
535 yield chunk
536 yield chunk
536
537
537 return streamres(streamer(repo, entries, total_bytes))
538 return streamres(streamer(repo, entries, total_bytes))
538
539
539 def unbundle(repo, proto, heads):
540 def unbundle(repo, proto, heads):
540 their_heads = decodelist(heads)
541 their_heads = decodelist(heads)
541
542
542 def check_heads():
543 def check_heads():
543 heads = repo.heads()
544 heads = repo.heads()
544 heads_hash = util.sha1(''.join(sorted(heads))).digest()
545 heads_hash = util.sha1(''.join(sorted(heads))).digest()
545 return (their_heads == ['force'] or their_heads == heads or
546 return (their_heads == ['force'] or their_heads == heads or
546 their_heads == ['hashed', heads_hash])
547 their_heads == ['hashed', heads_hash])
547
548
548 proto.redirect()
549 proto.redirect()
549
550
550 # fail early if possible
551 # fail early if possible
551 if not check_heads():
552 if not check_heads():
552 return pusherr('unsynced changes')
553 return pusherr('unsynced changes')
553
554
554 # write bundle data to temporary file because it can be big
555 # write bundle data to temporary file because it can be big
555 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
556 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
556 fp = os.fdopen(fd, 'wb+')
557 fp = os.fdopen(fd, 'wb+')
557 r = 0
558 r = 0
558 try:
559 try:
559 proto.getfile(fp)
560 proto.getfile(fp)
560 lock = repo.lock()
561 lock = repo.lock()
561 try:
562 try:
562 if not check_heads():
563 if not check_heads():
563 # someone else committed/pushed/unbundled while we
564 # someone else committed/pushed/unbundled while we
564 # were transferring data
565 # were transferring data
565 return pusherr('unsynced changes')
566 return pusherr('unsynced changes')
566
567
567 # push can proceed
568 # push can proceed
568 fp.seek(0)
569 fp.seek(0)
569 gen = changegroupmod.readbundle(fp, None)
570 gen = changegroupmod.readbundle(fp, None)
570
571
571 try:
572 try:
572 r = repo.addchangegroup(gen, 'serve', proto._client(),
573 r = repo.addchangegroup(gen, 'serve', proto._client(),
573 lock=lock)
574 lock=lock)
574 except util.Abort, inst:
575 except util.Abort, inst:
575 sys.stderr.write("abort: %s\n" % inst)
576 sys.stderr.write("abort: %s\n" % inst)
576 finally:
577 finally:
577 lock.release()
578 lock.release()
578 return pushres(r)
579 return pushres(r)
579
580
580 finally:
581 finally:
581 fp.close()
582 fp.close()
582 os.unlink(tempname)
583 os.unlink(tempname)
583
584
584 commands = {
585 commands = {
585 'batch': (batch, 'cmds *'),
586 'batch': (batch, 'cmds *'),
586 'between': (between, 'pairs'),
587 'between': (between, 'pairs'),
587 'branchmap': (branchmap, ''),
588 'branchmap': (branchmap, ''),
588 'branches': (branches, 'nodes'),
589 'branches': (branches, 'nodes'),
589 'capabilities': (capabilities, ''),
590 'capabilities': (capabilities, ''),
590 'changegroup': (changegroup, 'roots'),
591 'changegroup': (changegroup, 'roots'),
591 'changegroupsubset': (changegroupsubset, 'bases heads'),
592 'changegroupsubset': (changegroupsubset, 'bases heads'),
592 'debugwireargs': (debugwireargs, 'one two *'),
593 'debugwireargs': (debugwireargs, 'one two *'),
593 'getbundle': (getbundle, '*'),
594 'getbundle': (getbundle, '*'),
594 'heads': (heads, ''),
595 'heads': (heads, ''),
595 'hello': (hello, ''),
596 'hello': (hello, ''),
596 'known': (known, 'nodes *'),
597 'known': (known, 'nodes *'),
597 'listkeys': (listkeys, 'namespace'),
598 'listkeys': (listkeys, 'namespace'),
598 'lookup': (lookup, 'key'),
599 'lookup': (lookup, 'key'),
599 'pushkey': (pushkey, 'namespace key old new'),
600 'pushkey': (pushkey, 'namespace key old new'),
600 'stream_out': (stream, ''),
601 'stream_out': (stream, ''),
601 'unbundle': (unbundle, 'heads'),
602 'unbundle': (unbundle, 'heads'),
602 }
603 }
General Comments 0
You need to be logged in to leave comments. Login now