##// END OF EJS Templates
wireproto: remove todict() and use {} literals instead
Augie Fackler -
r20671:5442cab5 default
parent child Browse files
Show More
@@ -1,665 +1,662
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store
12 import peer, error, encoding, util, store
13
13
14 # abstract batching support
14 # abstract batching support
15
15
16 class future(object):
16 class future(object):
17 '''placeholder for a value to be set later'''
17 '''placeholder for a value to be set later'''
18 def set(self, value):
18 def set(self, value):
19 if util.safehasattr(self, 'value'):
19 if util.safehasattr(self, 'value'):
20 raise error.RepoError("future is already set")
20 raise error.RepoError("future is already set")
21 self.value = value
21 self.value = value
22
22
23 class batcher(object):
23 class batcher(object):
24 '''base class for batches of commands submittable in a single request
24 '''base class for batches of commands submittable in a single request
25
25
26 All methods invoked on instances of this class are simply queued and
26 All methods invoked on instances of this class are simply queued and
27 return a a future for the result. Once you call submit(), all the queued
27 return a a future for the result. Once you call submit(), all the queued
28 calls are performed and the results set in their respective futures.
28 calls are performed and the results set in their respective futures.
29 '''
29 '''
30 def __init__(self):
30 def __init__(self):
31 self.calls = []
31 self.calls = []
32 def __getattr__(self, name):
32 def __getattr__(self, name):
33 def call(*args, **opts):
33 def call(*args, **opts):
34 resref = future()
34 resref = future()
35 self.calls.append((name, args, opts, resref,))
35 self.calls.append((name, args, opts, resref,))
36 return resref
36 return resref
37 return call
37 return call
38 def submit(self):
38 def submit(self):
39 pass
39 pass
40
40
41 class localbatch(batcher):
41 class localbatch(batcher):
42 '''performs the queued calls directly'''
42 '''performs the queued calls directly'''
43 def __init__(self, local):
43 def __init__(self, local):
44 batcher.__init__(self)
44 batcher.__init__(self)
45 self.local = local
45 self.local = local
46 def submit(self):
46 def submit(self):
47 for name, args, opts, resref in self.calls:
47 for name, args, opts, resref in self.calls:
48 resref.set(getattr(self.local, name)(*args, **opts))
48 resref.set(getattr(self.local, name)(*args, **opts))
49
49
50 class remotebatch(batcher):
50 class remotebatch(batcher):
51 '''batches the queued calls; uses as few roundtrips as possible'''
51 '''batches the queued calls; uses as few roundtrips as possible'''
52 def __init__(self, remote):
52 def __init__(self, remote):
53 '''remote must support _submitbatch(encbatch) and
53 '''remote must support _submitbatch(encbatch) and
54 _submitone(op, encargs)'''
54 _submitone(op, encargs)'''
55 batcher.__init__(self)
55 batcher.__init__(self)
56 self.remote = remote
56 self.remote = remote
57 def submit(self):
57 def submit(self):
58 req, rsp = [], []
58 req, rsp = [], []
59 for name, args, opts, resref in self.calls:
59 for name, args, opts, resref in self.calls:
60 mtd = getattr(self.remote, name)
60 mtd = getattr(self.remote, name)
61 batchablefn = getattr(mtd, 'batchable', None)
61 batchablefn = getattr(mtd, 'batchable', None)
62 if batchablefn is not None:
62 if batchablefn is not None:
63 batchable = batchablefn(mtd.im_self, *args, **opts)
63 batchable = batchablefn(mtd.im_self, *args, **opts)
64 encargsorres, encresref = batchable.next()
64 encargsorres, encresref = batchable.next()
65 if encresref:
65 if encresref:
66 req.append((name, encargsorres,))
66 req.append((name, encargsorres,))
67 rsp.append((batchable, encresref, resref,))
67 rsp.append((batchable, encresref, resref,))
68 else:
68 else:
69 resref.set(encargsorres)
69 resref.set(encargsorres)
70 else:
70 else:
71 if req:
71 if req:
72 self._submitreq(req, rsp)
72 self._submitreq(req, rsp)
73 req, rsp = [], []
73 req, rsp = [], []
74 resref.set(mtd(*args, **opts))
74 resref.set(mtd(*args, **opts))
75 if req:
75 if req:
76 self._submitreq(req, rsp)
76 self._submitreq(req, rsp)
77 def _submitreq(self, req, rsp):
77 def _submitreq(self, req, rsp):
78 encresults = self.remote._submitbatch(req)
78 encresults = self.remote._submitbatch(req)
79 for encres, r in zip(encresults, rsp):
79 for encres, r in zip(encresults, rsp):
80 batchable, encresref, resref = r
80 batchable, encresref, resref = r
81 encresref.set(encres)
81 encresref.set(encres)
82 resref.set(batchable.next())
82 resref.set(batchable.next())
83
83
84 def batchable(f):
84 def batchable(f):
85 '''annotation for batchable methods
85 '''annotation for batchable methods
86
86
87 Such methods must implement a coroutine as follows:
87 Such methods must implement a coroutine as follows:
88
88
89 @batchable
89 @batchable
90 def sample(self, one, two=None):
90 def sample(self, one, two=None):
91 # Handle locally computable results first:
91 # Handle locally computable results first:
92 if not one:
92 if not one:
93 yield "a local result", None
93 yield "a local result", None
94 # Build list of encoded arguments suitable for your wire protocol:
94 # Build list of encoded arguments suitable for your wire protocol:
95 encargs = [('one', encode(one),), ('two', encode(two),)]
95 encargs = [('one', encode(one),), ('two', encode(two),)]
96 # Create future for injection of encoded result:
96 # Create future for injection of encoded result:
97 encresref = future()
97 encresref = future()
98 # Return encoded arguments and future:
98 # Return encoded arguments and future:
99 yield encargs, encresref
99 yield encargs, encresref
100 # Assuming the future to be filled with the result from the batched
100 # Assuming the future to be filled with the result from the batched
101 # request now. Decode it:
101 # request now. Decode it:
102 yield decode(encresref.value)
102 yield decode(encresref.value)
103
103
104 The decorator returns a function which wraps this coroutine as a plain
104 The decorator returns a function which wraps this coroutine as a plain
105 method, but adds the original method as an attribute called "batchable",
105 method, but adds the original method as an attribute called "batchable",
106 which is used by remotebatch to split the call into separate encoding and
106 which is used by remotebatch to split the call into separate encoding and
107 decoding phases.
107 decoding phases.
108 '''
108 '''
109 def plain(*args, **opts):
109 def plain(*args, **opts):
110 batchable = f(*args, **opts)
110 batchable = f(*args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if not encresref:
112 if not encresref:
113 return encargsorres # a local result in this case
113 return encargsorres # a local result in this case
114 self = args[0]
114 self = args[0]
115 encresref.set(self._submitone(f.func_name, encargsorres))
115 encresref.set(self._submitone(f.func_name, encargsorres))
116 return batchable.next()
116 return batchable.next()
117 setattr(plain, 'batchable', f)
117 setattr(plain, 'batchable', f)
118 return plain
118 return plain
119
119
120 # list of nodes encoding / decoding
120 # list of nodes encoding / decoding
121
121
122 def decodelist(l, sep=' '):
122 def decodelist(l, sep=' '):
123 if l:
123 if l:
124 return map(bin, l.split(sep))
124 return map(bin, l.split(sep))
125 return []
125 return []
126
126
127 def encodelist(l, sep=' '):
127 def encodelist(l, sep=' '):
128 return sep.join(map(hex, l))
128 return sep.join(map(hex, l))
129
129
130 # batched call argument encoding
130 # batched call argument encoding
131
131
132 def escapearg(plain):
132 def escapearg(plain):
133 return (plain
133 return (plain
134 .replace(':', '::')
134 .replace(':', '::')
135 .replace(',', ':,')
135 .replace(',', ':,')
136 .replace(';', ':;')
136 .replace(';', ':;')
137 .replace('=', ':='))
137 .replace('=', ':='))
138
138
139 def unescapearg(escaped):
139 def unescapearg(escaped):
140 return (escaped
140 return (escaped
141 .replace(':=', '=')
141 .replace(':=', '=')
142 .replace(':;', ';')
142 .replace(':;', ';')
143 .replace(':,', ',')
143 .replace(':,', ',')
144 .replace('::', ':'))
144 .replace('::', ':'))
145
145
146 # client side
146 # client side
147
147
148 def todict(**args):
149 return args
150
151 class wirepeer(peer.peerrepository):
148 class wirepeer(peer.peerrepository):
152
149
153 def batch(self):
150 def batch(self):
154 return remotebatch(self)
151 return remotebatch(self)
155 def _submitbatch(self, req):
152 def _submitbatch(self, req):
156 cmds = []
153 cmds = []
157 for op, argsdict in req:
154 for op, argsdict in req:
158 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
155 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
159 cmds.append('%s %s' % (op, args))
156 cmds.append('%s %s' % (op, args))
160 rsp = self._call("batch", cmds=';'.join(cmds))
157 rsp = self._call("batch", cmds=';'.join(cmds))
161 return rsp.split(';')
158 return rsp.split(';')
162 def _submitone(self, op, args):
159 def _submitone(self, op, args):
163 return self._call(op, **args)
160 return self._call(op, **args)
164
161
165 @batchable
162 @batchable
166 def lookup(self, key):
163 def lookup(self, key):
167 self.requirecap('lookup', _('look up remote revision'))
164 self.requirecap('lookup', _('look up remote revision'))
168 f = future()
165 f = future()
169 yield todict(key=encoding.fromlocal(key)), f
166 yield {'key': encoding.fromlocal(key)}, f
170 d = f.value
167 d = f.value
171 success, data = d[:-1].split(" ", 1)
168 success, data = d[:-1].split(" ", 1)
172 if int(success):
169 if int(success):
173 yield bin(data)
170 yield bin(data)
174 self._abort(error.RepoError(data))
171 self._abort(error.RepoError(data))
175
172
176 @batchable
173 @batchable
177 def heads(self):
174 def heads(self):
178 f = future()
175 f = future()
179 yield {}, f
176 yield {}, f
180 d = f.value
177 d = f.value
181 try:
178 try:
182 yield decodelist(d[:-1])
179 yield decodelist(d[:-1])
183 except ValueError:
180 except ValueError:
184 self._abort(error.ResponseError(_("unexpected response:"), d))
181 self._abort(error.ResponseError(_("unexpected response:"), d))
185
182
186 @batchable
183 @batchable
187 def known(self, nodes):
184 def known(self, nodes):
188 f = future()
185 f = future()
189 yield todict(nodes=encodelist(nodes)), f
186 yield {'nodes': encodelist(nodes)}, f
190 d = f.value
187 d = f.value
191 try:
188 try:
192 yield [bool(int(f)) for f in d]
189 yield [bool(int(f)) for f in d]
193 except ValueError:
190 except ValueError:
194 self._abort(error.ResponseError(_("unexpected response:"), d))
191 self._abort(error.ResponseError(_("unexpected response:"), d))
195
192
196 @batchable
193 @batchable
197 def branchmap(self):
194 def branchmap(self):
198 f = future()
195 f = future()
199 yield {}, f
196 yield {}, f
200 d = f.value
197 d = f.value
201 try:
198 try:
202 branchmap = {}
199 branchmap = {}
203 for branchpart in d.splitlines():
200 for branchpart in d.splitlines():
204 branchname, branchheads = branchpart.split(' ', 1)
201 branchname, branchheads = branchpart.split(' ', 1)
205 branchname = encoding.tolocal(urllib.unquote(branchname))
202 branchname = encoding.tolocal(urllib.unquote(branchname))
206 branchheads = decodelist(branchheads)
203 branchheads = decodelist(branchheads)
207 branchmap[branchname] = branchheads
204 branchmap[branchname] = branchheads
208 yield branchmap
205 yield branchmap
209 except TypeError:
206 except TypeError:
210 self._abort(error.ResponseError(_("unexpected response:"), d))
207 self._abort(error.ResponseError(_("unexpected response:"), d))
211
208
212 def branches(self, nodes):
209 def branches(self, nodes):
213 n = encodelist(nodes)
210 n = encodelist(nodes)
214 d = self._call("branches", nodes=n)
211 d = self._call("branches", nodes=n)
215 try:
212 try:
216 br = [tuple(decodelist(b)) for b in d.splitlines()]
213 br = [tuple(decodelist(b)) for b in d.splitlines()]
217 return br
214 return br
218 except ValueError:
215 except ValueError:
219 self._abort(error.ResponseError(_("unexpected response:"), d))
216 self._abort(error.ResponseError(_("unexpected response:"), d))
220
217
221 def between(self, pairs):
218 def between(self, pairs):
222 batch = 8 # avoid giant requests
219 batch = 8 # avoid giant requests
223 r = []
220 r = []
224 for i in xrange(0, len(pairs), batch):
221 for i in xrange(0, len(pairs), batch):
225 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
222 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
226 d = self._call("between", pairs=n)
223 d = self._call("between", pairs=n)
227 try:
224 try:
228 r.extend(l and decodelist(l) or [] for l in d.splitlines())
225 r.extend(l and decodelist(l) or [] for l in d.splitlines())
229 except ValueError:
226 except ValueError:
230 self._abort(error.ResponseError(_("unexpected response:"), d))
227 self._abort(error.ResponseError(_("unexpected response:"), d))
231 return r
228 return r
232
229
233 @batchable
230 @batchable
234 def pushkey(self, namespace, key, old, new):
231 def pushkey(self, namespace, key, old, new):
235 if not self.capable('pushkey'):
232 if not self.capable('pushkey'):
236 yield False, None
233 yield False, None
237 f = future()
234 f = future()
238 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
235 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
239 yield todict(namespace=encoding.fromlocal(namespace),
236 yield {'namespace': encoding.fromlocal(namespace),
240 key=encoding.fromlocal(key),
237 'key': encoding.fromlocal(key),
241 old=encoding.fromlocal(old),
238 'old': encoding.fromlocal(old),
242 new=encoding.fromlocal(new)), f
239 'new': encoding.fromlocal(new)}, f
243 d = f.value
240 d = f.value
244 d, output = d.split('\n', 1)
241 d, output = d.split('\n', 1)
245 try:
242 try:
246 d = bool(int(d))
243 d = bool(int(d))
247 except ValueError:
244 except ValueError:
248 raise error.ResponseError(
245 raise error.ResponseError(
249 _('push failed (unexpected response):'), d)
246 _('push failed (unexpected response):'), d)
250 for l in output.splitlines(True):
247 for l in output.splitlines(True):
251 self.ui.status(_('remote: '), l)
248 self.ui.status(_('remote: '), l)
252 yield d
249 yield d
253
250
254 @batchable
251 @batchable
255 def listkeys(self, namespace):
252 def listkeys(self, namespace):
256 if not self.capable('pushkey'):
253 if not self.capable('pushkey'):
257 yield {}, None
254 yield {}, None
258 f = future()
255 f = future()
259 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
256 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
260 yield todict(namespace=encoding.fromlocal(namespace)), f
257 yield {'namespace': encoding.fromlocal(namespace)}, f
261 d = f.value
258 d = f.value
262 r = {}
259 r = {}
263 for l in d.splitlines():
260 for l in d.splitlines():
264 k, v = l.split('\t')
261 k, v = l.split('\t')
265 r[encoding.tolocal(k)] = encoding.tolocal(v)
262 r[encoding.tolocal(k)] = encoding.tolocal(v)
266 yield r
263 yield r
267
264
268 def stream_out(self):
265 def stream_out(self):
269 return self._callstream('stream_out')
266 return self._callstream('stream_out')
270
267
271 def changegroup(self, nodes, kind):
268 def changegroup(self, nodes, kind):
272 n = encodelist(nodes)
269 n = encodelist(nodes)
273 f = self._callstream("changegroup", roots=n)
270 f = self._callstream("changegroup", roots=n)
274 return changegroupmod.unbundle10(self._decompress(f), 'UN')
271 return changegroupmod.unbundle10(self._decompress(f), 'UN')
275
272
276 def changegroupsubset(self, bases, heads, kind):
273 def changegroupsubset(self, bases, heads, kind):
277 self.requirecap('changegroupsubset', _('look up remote changes'))
274 self.requirecap('changegroupsubset', _('look up remote changes'))
278 bases = encodelist(bases)
275 bases = encodelist(bases)
279 heads = encodelist(heads)
276 heads = encodelist(heads)
280 f = self._callstream("changegroupsubset",
277 f = self._callstream("changegroupsubset",
281 bases=bases, heads=heads)
278 bases=bases, heads=heads)
282 return changegroupmod.unbundle10(self._decompress(f), 'UN')
279 return changegroupmod.unbundle10(self._decompress(f), 'UN')
283
280
284 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
281 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
285 self.requirecap('getbundle', _('look up remote changes'))
282 self.requirecap('getbundle', _('look up remote changes'))
286 opts = {}
283 opts = {}
287 if heads is not None:
284 if heads is not None:
288 opts['heads'] = encodelist(heads)
285 opts['heads'] = encodelist(heads)
289 if common is not None:
286 if common is not None:
290 opts['common'] = encodelist(common)
287 opts['common'] = encodelist(common)
291 if bundlecaps is not None:
288 if bundlecaps is not None:
292 opts['bundlecaps'] = ','.join(bundlecaps)
289 opts['bundlecaps'] = ','.join(bundlecaps)
293 f = self._callstream("getbundle", **opts)
290 f = self._callstream("getbundle", **opts)
294 return changegroupmod.unbundle10(self._decompress(f), 'UN')
291 return changegroupmod.unbundle10(self._decompress(f), 'UN')
295
292
296 def unbundle(self, cg, heads, source):
293 def unbundle(self, cg, heads, source):
297 '''Send cg (a readable file-like object representing the
294 '''Send cg (a readable file-like object representing the
298 changegroup to push, typically a chunkbuffer object) to the
295 changegroup to push, typically a chunkbuffer object) to the
299 remote server as a bundle. Return an integer indicating the
296 remote server as a bundle. Return an integer indicating the
300 result of the push (see localrepository.addchangegroup()).'''
297 result of the push (see localrepository.addchangegroup()).'''
301
298
302 if heads != ['force'] and self.capable('unbundlehash'):
299 if heads != ['force'] and self.capable('unbundlehash'):
303 heads = encodelist(['hashed',
300 heads = encodelist(['hashed',
304 util.sha1(''.join(sorted(heads))).digest()])
301 util.sha1(''.join(sorted(heads))).digest()])
305 else:
302 else:
306 heads = encodelist(heads)
303 heads = encodelist(heads)
307
304
308 ret, output = self._callpush("unbundle", cg, heads=heads)
305 ret, output = self._callpush("unbundle", cg, heads=heads)
309 if ret == "":
306 if ret == "":
310 raise error.ResponseError(
307 raise error.ResponseError(
311 _('push failed:'), output)
308 _('push failed:'), output)
312 try:
309 try:
313 ret = int(ret)
310 ret = int(ret)
314 except ValueError:
311 except ValueError:
315 raise error.ResponseError(
312 raise error.ResponseError(
316 _('push failed (unexpected response):'), ret)
313 _('push failed (unexpected response):'), ret)
317
314
318 for l in output.splitlines(True):
315 for l in output.splitlines(True):
319 self.ui.status(_('remote: '), l)
316 self.ui.status(_('remote: '), l)
320 return ret
317 return ret
321
318
322 def debugwireargs(self, one, two, three=None, four=None, five=None):
319 def debugwireargs(self, one, two, three=None, four=None, five=None):
323 # don't pass optional arguments left at their default value
320 # don't pass optional arguments left at their default value
324 opts = {}
321 opts = {}
325 if three is not None:
322 if three is not None:
326 opts['three'] = three
323 opts['three'] = three
327 if four is not None:
324 if four is not None:
328 opts['four'] = four
325 opts['four'] = four
329 return self._call('debugwireargs', one=one, two=two, **opts)
326 return self._call('debugwireargs', one=one, two=two, **opts)
330
327
331 # server side
328 # server side
332
329
333 class streamres(object):
330 class streamres(object):
334 def __init__(self, gen):
331 def __init__(self, gen):
335 self.gen = gen
332 self.gen = gen
336
333
337 class pushres(object):
334 class pushres(object):
338 def __init__(self, res):
335 def __init__(self, res):
339 self.res = res
336 self.res = res
340
337
341 class pusherr(object):
338 class pusherr(object):
342 def __init__(self, res):
339 def __init__(self, res):
343 self.res = res
340 self.res = res
344
341
345 class ooberror(object):
342 class ooberror(object):
346 def __init__(self, message):
343 def __init__(self, message):
347 self.message = message
344 self.message = message
348
345
349 def dispatch(repo, proto, command):
346 def dispatch(repo, proto, command):
350 repo = repo.filtered("served")
347 repo = repo.filtered("served")
351 func, spec = commands[command]
348 func, spec = commands[command]
352 args = proto.getargs(spec)
349 args = proto.getargs(spec)
353 return func(repo, proto, *args)
350 return func(repo, proto, *args)
354
351
355 def options(cmd, keys, others):
352 def options(cmd, keys, others):
356 opts = {}
353 opts = {}
357 for k in keys:
354 for k in keys:
358 if k in others:
355 if k in others:
359 opts[k] = others[k]
356 opts[k] = others[k]
360 del others[k]
357 del others[k]
361 if others:
358 if others:
362 sys.stderr.write("abort: %s got unexpected arguments %s\n"
359 sys.stderr.write("abort: %s got unexpected arguments %s\n"
363 % (cmd, ",".join(others)))
360 % (cmd, ",".join(others)))
364 return opts
361 return opts
365
362
366 def batch(repo, proto, cmds, others):
363 def batch(repo, proto, cmds, others):
367 repo = repo.filtered("served")
364 repo = repo.filtered("served")
368 res = []
365 res = []
369 for pair in cmds.split(';'):
366 for pair in cmds.split(';'):
370 op, args = pair.split(' ', 1)
367 op, args = pair.split(' ', 1)
371 vals = {}
368 vals = {}
372 for a in args.split(','):
369 for a in args.split(','):
373 if a:
370 if a:
374 n, v = a.split('=')
371 n, v = a.split('=')
375 vals[n] = unescapearg(v)
372 vals[n] = unescapearg(v)
376 func, spec = commands[op]
373 func, spec = commands[op]
377 if spec:
374 if spec:
378 keys = spec.split()
375 keys = spec.split()
379 data = {}
376 data = {}
380 for k in keys:
377 for k in keys:
381 if k == '*':
378 if k == '*':
382 star = {}
379 star = {}
383 for key in vals.keys():
380 for key in vals.keys():
384 if key not in keys:
381 if key not in keys:
385 star[key] = vals[key]
382 star[key] = vals[key]
386 data['*'] = star
383 data['*'] = star
387 else:
384 else:
388 data[k] = vals[k]
385 data[k] = vals[k]
389 result = func(repo, proto, *[data[k] for k in keys])
386 result = func(repo, proto, *[data[k] for k in keys])
390 else:
387 else:
391 result = func(repo, proto)
388 result = func(repo, proto)
392 if isinstance(result, ooberror):
389 if isinstance(result, ooberror):
393 return result
390 return result
394 res.append(escapearg(result))
391 res.append(escapearg(result))
395 return ';'.join(res)
392 return ';'.join(res)
396
393
397 def between(repo, proto, pairs):
394 def between(repo, proto, pairs):
398 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
395 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
399 r = []
396 r = []
400 for b in repo.between(pairs):
397 for b in repo.between(pairs):
401 r.append(encodelist(b) + "\n")
398 r.append(encodelist(b) + "\n")
402 return "".join(r)
399 return "".join(r)
403
400
404 def branchmap(repo, proto):
401 def branchmap(repo, proto):
405 branchmap = repo.branchmap()
402 branchmap = repo.branchmap()
406 heads = []
403 heads = []
407 for branch, nodes in branchmap.iteritems():
404 for branch, nodes in branchmap.iteritems():
408 branchname = urllib.quote(encoding.fromlocal(branch))
405 branchname = urllib.quote(encoding.fromlocal(branch))
409 branchnodes = encodelist(nodes)
406 branchnodes = encodelist(nodes)
410 heads.append('%s %s' % (branchname, branchnodes))
407 heads.append('%s %s' % (branchname, branchnodes))
411 return '\n'.join(heads)
408 return '\n'.join(heads)
412
409
413 def branches(repo, proto, nodes):
410 def branches(repo, proto, nodes):
414 nodes = decodelist(nodes)
411 nodes = decodelist(nodes)
415 r = []
412 r = []
416 for b in repo.branches(nodes):
413 for b in repo.branches(nodes):
417 r.append(encodelist(b) + "\n")
414 r.append(encodelist(b) + "\n")
418 return "".join(r)
415 return "".join(r)
419
416
420 def capabilities(repo, proto):
417 def capabilities(repo, proto):
421 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
418 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
422 'unbundlehash batch').split()
419 'unbundlehash batch').split()
423 if _allowstream(repo.ui):
420 if _allowstream(repo.ui):
424 if repo.ui.configbool('server', 'preferuncompressed', False):
421 if repo.ui.configbool('server', 'preferuncompressed', False):
425 caps.append('stream-preferred')
422 caps.append('stream-preferred')
426 requiredformats = repo.requirements & repo.supportedformats
423 requiredformats = repo.requirements & repo.supportedformats
427 # if our local revlogs are just revlogv1, add 'stream' cap
424 # if our local revlogs are just revlogv1, add 'stream' cap
428 if not requiredformats - set(('revlogv1',)):
425 if not requiredformats - set(('revlogv1',)):
429 caps.append('stream')
426 caps.append('stream')
430 # otherwise, add 'streamreqs' detailing our local revlog format
427 # otherwise, add 'streamreqs' detailing our local revlog format
431 else:
428 else:
432 caps.append('streamreqs=%s' % ','.join(requiredformats))
429 caps.append('streamreqs=%s' % ','.join(requiredformats))
433 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
430 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
434 caps.append('httpheader=1024')
431 caps.append('httpheader=1024')
435 return ' '.join(caps)
432 return ' '.join(caps)
436
433
437 def changegroup(repo, proto, roots):
434 def changegroup(repo, proto, roots):
438 nodes = decodelist(roots)
435 nodes = decodelist(roots)
439 cg = repo.changegroup(nodes, 'serve')
436 cg = repo.changegroup(nodes, 'serve')
440 return streamres(proto.groupchunks(cg))
437 return streamres(proto.groupchunks(cg))
441
438
442 def changegroupsubset(repo, proto, bases, heads):
439 def changegroupsubset(repo, proto, bases, heads):
443 bases = decodelist(bases)
440 bases = decodelist(bases)
444 heads = decodelist(heads)
441 heads = decodelist(heads)
445 cg = repo.changegroupsubset(bases, heads, 'serve')
442 cg = repo.changegroupsubset(bases, heads, 'serve')
446 return streamres(proto.groupchunks(cg))
443 return streamres(proto.groupchunks(cg))
447
444
448 def debugwireargs(repo, proto, one, two, others):
445 def debugwireargs(repo, proto, one, two, others):
449 # only accept optional args from the known set
446 # only accept optional args from the known set
450 opts = options('debugwireargs', ['three', 'four'], others)
447 opts = options('debugwireargs', ['three', 'four'], others)
451 return repo.debugwireargs(one, two, **opts)
448 return repo.debugwireargs(one, two, **opts)
452
449
453 def getbundle(repo, proto, others):
450 def getbundle(repo, proto, others):
454 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
451 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
455 for k, v in opts.iteritems():
452 for k, v in opts.iteritems():
456 if k in ('heads', 'common'):
453 if k in ('heads', 'common'):
457 opts[k] = decodelist(v)
454 opts[k] = decodelist(v)
458 elif k == 'bundlecaps':
455 elif k == 'bundlecaps':
459 opts[k] = set(v.split(','))
456 opts[k] = set(v.split(','))
460 cg = repo.getbundle('serve', **opts)
457 cg = repo.getbundle('serve', **opts)
461 return streamres(proto.groupchunks(cg))
458 return streamres(proto.groupchunks(cg))
462
459
463 def heads(repo, proto):
460 def heads(repo, proto):
464 h = repo.heads()
461 h = repo.heads()
465 return encodelist(h) + "\n"
462 return encodelist(h) + "\n"
466
463
467 def hello(repo, proto):
464 def hello(repo, proto):
468 '''the hello command returns a set of lines describing various
465 '''the hello command returns a set of lines describing various
469 interesting things about the server, in an RFC822-like format.
466 interesting things about the server, in an RFC822-like format.
470 Currently the only one defined is "capabilities", which
467 Currently the only one defined is "capabilities", which
471 consists of a line in the form:
468 consists of a line in the form:
472
469
473 capabilities: space separated list of tokens
470 capabilities: space separated list of tokens
474 '''
471 '''
475 return "capabilities: %s\n" % (capabilities(repo, proto))
472 return "capabilities: %s\n" % (capabilities(repo, proto))
476
473
477 def listkeys(repo, proto, namespace):
474 def listkeys(repo, proto, namespace):
478 d = repo.listkeys(encoding.tolocal(namespace)).items()
475 d = repo.listkeys(encoding.tolocal(namespace)).items()
479 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
476 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
480 for k, v in d])
477 for k, v in d])
481 return t
478 return t
482
479
483 def lookup(repo, proto, key):
480 def lookup(repo, proto, key):
484 try:
481 try:
485 k = encoding.tolocal(key)
482 k = encoding.tolocal(key)
486 c = repo[k]
483 c = repo[k]
487 r = c.hex()
484 r = c.hex()
488 success = 1
485 success = 1
489 except Exception, inst:
486 except Exception, inst:
490 r = str(inst)
487 r = str(inst)
491 success = 0
488 success = 0
492 return "%s %s\n" % (success, r)
489 return "%s %s\n" % (success, r)
493
490
494 def known(repo, proto, nodes, others):
491 def known(repo, proto, nodes, others):
495 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
492 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
496
493
497 def pushkey(repo, proto, namespace, key, old, new):
494 def pushkey(repo, proto, namespace, key, old, new):
498 # compatibility with pre-1.8 clients which were accidentally
495 # compatibility with pre-1.8 clients which were accidentally
499 # sending raw binary nodes rather than utf-8-encoded hex
496 # sending raw binary nodes rather than utf-8-encoded hex
500 if len(new) == 20 and new.encode('string-escape') != new:
497 if len(new) == 20 and new.encode('string-escape') != new:
501 # looks like it could be a binary node
498 # looks like it could be a binary node
502 try:
499 try:
503 new.decode('utf-8')
500 new.decode('utf-8')
504 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
501 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
505 except UnicodeDecodeError:
502 except UnicodeDecodeError:
506 pass # binary, leave unmodified
503 pass # binary, leave unmodified
507 else:
504 else:
508 new = encoding.tolocal(new) # normal path
505 new = encoding.tolocal(new) # normal path
509
506
510 if util.safehasattr(proto, 'restore'):
507 if util.safehasattr(proto, 'restore'):
511
508
512 proto.redirect()
509 proto.redirect()
513
510
514 try:
511 try:
515 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
512 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
516 encoding.tolocal(old), new) or False
513 encoding.tolocal(old), new) or False
517 except util.Abort:
514 except util.Abort:
518 r = False
515 r = False
519
516
520 output = proto.restore()
517 output = proto.restore()
521
518
522 return '%s\n%s' % (int(r), output)
519 return '%s\n%s' % (int(r), output)
523
520
524 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
521 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
525 encoding.tolocal(old), new)
522 encoding.tolocal(old), new)
526 return '%s\n' % int(r)
523 return '%s\n' % int(r)
527
524
528 def _allowstream(ui):
525 def _allowstream(ui):
529 return ui.configbool('server', 'uncompressed', True, untrusted=True)
526 return ui.configbool('server', 'uncompressed', True, untrusted=True)
530
527
531 def _walkstreamfiles(repo):
528 def _walkstreamfiles(repo):
532 # this is it's own function so extensions can override it
529 # this is it's own function so extensions can override it
533 return repo.store.walk()
530 return repo.store.walk()
534
531
535 def stream(repo, proto):
532 def stream(repo, proto):
536 '''If the server supports streaming clone, it advertises the "stream"
533 '''If the server supports streaming clone, it advertises the "stream"
537 capability with a value representing the version and flags of the repo
534 capability with a value representing the version and flags of the repo
538 it is serving. Client checks to see if it understands the format.
535 it is serving. Client checks to see if it understands the format.
539
536
540 The format is simple: the server writes out a line with the amount
537 The format is simple: the server writes out a line with the amount
541 of files, then the total amount of bytes to be transferred (separated
538 of files, then the total amount of bytes to be transferred (separated
542 by a space). Then, for each file, the server first writes the filename
539 by a space). Then, for each file, the server first writes the filename
543 and filesize (separated by the null character), then the file contents.
540 and filesize (separated by the null character), then the file contents.
544 '''
541 '''
545
542
546 if not _allowstream(repo.ui):
543 if not _allowstream(repo.ui):
547 return '1\n'
544 return '1\n'
548
545
549 entries = []
546 entries = []
550 total_bytes = 0
547 total_bytes = 0
551 try:
548 try:
552 # get consistent snapshot of repo, lock during scan
549 # get consistent snapshot of repo, lock during scan
553 lock = repo.lock()
550 lock = repo.lock()
554 try:
551 try:
555 repo.ui.debug('scanning\n')
552 repo.ui.debug('scanning\n')
556 for name, ename, size in _walkstreamfiles(repo):
553 for name, ename, size in _walkstreamfiles(repo):
557 if size:
554 if size:
558 entries.append((name, size))
555 entries.append((name, size))
559 total_bytes += size
556 total_bytes += size
560 finally:
557 finally:
561 lock.release()
558 lock.release()
562 except error.LockError:
559 except error.LockError:
563 return '2\n' # error: 2
560 return '2\n' # error: 2
564
561
565 def streamer(repo, entries, total):
562 def streamer(repo, entries, total):
566 '''stream out all metadata files in repository.'''
563 '''stream out all metadata files in repository.'''
567 yield '0\n' # success
564 yield '0\n' # success
568 repo.ui.debug('%d files, %d bytes to transfer\n' %
565 repo.ui.debug('%d files, %d bytes to transfer\n' %
569 (len(entries), total_bytes))
566 (len(entries), total_bytes))
570 yield '%d %d\n' % (len(entries), total_bytes)
567 yield '%d %d\n' % (len(entries), total_bytes)
571
568
572 sopener = repo.sopener
569 sopener = repo.sopener
573 oldaudit = sopener.mustaudit
570 oldaudit = sopener.mustaudit
574 debugflag = repo.ui.debugflag
571 debugflag = repo.ui.debugflag
575 sopener.mustaudit = False
572 sopener.mustaudit = False
576
573
577 try:
574 try:
578 for name, size in entries:
575 for name, size in entries:
579 if debugflag:
576 if debugflag:
580 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
577 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
581 # partially encode name over the wire for backwards compat
578 # partially encode name over the wire for backwards compat
582 yield '%s\0%d\n' % (store.encodedir(name), size)
579 yield '%s\0%d\n' % (store.encodedir(name), size)
583 if size <= 65536:
580 if size <= 65536:
584 fp = sopener(name)
581 fp = sopener(name)
585 try:
582 try:
586 data = fp.read(size)
583 data = fp.read(size)
587 finally:
584 finally:
588 fp.close()
585 fp.close()
589 yield data
586 yield data
590 else:
587 else:
591 for chunk in util.filechunkiter(sopener(name), limit=size):
588 for chunk in util.filechunkiter(sopener(name), limit=size):
592 yield chunk
589 yield chunk
593 # replace with "finally:" when support for python 2.4 has been dropped
590 # replace with "finally:" when support for python 2.4 has been dropped
594 except Exception:
591 except Exception:
595 sopener.mustaudit = oldaudit
592 sopener.mustaudit = oldaudit
596 raise
593 raise
597 sopener.mustaudit = oldaudit
594 sopener.mustaudit = oldaudit
598
595
599 return streamres(streamer(repo, entries, total_bytes))
596 return streamres(streamer(repo, entries, total_bytes))
600
597
601 def unbundle(repo, proto, heads):
598 def unbundle(repo, proto, heads):
602 their_heads = decodelist(heads)
599 their_heads = decodelist(heads)
603
600
604 def check_heads():
601 def check_heads():
605 heads = repo.heads()
602 heads = repo.heads()
606 heads_hash = util.sha1(''.join(sorted(heads))).digest()
603 heads_hash = util.sha1(''.join(sorted(heads))).digest()
607 return (their_heads == ['force'] or their_heads == heads or
604 return (their_heads == ['force'] or their_heads == heads or
608 their_heads == ['hashed', heads_hash])
605 their_heads == ['hashed', heads_hash])
609
606
610 proto.redirect()
607 proto.redirect()
611
608
612 # fail early if possible
609 # fail early if possible
613 if not check_heads():
610 if not check_heads():
614 return pusherr('repository changed while preparing changes - '
611 return pusherr('repository changed while preparing changes - '
615 'please try again')
612 'please try again')
616
613
617 # write bundle data to temporary file because it can be big
614 # write bundle data to temporary file because it can be big
618 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
615 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
619 fp = os.fdopen(fd, 'wb+')
616 fp = os.fdopen(fd, 'wb+')
620 r = 0
617 r = 0
621 try:
618 try:
622 proto.getfile(fp)
619 proto.getfile(fp)
623 lock = repo.lock()
620 lock = repo.lock()
624 try:
621 try:
625 if not check_heads():
622 if not check_heads():
626 # someone else committed/pushed/unbundled while we
623 # someone else committed/pushed/unbundled while we
627 # were transferring data
624 # were transferring data
628 return pusherr('repository changed while uploading changes - '
625 return pusherr('repository changed while uploading changes - '
629 'please try again')
626 'please try again')
630
627
631 # push can proceed
628 # push can proceed
632 fp.seek(0)
629 fp.seek(0)
633 gen = changegroupmod.readbundle(fp, None)
630 gen = changegroupmod.readbundle(fp, None)
634
631
635 try:
632 try:
636 r = repo.addchangegroup(gen, 'serve', proto._client())
633 r = repo.addchangegroup(gen, 'serve', proto._client())
637 except util.Abort, inst:
634 except util.Abort, inst:
638 sys.stderr.write("abort: %s\n" % inst)
635 sys.stderr.write("abort: %s\n" % inst)
639 finally:
636 finally:
640 lock.release()
637 lock.release()
641 return pushres(r)
638 return pushres(r)
642
639
643 finally:
640 finally:
644 fp.close()
641 fp.close()
645 os.unlink(tempname)
642 os.unlink(tempname)
646
643
647 commands = {
644 commands = {
648 'batch': (batch, 'cmds *'),
645 'batch': (batch, 'cmds *'),
649 'between': (between, 'pairs'),
646 'between': (between, 'pairs'),
650 'branchmap': (branchmap, ''),
647 'branchmap': (branchmap, ''),
651 'branches': (branches, 'nodes'),
648 'branches': (branches, 'nodes'),
652 'capabilities': (capabilities, ''),
649 'capabilities': (capabilities, ''),
653 'changegroup': (changegroup, 'roots'),
650 'changegroup': (changegroup, 'roots'),
654 'changegroupsubset': (changegroupsubset, 'bases heads'),
651 'changegroupsubset': (changegroupsubset, 'bases heads'),
655 'debugwireargs': (debugwireargs, 'one two *'),
652 'debugwireargs': (debugwireargs, 'one two *'),
656 'getbundle': (getbundle, '*'),
653 'getbundle': (getbundle, '*'),
657 'heads': (heads, ''),
654 'heads': (heads, ''),
658 'hello': (hello, ''),
655 'hello': (hello, ''),
659 'known': (known, 'nodes *'),
656 'known': (known, 'nodes *'),
660 'listkeys': (listkeys, 'namespace'),
657 'listkeys': (listkeys, 'namespace'),
661 'lookup': (lookup, 'key'),
658 'lookup': (lookup, 'key'),
662 'pushkey': (pushkey, 'namespace key old new'),
659 'pushkey': (pushkey, 'namespace key old new'),
663 'stream_out': (stream, ''),
660 'stream_out': (stream, ''),
664 'unbundle': (unbundle, 'heads'),
661 'unbundle': (unbundle, 'heads'),
665 }
662 }
General Comments 0
You need to be logged in to leave comments. Login now