##// END OF EJS Templates
wireproto: don't audit local paths during stream_out...
Bryan O'Sullivan -
r17556:39c6e349 default
parent child Browse files
Show More
@@ -1,619 +1,627 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store
12 import peer, error, encoding, util, store
13 import discovery, phases
13 import discovery, phases
14
14
15 # abstract batching support
15 # abstract batching support
16
16
17 class future(object):
17 class future(object):
18 '''placeholder for a value to be set later'''
18 '''placeholder for a value to be set later'''
19 def set(self, value):
19 def set(self, value):
20 if util.safehasattr(self, 'value'):
20 if util.safehasattr(self, 'value'):
21 raise error.RepoError("future is already set")
21 raise error.RepoError("future is already set")
22 self.value = value
22 self.value = value
23
23
24 class batcher(object):
24 class batcher(object):
25 '''base class for batches of commands submittable in a single request
25 '''base class for batches of commands submittable in a single request
26
26
27 All methods invoked on instances of this class are simply queued and
27 All methods invoked on instances of this class are simply queued and
28 return a a future for the result. Once you call submit(), all the queued
28 return a a future for the result. Once you call submit(), all the queued
29 calls are performed and the results set in their respective futures.
29 calls are performed and the results set in their respective futures.
30 '''
30 '''
31 def __init__(self):
31 def __init__(self):
32 self.calls = []
32 self.calls = []
33 def __getattr__(self, name):
33 def __getattr__(self, name):
34 def call(*args, **opts):
34 def call(*args, **opts):
35 resref = future()
35 resref = future()
36 self.calls.append((name, args, opts, resref,))
36 self.calls.append((name, args, opts, resref,))
37 return resref
37 return resref
38 return call
38 return call
39 def submit(self):
39 def submit(self):
40 pass
40 pass
41
41
42 class localbatch(batcher):
42 class localbatch(batcher):
43 '''performs the queued calls directly'''
43 '''performs the queued calls directly'''
44 def __init__(self, local):
44 def __init__(self, local):
45 batcher.__init__(self)
45 batcher.__init__(self)
46 self.local = local
46 self.local = local
47 def submit(self):
47 def submit(self):
48 for name, args, opts, resref in self.calls:
48 for name, args, opts, resref in self.calls:
49 resref.set(getattr(self.local, name)(*args, **opts))
49 resref.set(getattr(self.local, name)(*args, **opts))
50
50
51 class remotebatch(batcher):
51 class remotebatch(batcher):
52 '''batches the queued calls; uses as few roundtrips as possible'''
52 '''batches the queued calls; uses as few roundtrips as possible'''
53 def __init__(self, remote):
53 def __init__(self, remote):
54 '''remote must support _submitbatch(encbatch) and
54 '''remote must support _submitbatch(encbatch) and
55 _submitone(op, encargs)'''
55 _submitone(op, encargs)'''
56 batcher.__init__(self)
56 batcher.__init__(self)
57 self.remote = remote
57 self.remote = remote
58 def submit(self):
58 def submit(self):
59 req, rsp = [], []
59 req, rsp = [], []
60 for name, args, opts, resref in self.calls:
60 for name, args, opts, resref in self.calls:
61 mtd = getattr(self.remote, name)
61 mtd = getattr(self.remote, name)
62 batchablefn = getattr(mtd, 'batchable', None)
62 batchablefn = getattr(mtd, 'batchable', None)
63 if batchablefn is not None:
63 if batchablefn is not None:
64 batchable = batchablefn(mtd.im_self, *args, **opts)
64 batchable = batchablefn(mtd.im_self, *args, **opts)
65 encargsorres, encresref = batchable.next()
65 encargsorres, encresref = batchable.next()
66 if encresref:
66 if encresref:
67 req.append((name, encargsorres,))
67 req.append((name, encargsorres,))
68 rsp.append((batchable, encresref, resref,))
68 rsp.append((batchable, encresref, resref,))
69 else:
69 else:
70 resref.set(encargsorres)
70 resref.set(encargsorres)
71 else:
71 else:
72 if req:
72 if req:
73 self._submitreq(req, rsp)
73 self._submitreq(req, rsp)
74 req, rsp = [], []
74 req, rsp = [], []
75 resref.set(mtd(*args, **opts))
75 resref.set(mtd(*args, **opts))
76 if req:
76 if req:
77 self._submitreq(req, rsp)
77 self._submitreq(req, rsp)
78 def _submitreq(self, req, rsp):
78 def _submitreq(self, req, rsp):
79 encresults = self.remote._submitbatch(req)
79 encresults = self.remote._submitbatch(req)
80 for encres, r in zip(encresults, rsp):
80 for encres, r in zip(encresults, rsp):
81 batchable, encresref, resref = r
81 batchable, encresref, resref = r
82 encresref.set(encres)
82 encresref.set(encres)
83 resref.set(batchable.next())
83 resref.set(batchable.next())
84
84
85 def batchable(f):
85 def batchable(f):
86 '''annotation for batchable methods
86 '''annotation for batchable methods
87
87
88 Such methods must implement a coroutine as follows:
88 Such methods must implement a coroutine as follows:
89
89
90 @batchable
90 @batchable
91 def sample(self, one, two=None):
91 def sample(self, one, two=None):
92 # Handle locally computable results first:
92 # Handle locally computable results first:
93 if not one:
93 if not one:
94 yield "a local result", None
94 yield "a local result", None
95 # Build list of encoded arguments suitable for your wire protocol:
95 # Build list of encoded arguments suitable for your wire protocol:
96 encargs = [('one', encode(one),), ('two', encode(two),)]
96 encargs = [('one', encode(one),), ('two', encode(two),)]
97 # Create future for injection of encoded result:
97 # Create future for injection of encoded result:
98 encresref = future()
98 encresref = future()
99 # Return encoded arguments and future:
99 # Return encoded arguments and future:
100 yield encargs, encresref
100 yield encargs, encresref
101 # Assuming the future to be filled with the result from the batched
101 # Assuming the future to be filled with the result from the batched
102 # request now. Decode it:
102 # request now. Decode it:
103 yield decode(encresref.value)
103 yield decode(encresref.value)
104
104
105 The decorator returns a function which wraps this coroutine as a plain
105 The decorator returns a function which wraps this coroutine as a plain
106 method, but adds the original method as an attribute called "batchable",
106 method, but adds the original method as an attribute called "batchable",
107 which is used by remotebatch to split the call into separate encoding and
107 which is used by remotebatch to split the call into separate encoding and
108 decoding phases.
108 decoding phases.
109 '''
109 '''
110 def plain(*args, **opts):
110 def plain(*args, **opts):
111 batchable = f(*args, **opts)
111 batchable = f(*args, **opts)
112 encargsorres, encresref = batchable.next()
112 encargsorres, encresref = batchable.next()
113 if not encresref:
113 if not encresref:
114 return encargsorres # a local result in this case
114 return encargsorres # a local result in this case
115 self = args[0]
115 self = args[0]
116 encresref.set(self._submitone(f.func_name, encargsorres))
116 encresref.set(self._submitone(f.func_name, encargsorres))
117 return batchable.next()
117 return batchable.next()
118 setattr(plain, 'batchable', f)
118 setattr(plain, 'batchable', f)
119 return plain
119 return plain
120
120
121 # list of nodes encoding / decoding
121 # list of nodes encoding / decoding
122
122
123 def decodelist(l, sep=' '):
123 def decodelist(l, sep=' '):
124 if l:
124 if l:
125 return map(bin, l.split(sep))
125 return map(bin, l.split(sep))
126 return []
126 return []
127
127
128 def encodelist(l, sep=' '):
128 def encodelist(l, sep=' '):
129 return sep.join(map(hex, l))
129 return sep.join(map(hex, l))
130
130
131 # batched call argument encoding
131 # batched call argument encoding
132
132
133 def escapearg(plain):
133 def escapearg(plain):
134 return (plain
134 return (plain
135 .replace(':', '::')
135 .replace(':', '::')
136 .replace(',', ':,')
136 .replace(',', ':,')
137 .replace(';', ':;')
137 .replace(';', ':;')
138 .replace('=', ':='))
138 .replace('=', ':='))
139
139
140 def unescapearg(escaped):
140 def unescapearg(escaped):
141 return (escaped
141 return (escaped
142 .replace(':=', '=')
142 .replace(':=', '=')
143 .replace(':;', ';')
143 .replace(':;', ';')
144 .replace(':,', ',')
144 .replace(':,', ',')
145 .replace('::', ':'))
145 .replace('::', ':'))
146
146
147 # client side
147 # client side
148
148
149 def todict(**args):
149 def todict(**args):
150 return args
150 return args
151
151
152 class wirepeer(peer.peerrepository):
152 class wirepeer(peer.peerrepository):
153
153
154 def batch(self):
154 def batch(self):
155 return remotebatch(self)
155 return remotebatch(self)
156 def _submitbatch(self, req):
156 def _submitbatch(self, req):
157 cmds = []
157 cmds = []
158 for op, argsdict in req:
158 for op, argsdict in req:
159 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
159 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
160 cmds.append('%s %s' % (op, args))
160 cmds.append('%s %s' % (op, args))
161 rsp = self._call("batch", cmds=';'.join(cmds))
161 rsp = self._call("batch", cmds=';'.join(cmds))
162 return rsp.split(';')
162 return rsp.split(';')
163 def _submitone(self, op, args):
163 def _submitone(self, op, args):
164 return self._call(op, **args)
164 return self._call(op, **args)
165
165
166 @batchable
166 @batchable
167 def lookup(self, key):
167 def lookup(self, key):
168 self.requirecap('lookup', _('look up remote revision'))
168 self.requirecap('lookup', _('look up remote revision'))
169 f = future()
169 f = future()
170 yield todict(key=encoding.fromlocal(key)), f
170 yield todict(key=encoding.fromlocal(key)), f
171 d = f.value
171 d = f.value
172 success, data = d[:-1].split(" ", 1)
172 success, data = d[:-1].split(" ", 1)
173 if int(success):
173 if int(success):
174 yield bin(data)
174 yield bin(data)
175 self._abort(error.RepoError(data))
175 self._abort(error.RepoError(data))
176
176
177 @batchable
177 @batchable
178 def heads(self):
178 def heads(self):
179 f = future()
179 f = future()
180 yield {}, f
180 yield {}, f
181 d = f.value
181 d = f.value
182 try:
182 try:
183 yield decodelist(d[:-1])
183 yield decodelist(d[:-1])
184 except ValueError:
184 except ValueError:
185 self._abort(error.ResponseError(_("unexpected response:"), d))
185 self._abort(error.ResponseError(_("unexpected response:"), d))
186
186
187 @batchable
187 @batchable
188 def known(self, nodes):
188 def known(self, nodes):
189 f = future()
189 f = future()
190 yield todict(nodes=encodelist(nodes)), f
190 yield todict(nodes=encodelist(nodes)), f
191 d = f.value
191 d = f.value
192 try:
192 try:
193 yield [bool(int(f)) for f in d]
193 yield [bool(int(f)) for f in d]
194 except ValueError:
194 except ValueError:
195 self._abort(error.ResponseError(_("unexpected response:"), d))
195 self._abort(error.ResponseError(_("unexpected response:"), d))
196
196
197 @batchable
197 @batchable
198 def branchmap(self):
198 def branchmap(self):
199 f = future()
199 f = future()
200 yield {}, f
200 yield {}, f
201 d = f.value
201 d = f.value
202 try:
202 try:
203 branchmap = {}
203 branchmap = {}
204 for branchpart in d.splitlines():
204 for branchpart in d.splitlines():
205 branchname, branchheads = branchpart.split(' ', 1)
205 branchname, branchheads = branchpart.split(' ', 1)
206 branchname = encoding.tolocal(urllib.unquote(branchname))
206 branchname = encoding.tolocal(urllib.unquote(branchname))
207 branchheads = decodelist(branchheads)
207 branchheads = decodelist(branchheads)
208 branchmap[branchname] = branchheads
208 branchmap[branchname] = branchheads
209 yield branchmap
209 yield branchmap
210 except TypeError:
210 except TypeError:
211 self._abort(error.ResponseError(_("unexpected response:"), d))
211 self._abort(error.ResponseError(_("unexpected response:"), d))
212
212
213 def branches(self, nodes):
213 def branches(self, nodes):
214 n = encodelist(nodes)
214 n = encodelist(nodes)
215 d = self._call("branches", nodes=n)
215 d = self._call("branches", nodes=n)
216 try:
216 try:
217 br = [tuple(decodelist(b)) for b in d.splitlines()]
217 br = [tuple(decodelist(b)) for b in d.splitlines()]
218 return br
218 return br
219 except ValueError:
219 except ValueError:
220 self._abort(error.ResponseError(_("unexpected response:"), d))
220 self._abort(error.ResponseError(_("unexpected response:"), d))
221
221
222 def between(self, pairs):
222 def between(self, pairs):
223 batch = 8 # avoid giant requests
223 batch = 8 # avoid giant requests
224 r = []
224 r = []
225 for i in xrange(0, len(pairs), batch):
225 for i in xrange(0, len(pairs), batch):
226 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
226 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
227 d = self._call("between", pairs=n)
227 d = self._call("between", pairs=n)
228 try:
228 try:
229 r.extend(l and decodelist(l) or [] for l in d.splitlines())
229 r.extend(l and decodelist(l) or [] for l in d.splitlines())
230 except ValueError:
230 except ValueError:
231 self._abort(error.ResponseError(_("unexpected response:"), d))
231 self._abort(error.ResponseError(_("unexpected response:"), d))
232 return r
232 return r
233
233
234 @batchable
234 @batchable
235 def pushkey(self, namespace, key, old, new):
235 def pushkey(self, namespace, key, old, new):
236 if not self.capable('pushkey'):
236 if not self.capable('pushkey'):
237 yield False, None
237 yield False, None
238 f = future()
238 f = future()
239 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
239 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
240 yield todict(namespace=encoding.fromlocal(namespace),
240 yield todict(namespace=encoding.fromlocal(namespace),
241 key=encoding.fromlocal(key),
241 key=encoding.fromlocal(key),
242 old=encoding.fromlocal(old),
242 old=encoding.fromlocal(old),
243 new=encoding.fromlocal(new)), f
243 new=encoding.fromlocal(new)), f
244 d = f.value
244 d = f.value
245 d, output = d.split('\n', 1)
245 d, output = d.split('\n', 1)
246 try:
246 try:
247 d = bool(int(d))
247 d = bool(int(d))
248 except ValueError:
248 except ValueError:
249 raise error.ResponseError(
249 raise error.ResponseError(
250 _('push failed (unexpected response):'), d)
250 _('push failed (unexpected response):'), d)
251 for l in output.splitlines(True):
251 for l in output.splitlines(True):
252 self.ui.status(_('remote: '), l)
252 self.ui.status(_('remote: '), l)
253 yield d
253 yield d
254
254
255 @batchable
255 @batchable
256 def listkeys(self, namespace):
256 def listkeys(self, namespace):
257 if not self.capable('pushkey'):
257 if not self.capable('pushkey'):
258 yield {}, None
258 yield {}, None
259 f = future()
259 f = future()
260 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
260 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
261 yield todict(namespace=encoding.fromlocal(namespace)), f
261 yield todict(namespace=encoding.fromlocal(namespace)), f
262 d = f.value
262 d = f.value
263 r = {}
263 r = {}
264 for l in d.splitlines():
264 for l in d.splitlines():
265 k, v = l.split('\t')
265 k, v = l.split('\t')
266 r[encoding.tolocal(k)] = encoding.tolocal(v)
266 r[encoding.tolocal(k)] = encoding.tolocal(v)
267 yield r
267 yield r
268
268
269 def stream_out(self):
269 def stream_out(self):
270 return self._callstream('stream_out')
270 return self._callstream('stream_out')
271
271
272 def changegroup(self, nodes, kind):
272 def changegroup(self, nodes, kind):
273 n = encodelist(nodes)
273 n = encodelist(nodes)
274 f = self._callstream("changegroup", roots=n)
274 f = self._callstream("changegroup", roots=n)
275 return changegroupmod.unbundle10(self._decompress(f), 'UN')
275 return changegroupmod.unbundle10(self._decompress(f), 'UN')
276
276
277 def changegroupsubset(self, bases, heads, kind):
277 def changegroupsubset(self, bases, heads, kind):
278 self.requirecap('changegroupsubset', _('look up remote changes'))
278 self.requirecap('changegroupsubset', _('look up remote changes'))
279 bases = encodelist(bases)
279 bases = encodelist(bases)
280 heads = encodelist(heads)
280 heads = encodelist(heads)
281 f = self._callstream("changegroupsubset",
281 f = self._callstream("changegroupsubset",
282 bases=bases, heads=heads)
282 bases=bases, heads=heads)
283 return changegroupmod.unbundle10(self._decompress(f), 'UN')
283 return changegroupmod.unbundle10(self._decompress(f), 'UN')
284
284
285 def getbundle(self, source, heads=None, common=None):
285 def getbundle(self, source, heads=None, common=None):
286 self.requirecap('getbundle', _('look up remote changes'))
286 self.requirecap('getbundle', _('look up remote changes'))
287 opts = {}
287 opts = {}
288 if heads is not None:
288 if heads is not None:
289 opts['heads'] = encodelist(heads)
289 opts['heads'] = encodelist(heads)
290 if common is not None:
290 if common is not None:
291 opts['common'] = encodelist(common)
291 opts['common'] = encodelist(common)
292 f = self._callstream("getbundle", **opts)
292 f = self._callstream("getbundle", **opts)
293 return changegroupmod.unbundle10(self._decompress(f), 'UN')
293 return changegroupmod.unbundle10(self._decompress(f), 'UN')
294
294
295 def unbundle(self, cg, heads, source):
295 def unbundle(self, cg, heads, source):
296 '''Send cg (a readable file-like object representing the
296 '''Send cg (a readable file-like object representing the
297 changegroup to push, typically a chunkbuffer object) to the
297 changegroup to push, typically a chunkbuffer object) to the
298 remote server as a bundle. Return an integer indicating the
298 remote server as a bundle. Return an integer indicating the
299 result of the push (see localrepository.addchangegroup()).'''
299 result of the push (see localrepository.addchangegroup()).'''
300
300
301 if heads != ['force'] and self.capable('unbundlehash'):
301 if heads != ['force'] and self.capable('unbundlehash'):
302 heads = encodelist(['hashed',
302 heads = encodelist(['hashed',
303 util.sha1(''.join(sorted(heads))).digest()])
303 util.sha1(''.join(sorted(heads))).digest()])
304 else:
304 else:
305 heads = encodelist(heads)
305 heads = encodelist(heads)
306
306
307 ret, output = self._callpush("unbundle", cg, heads=heads)
307 ret, output = self._callpush("unbundle", cg, heads=heads)
308 if ret == "":
308 if ret == "":
309 raise error.ResponseError(
309 raise error.ResponseError(
310 _('push failed:'), output)
310 _('push failed:'), output)
311 try:
311 try:
312 ret = int(ret)
312 ret = int(ret)
313 except ValueError:
313 except ValueError:
314 raise error.ResponseError(
314 raise error.ResponseError(
315 _('push failed (unexpected response):'), ret)
315 _('push failed (unexpected response):'), ret)
316
316
317 for l in output.splitlines(True):
317 for l in output.splitlines(True):
318 self.ui.status(_('remote: '), l)
318 self.ui.status(_('remote: '), l)
319 return ret
319 return ret
320
320
321 def debugwireargs(self, one, two, three=None, four=None, five=None):
321 def debugwireargs(self, one, two, three=None, four=None, five=None):
322 # don't pass optional arguments left at their default value
322 # don't pass optional arguments left at their default value
323 opts = {}
323 opts = {}
324 if three is not None:
324 if three is not None:
325 opts['three'] = three
325 opts['three'] = three
326 if four is not None:
326 if four is not None:
327 opts['four'] = four
327 opts['four'] = four
328 return self._call('debugwireargs', one=one, two=two, **opts)
328 return self._call('debugwireargs', one=one, two=two, **opts)
329
329
330 # server side
330 # server side
331
331
332 class streamres(object):
332 class streamres(object):
333 def __init__(self, gen):
333 def __init__(self, gen):
334 self.gen = gen
334 self.gen = gen
335
335
336 class pushres(object):
336 class pushres(object):
337 def __init__(self, res):
337 def __init__(self, res):
338 self.res = res
338 self.res = res
339
339
340 class pusherr(object):
340 class pusherr(object):
341 def __init__(self, res):
341 def __init__(self, res):
342 self.res = res
342 self.res = res
343
343
344 class ooberror(object):
344 class ooberror(object):
345 def __init__(self, message):
345 def __init__(self, message):
346 self.message = message
346 self.message = message
347
347
348 def dispatch(repo, proto, command):
348 def dispatch(repo, proto, command):
349 func, spec = commands[command]
349 func, spec = commands[command]
350 args = proto.getargs(spec)
350 args = proto.getargs(spec)
351 return func(repo, proto, *args)
351 return func(repo, proto, *args)
352
352
353 def options(cmd, keys, others):
353 def options(cmd, keys, others):
354 opts = {}
354 opts = {}
355 for k in keys:
355 for k in keys:
356 if k in others:
356 if k in others:
357 opts[k] = others[k]
357 opts[k] = others[k]
358 del others[k]
358 del others[k]
359 if others:
359 if others:
360 sys.stderr.write("abort: %s got unexpected arguments %s\n"
360 sys.stderr.write("abort: %s got unexpected arguments %s\n"
361 % (cmd, ",".join(others)))
361 % (cmd, ",".join(others)))
362 return opts
362 return opts
363
363
364 def batch(repo, proto, cmds, others):
364 def batch(repo, proto, cmds, others):
365 res = []
365 res = []
366 for pair in cmds.split(';'):
366 for pair in cmds.split(';'):
367 op, args = pair.split(' ', 1)
367 op, args = pair.split(' ', 1)
368 vals = {}
368 vals = {}
369 for a in args.split(','):
369 for a in args.split(','):
370 if a:
370 if a:
371 n, v = a.split('=')
371 n, v = a.split('=')
372 vals[n] = unescapearg(v)
372 vals[n] = unescapearg(v)
373 func, spec = commands[op]
373 func, spec = commands[op]
374 if spec:
374 if spec:
375 keys = spec.split()
375 keys = spec.split()
376 data = {}
376 data = {}
377 for k in keys:
377 for k in keys:
378 if k == '*':
378 if k == '*':
379 star = {}
379 star = {}
380 for key in vals.keys():
380 for key in vals.keys():
381 if key not in keys:
381 if key not in keys:
382 star[key] = vals[key]
382 star[key] = vals[key]
383 data['*'] = star
383 data['*'] = star
384 else:
384 else:
385 data[k] = vals[k]
385 data[k] = vals[k]
386 result = func(repo, proto, *[data[k] for k in keys])
386 result = func(repo, proto, *[data[k] for k in keys])
387 else:
387 else:
388 result = func(repo, proto)
388 result = func(repo, proto)
389 if isinstance(result, ooberror):
389 if isinstance(result, ooberror):
390 return result
390 return result
391 res.append(escapearg(result))
391 res.append(escapearg(result))
392 return ';'.join(res)
392 return ';'.join(res)
393
393
394 def between(repo, proto, pairs):
394 def between(repo, proto, pairs):
395 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
395 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
396 r = []
396 r = []
397 for b in repo.between(pairs):
397 for b in repo.between(pairs):
398 r.append(encodelist(b) + "\n")
398 r.append(encodelist(b) + "\n")
399 return "".join(r)
399 return "".join(r)
400
400
401 def branchmap(repo, proto):
401 def branchmap(repo, proto):
402 branchmap = discovery.visiblebranchmap(repo)
402 branchmap = discovery.visiblebranchmap(repo)
403 heads = []
403 heads = []
404 for branch, nodes in branchmap.iteritems():
404 for branch, nodes in branchmap.iteritems():
405 branchname = urllib.quote(encoding.fromlocal(branch))
405 branchname = urllib.quote(encoding.fromlocal(branch))
406 branchnodes = encodelist(nodes)
406 branchnodes = encodelist(nodes)
407 heads.append('%s %s' % (branchname, branchnodes))
407 heads.append('%s %s' % (branchname, branchnodes))
408 return '\n'.join(heads)
408 return '\n'.join(heads)
409
409
410 def branches(repo, proto, nodes):
410 def branches(repo, proto, nodes):
411 nodes = decodelist(nodes)
411 nodes = decodelist(nodes)
412 r = []
412 r = []
413 for b in repo.branches(nodes):
413 for b in repo.branches(nodes):
414 r.append(encodelist(b) + "\n")
414 r.append(encodelist(b) + "\n")
415 return "".join(r)
415 return "".join(r)
416
416
417 def capabilities(repo, proto):
417 def capabilities(repo, proto):
418 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
418 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
419 'unbundlehash batch').split()
419 'unbundlehash batch').split()
420 if _allowstream(repo.ui):
420 if _allowstream(repo.ui):
421 if repo.ui.configbool('server', 'preferuncompressed', False):
421 if repo.ui.configbool('server', 'preferuncompressed', False):
422 caps.append('stream-preferred')
422 caps.append('stream-preferred')
423 requiredformats = repo.requirements & repo.supportedformats
423 requiredformats = repo.requirements & repo.supportedformats
424 # if our local revlogs are just revlogv1, add 'stream' cap
424 # if our local revlogs are just revlogv1, add 'stream' cap
425 if not requiredformats - set(('revlogv1',)):
425 if not requiredformats - set(('revlogv1',)):
426 caps.append('stream')
426 caps.append('stream')
427 # otherwise, add 'streamreqs' detailing our local revlog format
427 # otherwise, add 'streamreqs' detailing our local revlog format
428 else:
428 else:
429 caps.append('streamreqs=%s' % ','.join(requiredformats))
429 caps.append('streamreqs=%s' % ','.join(requiredformats))
430 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
430 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
431 caps.append('httpheader=1024')
431 caps.append('httpheader=1024')
432 return ' '.join(caps)
432 return ' '.join(caps)
433
433
434 def changegroup(repo, proto, roots):
434 def changegroup(repo, proto, roots):
435 nodes = decodelist(roots)
435 nodes = decodelist(roots)
436 cg = repo.changegroup(nodes, 'serve')
436 cg = repo.changegroup(nodes, 'serve')
437 return streamres(proto.groupchunks(cg))
437 return streamres(proto.groupchunks(cg))
438
438
439 def changegroupsubset(repo, proto, bases, heads):
439 def changegroupsubset(repo, proto, bases, heads):
440 bases = decodelist(bases)
440 bases = decodelist(bases)
441 heads = decodelist(heads)
441 heads = decodelist(heads)
442 cg = repo.changegroupsubset(bases, heads, 'serve')
442 cg = repo.changegroupsubset(bases, heads, 'serve')
443 return streamres(proto.groupchunks(cg))
443 return streamres(proto.groupchunks(cg))
444
444
445 def debugwireargs(repo, proto, one, two, others):
445 def debugwireargs(repo, proto, one, two, others):
446 # only accept optional args from the known set
446 # only accept optional args from the known set
447 opts = options('debugwireargs', ['three', 'four'], others)
447 opts = options('debugwireargs', ['three', 'four'], others)
448 return repo.debugwireargs(one, two, **opts)
448 return repo.debugwireargs(one, two, **opts)
449
449
450 def getbundle(repo, proto, others):
450 def getbundle(repo, proto, others):
451 opts = options('getbundle', ['heads', 'common'], others)
451 opts = options('getbundle', ['heads', 'common'], others)
452 for k, v in opts.iteritems():
452 for k, v in opts.iteritems():
453 opts[k] = decodelist(v)
453 opts[k] = decodelist(v)
454 cg = repo.getbundle('serve', **opts)
454 cg = repo.getbundle('serve', **opts)
455 return streamres(proto.groupchunks(cg))
455 return streamres(proto.groupchunks(cg))
456
456
457 def heads(repo, proto):
457 def heads(repo, proto):
458 h = discovery.visibleheads(repo)
458 h = discovery.visibleheads(repo)
459 return encodelist(h) + "\n"
459 return encodelist(h) + "\n"
460
460
461 def hello(repo, proto):
461 def hello(repo, proto):
462 '''the hello command returns a set of lines describing various
462 '''the hello command returns a set of lines describing various
463 interesting things about the server, in an RFC822-like format.
463 interesting things about the server, in an RFC822-like format.
464 Currently the only one defined is "capabilities", which
464 Currently the only one defined is "capabilities", which
465 consists of a line in the form:
465 consists of a line in the form:
466
466
467 capabilities: space separated list of tokens
467 capabilities: space separated list of tokens
468 '''
468 '''
469 return "capabilities: %s\n" % (capabilities(repo, proto))
469 return "capabilities: %s\n" % (capabilities(repo, proto))
470
470
471 def listkeys(repo, proto, namespace):
471 def listkeys(repo, proto, namespace):
472 d = repo.listkeys(encoding.tolocal(namespace)).items()
472 d = repo.listkeys(encoding.tolocal(namespace)).items()
473 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
473 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
474 for k, v in d])
474 for k, v in d])
475 return t
475 return t
476
476
477 def lookup(repo, proto, key):
477 def lookup(repo, proto, key):
478 try:
478 try:
479 k = encoding.tolocal(key)
479 k = encoding.tolocal(key)
480 c = repo[k]
480 c = repo[k]
481 if c.phase() == phases.secret:
481 if c.phase() == phases.secret:
482 raise error.RepoLookupError(_("unknown revision '%s'") % k)
482 raise error.RepoLookupError(_("unknown revision '%s'") % k)
483 r = c.hex()
483 r = c.hex()
484 success = 1
484 success = 1
485 except Exception, inst:
485 except Exception, inst:
486 r = str(inst)
486 r = str(inst)
487 success = 0
487 success = 0
488 return "%s %s\n" % (success, r)
488 return "%s %s\n" % (success, r)
489
489
490 def known(repo, proto, nodes, others):
490 def known(repo, proto, nodes, others):
491 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
491 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
492
492
493 def pushkey(repo, proto, namespace, key, old, new):
493 def pushkey(repo, proto, namespace, key, old, new):
494 # compatibility with pre-1.8 clients which were accidentally
494 # compatibility with pre-1.8 clients which were accidentally
495 # sending raw binary nodes rather than utf-8-encoded hex
495 # sending raw binary nodes rather than utf-8-encoded hex
496 if len(new) == 20 and new.encode('string-escape') != new:
496 if len(new) == 20 and new.encode('string-escape') != new:
497 # looks like it could be a binary node
497 # looks like it could be a binary node
498 try:
498 try:
499 new.decode('utf-8')
499 new.decode('utf-8')
500 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
500 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
501 except UnicodeDecodeError:
501 except UnicodeDecodeError:
502 pass # binary, leave unmodified
502 pass # binary, leave unmodified
503 else:
503 else:
504 new = encoding.tolocal(new) # normal path
504 new = encoding.tolocal(new) # normal path
505
505
506 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
506 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
507 encoding.tolocal(old), new)
507 encoding.tolocal(old), new)
508 return '%s\n' % int(r)
508 return '%s\n' % int(r)
509
509
510 def _allowstream(ui):
510 def _allowstream(ui):
511 return ui.configbool('server', 'uncompressed', True, untrusted=True)
511 return ui.configbool('server', 'uncompressed', True, untrusted=True)
512
512
513 def stream(repo, proto):
513 def stream(repo, proto):
514 '''If the server supports streaming clone, it advertises the "stream"
514 '''If the server supports streaming clone, it advertises the "stream"
515 capability with a value representing the version and flags of the repo
515 capability with a value representing the version and flags of the repo
516 it is serving. Client checks to see if it understands the format.
516 it is serving. Client checks to see if it understands the format.
517
517
518 The format is simple: the server writes out a line with the amount
518 The format is simple: the server writes out a line with the amount
519 of files, then the total amount of bytes to be transferred (separated
519 of files, then the total amount of bytes to be transferred (separated
520 by a space). Then, for each file, the server first writes the filename
520 by a space). Then, for each file, the server first writes the filename
521 and filesize (separated by the null character), then the file contents.
521 and filesize (separated by the null character), then the file contents.
522 '''
522 '''
523
523
524 if not _allowstream(repo.ui):
524 if not _allowstream(repo.ui):
525 return '1\n'
525 return '1\n'
526
526
527 entries = []
527 entries = []
528 total_bytes = 0
528 total_bytes = 0
529 try:
529 try:
530 # get consistent snapshot of repo, lock during scan
530 # get consistent snapshot of repo, lock during scan
531 lock = repo.lock()
531 lock = repo.lock()
532 try:
532 try:
533 repo.ui.debug('scanning\n')
533 repo.ui.debug('scanning\n')
534 for name, ename, size in repo.store.walk():
534 for name, ename, size in repo.store.walk():
535 entries.append((name, size))
535 entries.append((name, size))
536 total_bytes += size
536 total_bytes += size
537 finally:
537 finally:
538 lock.release()
538 lock.release()
539 except error.LockError:
539 except error.LockError:
540 return '2\n' # error: 2
540 return '2\n' # error: 2
541
541
542 def streamer(repo, entries, total):
542 def streamer(repo, entries, total):
543 '''stream out all metadata files in repository.'''
543 '''stream out all metadata files in repository.'''
544 yield '0\n' # success
544 yield '0\n' # success
545 repo.ui.debug('%d files, %d bytes to transfer\n' %
545 repo.ui.debug('%d files, %d bytes to transfer\n' %
546 (len(entries), total_bytes))
546 (len(entries), total_bytes))
547 yield '%d %d\n' % (len(entries), total_bytes)
547 yield '%d %d\n' % (len(entries), total_bytes)
548 for name, size in entries:
548
549 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
549 sopener = repo.sopener
550 # partially encode name over the wire for backwards compat
550 oldaudit = sopener.mustaudit
551 yield '%s\0%d\n' % (store.encodedir(name), size)
551 sopener.mustaudit = False
552 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
552
553 yield chunk
553 try:
554 for name, size in entries:
555 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
556 # partially encode name over the wire for backwards compat
557 yield '%s\0%d\n' % (store.encodedir(name), size)
558 for chunk in util.filechunkiter(sopener(name), limit=size):
559 yield chunk
560 finally:
561 sopener.mustaudit = oldaudit
554
562
555 return streamres(streamer(repo, entries, total_bytes))
563 return streamres(streamer(repo, entries, total_bytes))
556
564
557 def unbundle(repo, proto, heads):
565 def unbundle(repo, proto, heads):
558 their_heads = decodelist(heads)
566 their_heads = decodelist(heads)
559
567
560 def check_heads():
568 def check_heads():
561 heads = discovery.visibleheads(repo)
569 heads = discovery.visibleheads(repo)
562 heads_hash = util.sha1(''.join(sorted(heads))).digest()
570 heads_hash = util.sha1(''.join(sorted(heads))).digest()
563 return (their_heads == ['force'] or their_heads == heads or
571 return (their_heads == ['force'] or their_heads == heads or
564 their_heads == ['hashed', heads_hash])
572 their_heads == ['hashed', heads_hash])
565
573
566 proto.redirect()
574 proto.redirect()
567
575
568 # fail early if possible
576 # fail early if possible
569 if not check_heads():
577 if not check_heads():
570 return pusherr('unsynced changes')
578 return pusherr('unsynced changes')
571
579
572 # write bundle data to temporary file because it can be big
580 # write bundle data to temporary file because it can be big
573 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
581 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
574 fp = os.fdopen(fd, 'wb+')
582 fp = os.fdopen(fd, 'wb+')
575 r = 0
583 r = 0
576 try:
584 try:
577 proto.getfile(fp)
585 proto.getfile(fp)
578 lock = repo.lock()
586 lock = repo.lock()
579 try:
587 try:
580 if not check_heads():
588 if not check_heads():
581 # someone else committed/pushed/unbundled while we
589 # someone else committed/pushed/unbundled while we
582 # were transferring data
590 # were transferring data
583 return pusherr('unsynced changes')
591 return pusherr('unsynced changes')
584
592
585 # push can proceed
593 # push can proceed
586 fp.seek(0)
594 fp.seek(0)
587 gen = changegroupmod.readbundle(fp, None)
595 gen = changegroupmod.readbundle(fp, None)
588
596
589 try:
597 try:
590 r = repo.addchangegroup(gen, 'serve', proto._client())
598 r = repo.addchangegroup(gen, 'serve', proto._client())
591 except util.Abort, inst:
599 except util.Abort, inst:
592 sys.stderr.write("abort: %s\n" % inst)
600 sys.stderr.write("abort: %s\n" % inst)
593 finally:
601 finally:
594 lock.release()
602 lock.release()
595 return pushres(r)
603 return pushres(r)
596
604
597 finally:
605 finally:
598 fp.close()
606 fp.close()
599 os.unlink(tempname)
607 os.unlink(tempname)
600
608
601 commands = {
609 commands = {
602 'batch': (batch, 'cmds *'),
610 'batch': (batch, 'cmds *'),
603 'between': (between, 'pairs'),
611 'between': (between, 'pairs'),
604 'branchmap': (branchmap, ''),
612 'branchmap': (branchmap, ''),
605 'branches': (branches, 'nodes'),
613 'branches': (branches, 'nodes'),
606 'capabilities': (capabilities, ''),
614 'capabilities': (capabilities, ''),
607 'changegroup': (changegroup, 'roots'),
615 'changegroup': (changegroup, 'roots'),
608 'changegroupsubset': (changegroupsubset, 'bases heads'),
616 'changegroupsubset': (changegroupsubset, 'bases heads'),
609 'debugwireargs': (debugwireargs, 'one two *'),
617 'debugwireargs': (debugwireargs, 'one two *'),
610 'getbundle': (getbundle, '*'),
618 'getbundle': (getbundle, '*'),
611 'heads': (heads, ''),
619 'heads': (heads, ''),
612 'hello': (hello, ''),
620 'hello': (hello, ''),
613 'known': (known, 'nodes *'),
621 'known': (known, 'nodes *'),
614 'listkeys': (listkeys, 'namespace'),
622 'listkeys': (listkeys, 'namespace'),
615 'lookup': (lookup, 'key'),
623 'lookup': (lookup, 'key'),
616 'pushkey': (pushkey, 'namespace key old new'),
624 'pushkey': (pushkey, 'namespace key old new'),
617 'stream_out': (stream, ''),
625 'stream_out': (stream, ''),
618 'unbundle': (unbundle, 'heads'),
626 'unbundle': (unbundle, 'heads'),
619 }
627 }
General Comments 0
You need to be logged in to leave comments. Login now