clfilter: drop extra filtering in wireprotocol...
Pierre-Yves David
r18281:898c5758 default
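
This changeset relies on dispatch() and batch() already handing every wire command a repository filtered with repo.filtered("unserved"), so the explicit discovery.visibleheads() / discovery.visiblebranchmap() calls and the secret-phase check in lookup() become redundant. A minimal illustrative sketch of the resulting server-side pattern (not part of the changeset; names are taken from the diff below):

    def dispatch(repo, proto, command):
        # command handlers only ever see served changesets; hidden and
        # secret changesets are dropped by the repoview filter
        repo = repo.filtered("unserved")
        func, spec = commands[command]
        return func(repo, proto, *proto.getargs(spec))

    def heads(repo, proto):
        # plain repo.heads() suffices on the pre-filtered repo; no separate
        # discovery.visibleheads() pass is needed
        return encodelist(repo.heads()) + "\n"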
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -1,656 +1,653 @@
 # wireproto.py - generic wire protocol support functions
 #
 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 import urllib, tempfile, os, sys
 from i18n import _
 from node import bin, hex
 import changegroup as changegroupmod
 import peer, error, encoding, util, store
-import discovery, phases
 
 # abstract batching support
 
 class future(object):
     '''placeholder for a value to be set later'''
     def set(self, value):
         if util.safehasattr(self, 'value'):
             raise error.RepoError("future is already set")
         self.value = value
 
 class batcher(object):
     '''base class for batches of commands submittable in a single request
 
     All methods invoked on instances of this class are simply queued and
     return a a future for the result. Once you call submit(), all the queued
     calls are performed and the results set in their respective futures.
     '''
     def __init__(self):
         self.calls = []
     def __getattr__(self, name):
         def call(*args, **opts):
             resref = future()
             self.calls.append((name, args, opts, resref,))
             return resref
         return call
     def submit(self):
         pass
 
 class localbatch(batcher):
     '''performs the queued calls directly'''
     def __init__(self, local):
         batcher.__init__(self)
         self.local = local
     def submit(self):
         for name, args, opts, resref in self.calls:
             resref.set(getattr(self.local, name)(*args, **opts))
 
 class remotebatch(batcher):
     '''batches the queued calls; uses as few roundtrips as possible'''
     def __init__(self, remote):
         '''remote must support _submitbatch(encbatch) and
         _submitone(op, encargs)'''
         batcher.__init__(self)
         self.remote = remote
     def submit(self):
         req, rsp = [], []
         for name, args, opts, resref in self.calls:
             mtd = getattr(self.remote, name)
             batchablefn = getattr(mtd, 'batchable', None)
             if batchablefn is not None:
                 batchable = batchablefn(mtd.im_self, *args, **opts)
                 encargsorres, encresref = batchable.next()
                 if encresref:
                     req.append((name, encargsorres,))
                     rsp.append((batchable, encresref, resref,))
                 else:
                     resref.set(encargsorres)
             else:
                 if req:
                     self._submitreq(req, rsp)
                     req, rsp = [], []
                 resref.set(mtd(*args, **opts))
         if req:
             self._submitreq(req, rsp)
     def _submitreq(self, req, rsp):
         encresults = self.remote._submitbatch(req)
         for encres, r in zip(encresults, rsp):
             batchable, encresref, resref = r
             encresref.set(encres)
             resref.set(batchable.next())
 
 def batchable(f):
     '''annotation for batchable methods
 
     Such methods must implement a coroutine as follows:
 
     @batchable
     def sample(self, one, two=None):
         # Handle locally computable results first:
         if not one:
             yield "a local result", None
         # Build list of encoded arguments suitable for your wire protocol:
         encargs = [('one', encode(one),), ('two', encode(two),)]
         # Create future for injection of encoded result:
         encresref = future()
         # Return encoded arguments and future:
         yield encargs, encresref
         # Assuming the future to be filled with the result from the batched
         # request now. Decode it:
         yield decode(encresref.value)
 
     The decorator returns a function which wraps this coroutine as a plain
     method, but adds the original method as an attribute called "batchable",
     which is used by remotebatch to split the call into separate encoding and
     decoding phases.
     '''
     def plain(*args, **opts):
         batchable = f(*args, **opts)
         encargsorres, encresref = batchable.next()
         if not encresref:
             return encargsorres # a local result in this case
         self = args[0]
         encresref.set(self._submitone(f.func_name, encargsorres))
         return batchable.next()
     setattr(plain, 'batchable', f)
     return plain
 
 # list of nodes encoding / decoding
 
 def decodelist(l, sep=' '):
     if l:
         return map(bin, l.split(sep))
     return []
 
 def encodelist(l, sep=' '):
     return sep.join(map(hex, l))
 
 # batched call argument encoding
 
 def escapearg(plain):
     return (plain
             .replace(':', '::')
             .replace(',', ':,')
             .replace(';', ':;')
             .replace('=', ':='))
 
 def unescapearg(escaped):
     return (escaped
             .replace(':=', '=')
             .replace(':;', ';')
             .replace(':,', ',')
             .replace('::', ':'))
 
 # client side
 
 def todict(**args):
     return args
 
 class wirepeer(peer.peerrepository):
 
     def batch(self):
         return remotebatch(self)
     def _submitbatch(self, req):
         cmds = []
         for op, argsdict in req:
             args = ','.join('%s=%s' % p for p in argsdict.iteritems())
             cmds.append('%s %s' % (op, args))
         rsp = self._call("batch", cmds=';'.join(cmds))
         return rsp.split(';')
     def _submitone(self, op, args):
         return self._call(op, **args)
 
     @batchable
     def lookup(self, key):
         self.requirecap('lookup', _('look up remote revision'))
         f = future()
         yield todict(key=encoding.fromlocal(key)), f
         d = f.value
         success, data = d[:-1].split(" ", 1)
         if int(success):
             yield bin(data)
         self._abort(error.RepoError(data))
 
     @batchable
     def heads(self):
         f = future()
         yield {}, f
         d = f.value
         try:
             yield decodelist(d[:-1])
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), d))
 
     @batchable
     def known(self, nodes):
         f = future()
         yield todict(nodes=encodelist(nodes)), f
         d = f.value
         try:
             yield [bool(int(f)) for f in d]
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), d))
 
     @batchable
     def branchmap(self):
         f = future()
         yield {}, f
         d = f.value
         try:
             branchmap = {}
             for branchpart in d.splitlines():
                 branchname, branchheads = branchpart.split(' ', 1)
                 branchname = encoding.tolocal(urllib.unquote(branchname))
                 branchheads = decodelist(branchheads)
                 branchmap[branchname] = branchheads
             yield branchmap
         except TypeError:
             self._abort(error.ResponseError(_("unexpected response:"), d))
 
     def branches(self, nodes):
         n = encodelist(nodes)
         d = self._call("branches", nodes=n)
         try:
             br = [tuple(decodelist(b)) for b in d.splitlines()]
             return br
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), d))
 
     def between(self, pairs):
         batch = 8 # avoid giant requests
         r = []
         for i in xrange(0, len(pairs), batch):
             n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
             d = self._call("between", pairs=n)
             try:
                 r.extend(l and decodelist(l) or [] for l in d.splitlines())
             except ValueError:
                 self._abort(error.ResponseError(_("unexpected response:"), d))
         return r
 
     @batchable
     def pushkey(self, namespace, key, old, new):
         if not self.capable('pushkey'):
             yield False, None
         f = future()
         self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
         yield todict(namespace=encoding.fromlocal(namespace),
                      key=encoding.fromlocal(key),
                      old=encoding.fromlocal(old),
                      new=encoding.fromlocal(new)), f
         d = f.value
         d, output = d.split('\n', 1)
         try:
             d = bool(int(d))
         except ValueError:
             raise error.ResponseError(
                 _('push failed (unexpected response):'), d)
         for l in output.splitlines(True):
             self.ui.status(_('remote: '), l)
         yield d
 
     @batchable
     def listkeys(self, namespace):
         if not self.capable('pushkey'):
             yield {}, None
         f = future()
         self.ui.debug('preparing listkeys for "%s"\n' % namespace)
         yield todict(namespace=encoding.fromlocal(namespace)), f
         d = f.value
         r = {}
         for l in d.splitlines():
             k, v = l.split('\t')
             r[encoding.tolocal(k)] = encoding.tolocal(v)
         yield r
 
     def stream_out(self):
         return self._callstream('stream_out')
 
     def changegroup(self, nodes, kind):
         n = encodelist(nodes)
         f = self._callstream("changegroup", roots=n)
         return changegroupmod.unbundle10(self._decompress(f), 'UN')
 
     def changegroupsubset(self, bases, heads, kind):
         self.requirecap('changegroupsubset', _('look up remote changes'))
         bases = encodelist(bases)
         heads = encodelist(heads)
         f = self._callstream("changegroupsubset",
                              bases=bases, heads=heads)
         return changegroupmod.unbundle10(self._decompress(f), 'UN')
 
     def getbundle(self, source, heads=None, common=None):
         self.requirecap('getbundle', _('look up remote changes'))
         opts = {}
         if heads is not None:
             opts['heads'] = encodelist(heads)
         if common is not None:
             opts['common'] = encodelist(common)
         f = self._callstream("getbundle", **opts)
         return changegroupmod.unbundle10(self._decompress(f), 'UN')
 
     def unbundle(self, cg, heads, source):
         '''Send cg (a readable file-like object representing the
         changegroup to push, typically a chunkbuffer object) to the
         remote server as a bundle. Return an integer indicating the
         result of the push (see localrepository.addchangegroup()).'''
 
         if heads != ['force'] and self.capable('unbundlehash'):
             heads = encodelist(['hashed',
                                 util.sha1(''.join(sorted(heads))).digest()])
         else:
             heads = encodelist(heads)
 
         ret, output = self._callpush("unbundle", cg, heads=heads)
         if ret == "":
             raise error.ResponseError(
                 _('push failed:'), output)
         try:
             ret = int(ret)
         except ValueError:
             raise error.ResponseError(
                 _('push failed (unexpected response):'), ret)
 
         for l in output.splitlines(True):
             self.ui.status(_('remote: '), l)
         return ret
 
     def debugwireargs(self, one, two, three=None, four=None, five=None):
         # don't pass optional arguments left at their default value
         opts = {}
         if three is not None:
             opts['three'] = three
         if four is not None:
             opts['four'] = four
         return self._call('debugwireargs', one=one, two=two, **opts)
 
 # server side
 
 class streamres(object):
     def __init__(self, gen):
         self.gen = gen
 
 class pushres(object):
     def __init__(self, res):
         self.res = res
 
 class pusherr(object):
     def __init__(self, res):
         self.res = res
 
 class ooberror(object):
     def __init__(self, message):
         self.message = message
 
 def dispatch(repo, proto, command):
     repo = repo.filtered("unserved")
     func, spec = commands[command]
     args = proto.getargs(spec)
     return func(repo, proto, *args)
 
 def options(cmd, keys, others):
     opts = {}
     for k in keys:
         if k in others:
             opts[k] = others[k]
             del others[k]
     if others:
         sys.stderr.write("abort: %s got unexpected arguments %s\n"
                          % (cmd, ",".join(others)))
     return opts
 
 def batch(repo, proto, cmds, others):
     repo = repo.filtered("unserved")
     res = []
     for pair in cmds.split(';'):
         op, args = pair.split(' ', 1)
         vals = {}
         for a in args.split(','):
             if a:
                 n, v = a.split('=')
                 vals[n] = unescapearg(v)
         func, spec = commands[op]
         if spec:
             keys = spec.split()
             data = {}
             for k in keys:
                 if k == '*':
                     star = {}
                     for key in vals.keys():
                         if key not in keys:
                             star[key] = vals[key]
                     data['*'] = star
                 else:
                     data[k] = vals[k]
             result = func(repo, proto, *[data[k] for k in keys])
         else:
             result = func(repo, proto)
         if isinstance(result, ooberror):
             return result
         res.append(escapearg(result))
     return ';'.join(res)
 
 def between(repo, proto, pairs):
     pairs = [decodelist(p, '-') for p in pairs.split(" ")]
     r = []
     for b in repo.between(pairs):
         r.append(encodelist(b) + "\n")
     return "".join(r)
 
 def branchmap(repo, proto):
-    branchmap = discovery.visiblebranchmap(repo)
+    branchmap = repo.branchmap()
     heads = []
     for branch, nodes in branchmap.iteritems():
         branchname = urllib.quote(encoding.fromlocal(branch))
         branchnodes = encodelist(nodes)
         heads.append('%s %s' % (branchname, branchnodes))
     return '\n'.join(heads)
 
 def branches(repo, proto, nodes):
     nodes = decodelist(nodes)
     r = []
     for b in repo.branches(nodes):
         r.append(encodelist(b) + "\n")
     return "".join(r)
 
 def capabilities(repo, proto):
     caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
             'unbundlehash batch').split()
     if _allowstream(repo.ui):
         if repo.ui.configbool('server', 'preferuncompressed', False):
             caps.append('stream-preferred')
         requiredformats = repo.requirements & repo.supportedformats
         # if our local revlogs are just revlogv1, add 'stream' cap
         if not requiredformats - set(('revlogv1',)):
             caps.append('stream')
         # otherwise, add 'streamreqs' detailing our local revlog format
         else:
             caps.append('streamreqs=%s' % ','.join(requiredformats))
     caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
     caps.append('httpheader=1024')
     return ' '.join(caps)
 
 def changegroup(repo, proto, roots):
     nodes = decodelist(roots)
     cg = repo.changegroup(nodes, 'serve')
     return streamres(proto.groupchunks(cg))
 
 def changegroupsubset(repo, proto, bases, heads):
     bases = decodelist(bases)
     heads = decodelist(heads)
     cg = repo.changegroupsubset(bases, heads, 'serve')
     return streamres(proto.groupchunks(cg))
 
 def debugwireargs(repo, proto, one, two, others):
     # only accept optional args from the known set
     opts = options('debugwireargs', ['three', 'four'], others)
     return repo.debugwireargs(one, two, **opts)
 
 def getbundle(repo, proto, others):
     opts = options('getbundle', ['heads', 'common'], others)
     for k, v in opts.iteritems():
         opts[k] = decodelist(v)
     cg = repo.getbundle('serve', **opts)
     return streamres(proto.groupchunks(cg))
 
 def heads(repo, proto):
-    h = discovery.visibleheads(repo)
+    h = repo.heads()
     return encodelist(h) + "\n"
 
 def hello(repo, proto):
     '''the hello command returns a set of lines describing various
     interesting things about the server, in an RFC822-like format.
     Currently the only one defined is "capabilities", which
     consists of a line in the form:
 
     capabilities: space separated list of tokens
     '''
     return "capabilities: %s\n" % (capabilities(repo, proto))
 
 def listkeys(repo, proto, namespace):
     d = repo.listkeys(encoding.tolocal(namespace)).items()
     t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
                    for k, v in d])
     return t
 
 def lookup(repo, proto, key):
     try:
         k = encoding.tolocal(key)
         c = repo[k]
-        if c.phase() == phases.secret:
-            raise error.RepoLookupError(_("unknown revision '%s'") % k)
         r = c.hex()
         success = 1
     except Exception, inst:
         r = str(inst)
         success = 0
     return "%s %s\n" % (success, r)
 
 def known(repo, proto, nodes, others):
     return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
 
 def pushkey(repo, proto, namespace, key, old, new):
     # compatibility with pre-1.8 clients which were accidentally
     # sending raw binary nodes rather than utf-8-encoded hex
     if len(new) == 20 and new.encode('string-escape') != new:
         # looks like it could be a binary node
         try:
             new.decode('utf-8')
             new = encoding.tolocal(new) # but cleanly decodes as UTF-8
         except UnicodeDecodeError:
             pass # binary, leave unmodified
     else:
         new = encoding.tolocal(new) # normal path
 
     if util.safehasattr(proto, 'restore'):
 
         proto.redirect()
 
         try:
             r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                              encoding.tolocal(old), new) or False
         except util.Abort:
             r = False
 
         output = proto.restore()
 
         return '%s\n%s' % (int(r), output)
 
     r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                      encoding.tolocal(old), new)
     return '%s\n' % int(r)
 
 def _allowstream(ui):
     return ui.configbool('server', 'uncompressed', True, untrusted=True)
 
 def stream(repo, proto):
     '''If the server supports streaming clone, it advertises the "stream"
     capability with a value representing the version and flags of the repo
     it is serving. Client checks to see if it understands the format.
 
     The format is simple: the server writes out a line with the amount
     of files, then the total amount of bytes to be transferred (separated
     by a space). Then, for each file, the server first writes the filename
     and filesize (separated by the null character), then the file contents.
     '''
 
     if not _allowstream(repo.ui):
         return '1\n'
 
     entries = []
     total_bytes = 0
     try:
         # get consistent snapshot of repo, lock during scan
         lock = repo.lock()
         try:
             repo.ui.debug('scanning\n')
             for name, ename, size in repo.store.walk():
                 entries.append((name, size))
                 total_bytes += size
         finally:
             lock.release()
     except error.LockError:
         return '2\n' # error: 2
 
     def streamer(repo, entries, total):
         '''stream out all metadata files in repository.'''
         yield '0\n' # success
         repo.ui.debug('%d files, %d bytes to transfer\n' %
                       (len(entries), total_bytes))
         yield '%d %d\n' % (len(entries), total_bytes)
 
         sopener = repo.sopener
         oldaudit = sopener.mustaudit
         debugflag = repo.ui.debugflag
         sopener.mustaudit = False
 
         try:
             for name, size in entries:
                 if debugflag:
                     repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                 # partially encode name over the wire for backwards compat
                 yield '%s\0%d\n' % (store.encodedir(name), size)
                 if size <= 65536:
                     fp = sopener(name)
                     try:
                         data = fp.read(size)
                     finally:
                         fp.close()
                     yield data
                 else:
                     for chunk in util.filechunkiter(sopener(name), limit=size):
                         yield chunk
         # replace with "finally:" when support for python 2.4 has been dropped
         except Exception:
             sopener.mustaudit = oldaudit
             raise
         sopener.mustaudit = oldaudit
 
     return streamres(streamer(repo, entries, total_bytes))
 
 def unbundle(repo, proto, heads):
     their_heads = decodelist(heads)
 
     def check_heads():
-        heads = discovery.visibleheads(repo)
+        heads = repo.heads()
         heads_hash = util.sha1(''.join(sorted(heads))).digest()
         return (their_heads == ['force'] or their_heads == heads or
                 their_heads == ['hashed', heads_hash])
 
     proto.redirect()
 
     # fail early if possible
     if not check_heads():
         return pusherr('unsynced changes')
 
     # write bundle data to temporary file because it can be big
     fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
     fp = os.fdopen(fd, 'wb+')
     r = 0
     try:
         proto.getfile(fp)
         lock = repo.lock()
         try:
             if not check_heads():
                 # someone else committed/pushed/unbundled while we
                 # were transferring data
                 return pusherr('unsynced changes')
 
             # push can proceed
             fp.seek(0)
             gen = changegroupmod.readbundle(fp, None)
 
             try:
                 r = repo.addchangegroup(gen, 'serve', proto._client())
             except util.Abort, inst:
                 sys.stderr.write("abort: %s\n" % inst)
             finally:
                 lock.release()
             return pushres(r)
 
     finally:
         fp.close()
         os.unlink(tempname)
 
 commands = {
     'batch': (batch, 'cmds *'),
     'between': (between, 'pairs'),
     'branchmap': (branchmap, ''),
     'branches': (branches, 'nodes'),
     'capabilities': (capabilities, ''),
     'changegroup': (changegroup, 'roots'),
     'changegroupsubset': (changegroupsubset, 'bases heads'),
     'debugwireargs': (debugwireargs, 'one two *'),
     'getbundle': (getbundle, '*'),
     'heads': (heads, ''),
     'hello': (hello, ''),
     'known': (known, 'nodes *'),
     'listkeys': (listkeys, 'namespace'),
     'lookup': (lookup, 'key'),
     'pushkey': (pushkey, 'namespace key old new'),
     'stream_out': (stream, ''),
     'unbundle': (unbundle, 'heads'),
 }