##// END OF EJS Templates
wireproto: do not call pushkey module directly (issue3041)...
Andreas Freimuth -
r15217:42d0d4f6 default
parent child Browse files
Show More
@@ -1,609 +1,607
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import repo, error, encoding, util, store
12 import repo, error, encoding, util, store
13 import pushkey as pushkeymod
14
13
15 # abstract batching support
14 # abstract batching support
16
15
17 class future(object):
16 class future(object):
18 '''placeholder for a value to be set later'''
17 '''placeholder for a value to be set later'''
19 def set(self, value):
18 def set(self, value):
20 if util.safehasattr(self, 'value'):
19 if util.safehasattr(self, 'value'):
21 raise error.RepoError("future is already set")
20 raise error.RepoError("future is already set")
22 self.value = value
21 self.value = value
23
22
24 class batcher(object):
23 class batcher(object):
25 '''base class for batches of commands submittable in a single request
24 '''base class for batches of commands submittable in a single request
26
25
27 All methods invoked on instances of this class are simply queued and return a
26 All methods invoked on instances of this class are simply queued and return a
28 a future for the result. Once you call submit(), all the queued calls are
27 a future for the result. Once you call submit(), all the queued calls are
29 performed and the results set in their respective futures.
28 performed and the results set in their respective futures.
30 '''
29 '''
31 def __init__(self):
30 def __init__(self):
32 self.calls = []
31 self.calls = []
33 def __getattr__(self, name):
32 def __getattr__(self, name):
34 def call(*args, **opts):
33 def call(*args, **opts):
35 resref = future()
34 resref = future()
36 self.calls.append((name, args, opts, resref,))
35 self.calls.append((name, args, opts, resref,))
37 return resref
36 return resref
38 return call
37 return call
39 def submit(self):
38 def submit(self):
40 pass
39 pass
41
40
42 class localbatch(batcher):
41 class localbatch(batcher):
43 '''performs the queued calls directly'''
42 '''performs the queued calls directly'''
44 def __init__(self, local):
43 def __init__(self, local):
45 batcher.__init__(self)
44 batcher.__init__(self)
46 self.local = local
45 self.local = local
47 def submit(self):
46 def submit(self):
48 for name, args, opts, resref in self.calls:
47 for name, args, opts, resref in self.calls:
49 resref.set(getattr(self.local, name)(*args, **opts))
48 resref.set(getattr(self.local, name)(*args, **opts))
50
49
51 class remotebatch(batcher):
50 class remotebatch(batcher):
52 '''batches the queued calls; uses as few roundtrips as possible'''
51 '''batches the queued calls; uses as few roundtrips as possible'''
53 def __init__(self, remote):
52 def __init__(self, remote):
54 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
53 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
55 batcher.__init__(self)
54 batcher.__init__(self)
56 self.remote = remote
55 self.remote = remote
57 def submit(self):
56 def submit(self):
58 req, rsp = [], []
57 req, rsp = [], []
59 for name, args, opts, resref in self.calls:
58 for name, args, opts, resref in self.calls:
60 mtd = getattr(self.remote, name)
59 mtd = getattr(self.remote, name)
61 batchablefn = getattr(mtd, 'batchable', None)
60 batchablefn = getattr(mtd, 'batchable', None)
62 if batchablefn is not None:
61 if batchablefn is not None:
63 batchable = batchablefn(mtd.im_self, *args, **opts)
62 batchable = batchablefn(mtd.im_self, *args, **opts)
64 encargsorres, encresref = batchable.next()
63 encargsorres, encresref = batchable.next()
65 if encresref:
64 if encresref:
66 req.append((name, encargsorres,))
65 req.append((name, encargsorres,))
67 rsp.append((batchable, encresref, resref,))
66 rsp.append((batchable, encresref, resref,))
68 else:
67 else:
69 resref.set(encargsorres)
68 resref.set(encargsorres)
70 else:
69 else:
71 if req:
70 if req:
72 self._submitreq(req, rsp)
71 self._submitreq(req, rsp)
73 req, rsp = [], []
72 req, rsp = [], []
74 resref.set(mtd(*args, **opts))
73 resref.set(mtd(*args, **opts))
75 if req:
74 if req:
76 self._submitreq(req, rsp)
75 self._submitreq(req, rsp)
77 def _submitreq(self, req, rsp):
76 def _submitreq(self, req, rsp):
78 encresults = self.remote._submitbatch(req)
77 encresults = self.remote._submitbatch(req)
79 for encres, r in zip(encresults, rsp):
78 for encres, r in zip(encresults, rsp):
80 batchable, encresref, resref = r
79 batchable, encresref, resref = r
81 encresref.set(encres)
80 encresref.set(encres)
82 resref.set(batchable.next())
81 resref.set(batchable.next())
83
82
84 def batchable(f):
83 def batchable(f):
85 '''annotation for batchable methods
84 '''annotation for batchable methods
86
85
87 Such methods must implement a coroutine as follows:
86 Such methods must implement a coroutine as follows:
88
87
89 @batchable
88 @batchable
90 def sample(self, one, two=None):
89 def sample(self, one, two=None):
91 # Handle locally computable results first:
90 # Handle locally computable results first:
92 if not one:
91 if not one:
93 yield "a local result", None
92 yield "a local result", None
94 # Build list of encoded arguments suitable for your wire protocol:
93 # Build list of encoded arguments suitable for your wire protocol:
95 encargs = [('one', encode(one),), ('two', encode(two),)]
94 encargs = [('one', encode(one),), ('two', encode(two),)]
96 # Create future for injection of encoded result:
95 # Create future for injection of encoded result:
97 encresref = future()
96 encresref = future()
98 # Return encoded arguments and future:
97 # Return encoded arguments and future:
99 yield encargs, encresref
98 yield encargs, encresref
100 # Assuming the future to be filled with the result from the batched request
99 # Assuming the future to be filled with the result from the batched request
101 # now. Decode it:
100 # now. Decode it:
102 yield decode(encresref.value)
101 yield decode(encresref.value)
103
102
104 The decorator returns a function which wraps this coroutine as a plain method,
103 The decorator returns a function which wraps this coroutine as a plain method,
105 but adds the original method as an attribute called "batchable", which is
104 but adds the original method as an attribute called "batchable", which is
106 used by remotebatch to split the call into separate encoding and decoding
105 used by remotebatch to split the call into separate encoding and decoding
107 phases.
106 phases.
108 '''
107 '''
109 def plain(*args, **opts):
108 def plain(*args, **opts):
110 batchable = f(*args, **opts)
109 batchable = f(*args, **opts)
111 encargsorres, encresref = batchable.next()
110 encargsorres, encresref = batchable.next()
112 if not encresref:
111 if not encresref:
113 return encargsorres # a local result in this case
112 return encargsorres # a local result in this case
114 self = args[0]
113 self = args[0]
115 encresref.set(self._submitone(f.func_name, encargsorres))
114 encresref.set(self._submitone(f.func_name, encargsorres))
116 return batchable.next()
115 return batchable.next()
117 setattr(plain, 'batchable', f)
116 setattr(plain, 'batchable', f)
118 return plain
117 return plain
119
118
120 # list of nodes encoding / decoding
119 # list of nodes encoding / decoding
121
120
122 def decodelist(l, sep=' '):
121 def decodelist(l, sep=' '):
123 if l:
122 if l:
124 return map(bin, l.split(sep))
123 return map(bin, l.split(sep))
125 return []
124 return []
126
125
127 def encodelist(l, sep=' '):
126 def encodelist(l, sep=' '):
128 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
129
128
130 # batched call argument encoding
129 # batched call argument encoding
131
130
132 def escapearg(plain):
131 def escapearg(plain):
133 return (plain
132 return (plain
134 .replace(':', '::')
133 .replace(':', '::')
135 .replace(',', ':,')
134 .replace(',', ':,')
136 .replace(';', ':;')
135 .replace(';', ':;')
137 .replace('=', ':='))
136 .replace('=', ':='))
138
137
139 def unescapearg(escaped):
138 def unescapearg(escaped):
140 return (escaped
139 return (escaped
141 .replace(':=', '=')
140 .replace(':=', '=')
142 .replace(':;', ';')
141 .replace(':;', ';')
143 .replace(':,', ',')
142 .replace(':,', ',')
144 .replace('::', ':'))
143 .replace('::', ':'))
145
144
146 # client side
145 # client side
147
146
148 def todict(**args):
147 def todict(**args):
149 return args
148 return args
150
149
151 class wirerepository(repo.repository):
150 class wirerepository(repo.repository):
152
151
153 def batch(self):
152 def batch(self):
154 return remotebatch(self)
153 return remotebatch(self)
155 def _submitbatch(self, req):
154 def _submitbatch(self, req):
156 cmds = []
155 cmds = []
157 for op, argsdict in req:
156 for op, argsdict in req:
158 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
157 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
159 cmds.append('%s %s' % (op, args))
158 cmds.append('%s %s' % (op, args))
160 rsp = self._call("batch", cmds=';'.join(cmds))
159 rsp = self._call("batch", cmds=';'.join(cmds))
161 return rsp.split(';')
160 return rsp.split(';')
162 def _submitone(self, op, args):
161 def _submitone(self, op, args):
163 return self._call(op, **args)
162 return self._call(op, **args)
164
163
165 @batchable
164 @batchable
166 def lookup(self, key):
165 def lookup(self, key):
167 self.requirecap('lookup', _('look up remote revision'))
166 self.requirecap('lookup', _('look up remote revision'))
168 f = future()
167 f = future()
169 yield todict(key=encoding.fromlocal(key)), f
168 yield todict(key=encoding.fromlocal(key)), f
170 d = f.value
169 d = f.value
171 success, data = d[:-1].split(" ", 1)
170 success, data = d[:-1].split(" ", 1)
172 if int(success):
171 if int(success):
173 yield bin(data)
172 yield bin(data)
174 self._abort(error.RepoError(data))
173 self._abort(error.RepoError(data))
175
174
176 @batchable
175 @batchable
177 def heads(self):
176 def heads(self):
178 f = future()
177 f = future()
179 yield {}, f
178 yield {}, f
180 d = f.value
179 d = f.value
181 try:
180 try:
182 yield decodelist(d[:-1])
181 yield decodelist(d[:-1])
183 except ValueError:
182 except ValueError:
184 self._abort(error.ResponseError(_("unexpected response:"), d))
183 self._abort(error.ResponseError(_("unexpected response:"), d))
185
184
186 @batchable
185 @batchable
187 def known(self, nodes):
186 def known(self, nodes):
188 f = future()
187 f = future()
189 yield todict(nodes=encodelist(nodes)), f
188 yield todict(nodes=encodelist(nodes)), f
190 d = f.value
189 d = f.value
191 try:
190 try:
192 yield [bool(int(f)) for f in d]
191 yield [bool(int(f)) for f in d]
193 except ValueError:
192 except ValueError:
194 self._abort(error.ResponseError(_("unexpected response:"), d))
193 self._abort(error.ResponseError(_("unexpected response:"), d))
195
194
196 @batchable
195 @batchable
197 def branchmap(self):
196 def branchmap(self):
198 f = future()
197 f = future()
199 yield {}, f
198 yield {}, f
200 d = f.value
199 d = f.value
201 try:
200 try:
202 branchmap = {}
201 branchmap = {}
203 for branchpart in d.splitlines():
202 for branchpart in d.splitlines():
204 branchname, branchheads = branchpart.split(' ', 1)
203 branchname, branchheads = branchpart.split(' ', 1)
205 branchname = encoding.tolocal(urllib.unquote(branchname))
204 branchname = encoding.tolocal(urllib.unquote(branchname))
206 branchheads = decodelist(branchheads)
205 branchheads = decodelist(branchheads)
207 branchmap[branchname] = branchheads
206 branchmap[branchname] = branchheads
208 yield branchmap
207 yield branchmap
209 except TypeError:
208 except TypeError:
210 self._abort(error.ResponseError(_("unexpected response:"), d))
209 self._abort(error.ResponseError(_("unexpected response:"), d))
211
210
212 def branches(self, nodes):
211 def branches(self, nodes):
213 n = encodelist(nodes)
212 n = encodelist(nodes)
214 d = self._call("branches", nodes=n)
213 d = self._call("branches", nodes=n)
215 try:
214 try:
216 br = [tuple(decodelist(b)) for b in d.splitlines()]
215 br = [tuple(decodelist(b)) for b in d.splitlines()]
217 return br
216 return br
218 except ValueError:
217 except ValueError:
219 self._abort(error.ResponseError(_("unexpected response:"), d))
218 self._abort(error.ResponseError(_("unexpected response:"), d))
220
219
221 def between(self, pairs):
220 def between(self, pairs):
222 batch = 8 # avoid giant requests
221 batch = 8 # avoid giant requests
223 r = []
222 r = []
224 for i in xrange(0, len(pairs), batch):
223 for i in xrange(0, len(pairs), batch):
225 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
226 d = self._call("between", pairs=n)
225 d = self._call("between", pairs=n)
227 try:
226 try:
228 r.extend(l and decodelist(l) or [] for l in d.splitlines())
227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
229 except ValueError:
228 except ValueError:
230 self._abort(error.ResponseError(_("unexpected response:"), d))
229 self._abort(error.ResponseError(_("unexpected response:"), d))
231 return r
230 return r
232
231
233 @batchable
232 @batchable
234 def pushkey(self, namespace, key, old, new):
233 def pushkey(self, namespace, key, old, new):
235 if not self.capable('pushkey'):
234 if not self.capable('pushkey'):
236 yield False, None
235 yield False, None
237 f = future()
236 f = future()
238 yield todict(namespace=encoding.fromlocal(namespace),
237 yield todict(namespace=encoding.fromlocal(namespace),
239 key=encoding.fromlocal(key),
238 key=encoding.fromlocal(key),
240 old=encoding.fromlocal(old),
239 old=encoding.fromlocal(old),
241 new=encoding.fromlocal(new)), f
240 new=encoding.fromlocal(new)), f
242 d = f.value
241 d = f.value
243 try:
242 try:
244 d = bool(int(d))
243 d = bool(int(d))
245 except ValueError:
244 except ValueError:
246 raise error.ResponseError(
245 raise error.ResponseError(
247 _('push failed (unexpected response):'), d)
246 _('push failed (unexpected response):'), d)
248 yield d
247 yield d
249
248
250 @batchable
249 @batchable
251 def listkeys(self, namespace):
250 def listkeys(self, namespace):
252 if not self.capable('pushkey'):
251 if not self.capable('pushkey'):
253 yield {}, None
252 yield {}, None
254 f = future()
253 f = future()
255 yield todict(namespace=encoding.fromlocal(namespace)), f
254 yield todict(namespace=encoding.fromlocal(namespace)), f
256 d = f.value
255 d = f.value
257 r = {}
256 r = {}
258 for l in d.splitlines():
257 for l in d.splitlines():
259 k, v = l.split('\t')
258 k, v = l.split('\t')
260 r[encoding.tolocal(k)] = encoding.tolocal(v)
259 r[encoding.tolocal(k)] = encoding.tolocal(v)
261 yield r
260 yield r
262
261
263 def stream_out(self):
262 def stream_out(self):
264 return self._callstream('stream_out')
263 return self._callstream('stream_out')
265
264
266 def changegroup(self, nodes, kind):
265 def changegroup(self, nodes, kind):
267 n = encodelist(nodes)
266 n = encodelist(nodes)
268 f = self._callstream("changegroup", roots=n)
267 f = self._callstream("changegroup", roots=n)
269 return changegroupmod.unbundle10(self._decompress(f), 'UN')
268 return changegroupmod.unbundle10(self._decompress(f), 'UN')
270
269
271 def changegroupsubset(self, bases, heads, kind):
270 def changegroupsubset(self, bases, heads, kind):
272 self.requirecap('changegroupsubset', _('look up remote changes'))
271 self.requirecap('changegroupsubset', _('look up remote changes'))
273 bases = encodelist(bases)
272 bases = encodelist(bases)
274 heads = encodelist(heads)
273 heads = encodelist(heads)
275 f = self._callstream("changegroupsubset",
274 f = self._callstream("changegroupsubset",
276 bases=bases, heads=heads)
275 bases=bases, heads=heads)
277 return changegroupmod.unbundle10(self._decompress(f), 'UN')
276 return changegroupmod.unbundle10(self._decompress(f), 'UN')
278
277
279 def getbundle(self, source, heads=None, common=None):
278 def getbundle(self, source, heads=None, common=None):
280 self.requirecap('getbundle', _('look up remote changes'))
279 self.requirecap('getbundle', _('look up remote changes'))
281 opts = {}
280 opts = {}
282 if heads is not None:
281 if heads is not None:
283 opts['heads'] = encodelist(heads)
282 opts['heads'] = encodelist(heads)
284 if common is not None:
283 if common is not None:
285 opts['common'] = encodelist(common)
284 opts['common'] = encodelist(common)
286 f = self._callstream("getbundle", **opts)
285 f = self._callstream("getbundle", **opts)
287 return changegroupmod.unbundle10(self._decompress(f), 'UN')
286 return changegroupmod.unbundle10(self._decompress(f), 'UN')
288
287
289 def unbundle(self, cg, heads, source):
288 def unbundle(self, cg, heads, source):
290 '''Send cg (a readable file-like object representing the
289 '''Send cg (a readable file-like object representing the
291 changegroup to push, typically a chunkbuffer object) to the
290 changegroup to push, typically a chunkbuffer object) to the
292 remote server as a bundle. Return an integer indicating the
291 remote server as a bundle. Return an integer indicating the
293 result of the push (see localrepository.addchangegroup()).'''
292 result of the push (see localrepository.addchangegroup()).'''
294
293
295 if heads != ['force'] and self.capable('unbundlehash'):
294 if heads != ['force'] and self.capable('unbundlehash'):
296 heads = encodelist(['hashed',
295 heads = encodelist(['hashed',
297 util.sha1(''.join(sorted(heads))).digest()])
296 util.sha1(''.join(sorted(heads))).digest()])
298 else:
297 else:
299 heads = encodelist(heads)
298 heads = encodelist(heads)
300
299
301 ret, output = self._callpush("unbundle", cg, heads=heads)
300 ret, output = self._callpush("unbundle", cg, heads=heads)
302 if ret == "":
301 if ret == "":
303 raise error.ResponseError(
302 raise error.ResponseError(
304 _('push failed:'), output)
303 _('push failed:'), output)
305 try:
304 try:
306 ret = int(ret)
305 ret = int(ret)
307 except ValueError:
306 except ValueError:
308 raise error.ResponseError(
307 raise error.ResponseError(
309 _('push failed (unexpected response):'), ret)
308 _('push failed (unexpected response):'), ret)
310
309
311 for l in output.splitlines(True):
310 for l in output.splitlines(True):
312 self.ui.status(_('remote: '), l)
311 self.ui.status(_('remote: '), l)
313 return ret
312 return ret
314
313
315 def debugwireargs(self, one, two, three=None, four=None, five=None):
314 def debugwireargs(self, one, two, three=None, four=None, five=None):
316 # don't pass optional arguments left at their default value
315 # don't pass optional arguments left at their default value
317 opts = {}
316 opts = {}
318 if three is not None:
317 if three is not None:
319 opts['three'] = three
318 opts['three'] = three
320 if four is not None:
319 if four is not None:
321 opts['four'] = four
320 opts['four'] = four
322 return self._call('debugwireargs', one=one, two=two, **opts)
321 return self._call('debugwireargs', one=one, two=two, **opts)
323
322
324 # server side
323 # server side
325
324
326 class streamres(object):
325 class streamres(object):
327 def __init__(self, gen):
326 def __init__(self, gen):
328 self.gen = gen
327 self.gen = gen
329
328
330 class pushres(object):
329 class pushres(object):
331 def __init__(self, res):
330 def __init__(self, res):
332 self.res = res
331 self.res = res
333
332
334 class pusherr(object):
333 class pusherr(object):
335 def __init__(self, res):
334 def __init__(self, res):
336 self.res = res
335 self.res = res
337
336
338 class ooberror(object):
337 class ooberror(object):
339 def __init__(self, message):
338 def __init__(self, message):
340 self.message = message
339 self.message = message
341
340
342 def dispatch(repo, proto, command):
341 def dispatch(repo, proto, command):
343 func, spec = commands[command]
342 func, spec = commands[command]
344 args = proto.getargs(spec)
343 args = proto.getargs(spec)
345 return func(repo, proto, *args)
344 return func(repo, proto, *args)
346
345
347 def options(cmd, keys, others):
346 def options(cmd, keys, others):
348 opts = {}
347 opts = {}
349 for k in keys:
348 for k in keys:
350 if k in others:
349 if k in others:
351 opts[k] = others[k]
350 opts[k] = others[k]
352 del others[k]
351 del others[k]
353 if others:
352 if others:
354 sys.stderr.write("abort: %s got unexpected arguments %s\n"
353 sys.stderr.write("abort: %s got unexpected arguments %s\n"
355 % (cmd, ",".join(others)))
354 % (cmd, ",".join(others)))
356 return opts
355 return opts
357
356
358 def batch(repo, proto, cmds, others):
357 def batch(repo, proto, cmds, others):
359 res = []
358 res = []
360 for pair in cmds.split(';'):
359 for pair in cmds.split(';'):
361 op, args = pair.split(' ', 1)
360 op, args = pair.split(' ', 1)
362 vals = {}
361 vals = {}
363 for a in args.split(','):
362 for a in args.split(','):
364 if a:
363 if a:
365 n, v = a.split('=')
364 n, v = a.split('=')
366 vals[n] = unescapearg(v)
365 vals[n] = unescapearg(v)
367 func, spec = commands[op]
366 func, spec = commands[op]
368 if spec:
367 if spec:
369 keys = spec.split()
368 keys = spec.split()
370 data = {}
369 data = {}
371 for k in keys:
370 for k in keys:
372 if k == '*':
371 if k == '*':
373 star = {}
372 star = {}
374 for key in vals.keys():
373 for key in vals.keys():
375 if key not in keys:
374 if key not in keys:
376 star[key] = vals[key]
375 star[key] = vals[key]
377 data['*'] = star
376 data['*'] = star
378 else:
377 else:
379 data[k] = vals[k]
378 data[k] = vals[k]
380 result = func(repo, proto, *[data[k] for k in keys])
379 result = func(repo, proto, *[data[k] for k in keys])
381 else:
380 else:
382 result = func(repo, proto)
381 result = func(repo, proto)
383 if isinstance(result, ooberror):
382 if isinstance(result, ooberror):
384 return result
383 return result
385 res.append(escapearg(result))
384 res.append(escapearg(result))
386 return ';'.join(res)
385 return ';'.join(res)
387
386
388 def between(repo, proto, pairs):
387 def between(repo, proto, pairs):
389 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
388 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
390 r = []
389 r = []
391 for b in repo.between(pairs):
390 for b in repo.between(pairs):
392 r.append(encodelist(b) + "\n")
391 r.append(encodelist(b) + "\n")
393 return "".join(r)
392 return "".join(r)
394
393
395 def branchmap(repo, proto):
394 def branchmap(repo, proto):
396 branchmap = repo.branchmap()
395 branchmap = repo.branchmap()
397 heads = []
396 heads = []
398 for branch, nodes in branchmap.iteritems():
397 for branch, nodes in branchmap.iteritems():
399 branchname = urllib.quote(encoding.fromlocal(branch))
398 branchname = urllib.quote(encoding.fromlocal(branch))
400 branchnodes = encodelist(nodes)
399 branchnodes = encodelist(nodes)
401 heads.append('%s %s' % (branchname, branchnodes))
400 heads.append('%s %s' % (branchname, branchnodes))
402 return '\n'.join(heads)
401 return '\n'.join(heads)
403
402
404 def branches(repo, proto, nodes):
403 def branches(repo, proto, nodes):
405 nodes = decodelist(nodes)
404 nodes = decodelist(nodes)
406 r = []
405 r = []
407 for b in repo.branches(nodes):
406 for b in repo.branches(nodes):
408 r.append(encodelist(b) + "\n")
407 r.append(encodelist(b) + "\n")
409 return "".join(r)
408 return "".join(r)
410
409
411 def capabilities(repo, proto):
410 def capabilities(repo, proto):
412 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
411 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
413 'unbundlehash batch').split()
412 'unbundlehash batch').split()
414 if _allowstream(repo.ui):
413 if _allowstream(repo.ui):
415 requiredformats = repo.requirements & repo.supportedformats
414 requiredformats = repo.requirements & repo.supportedformats
416 # if our local revlogs are just revlogv1, add 'stream' cap
415 # if our local revlogs are just revlogv1, add 'stream' cap
417 if not requiredformats - set(('revlogv1',)):
416 if not requiredformats - set(('revlogv1',)):
418 caps.append('stream')
417 caps.append('stream')
419 # otherwise, add 'streamreqs' detailing our local revlog format
418 # otherwise, add 'streamreqs' detailing our local revlog format
420 else:
419 else:
421 caps.append('streamreqs=%s' % ','.join(requiredformats))
420 caps.append('streamreqs=%s' % ','.join(requiredformats))
422 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
421 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
423 caps.append('httpheader=1024')
422 caps.append('httpheader=1024')
424 return ' '.join(caps)
423 return ' '.join(caps)
425
424
426 def changegroup(repo, proto, roots):
425 def changegroup(repo, proto, roots):
427 nodes = decodelist(roots)
426 nodes = decodelist(roots)
428 cg = repo.changegroup(nodes, 'serve')
427 cg = repo.changegroup(nodes, 'serve')
429 return streamres(proto.groupchunks(cg))
428 return streamres(proto.groupchunks(cg))
430
429
431 def changegroupsubset(repo, proto, bases, heads):
430 def changegroupsubset(repo, proto, bases, heads):
432 bases = decodelist(bases)
431 bases = decodelist(bases)
433 heads = decodelist(heads)
432 heads = decodelist(heads)
434 cg = repo.changegroupsubset(bases, heads, 'serve')
433 cg = repo.changegroupsubset(bases, heads, 'serve')
435 return streamres(proto.groupchunks(cg))
434 return streamres(proto.groupchunks(cg))
436
435
437 def debugwireargs(repo, proto, one, two, others):
436 def debugwireargs(repo, proto, one, two, others):
438 # only accept optional args from the known set
437 # only accept optional args from the known set
439 opts = options('debugwireargs', ['three', 'four'], others)
438 opts = options('debugwireargs', ['three', 'four'], others)
440 return repo.debugwireargs(one, two, **opts)
439 return repo.debugwireargs(one, two, **opts)
441
440
442 def getbundle(repo, proto, others):
441 def getbundle(repo, proto, others):
443 opts = options('getbundle', ['heads', 'common'], others)
442 opts = options('getbundle', ['heads', 'common'], others)
444 for k, v in opts.iteritems():
443 for k, v in opts.iteritems():
445 opts[k] = decodelist(v)
444 opts[k] = decodelist(v)
446 cg = repo.getbundle('serve', **opts)
445 cg = repo.getbundle('serve', **opts)
447 return streamres(proto.groupchunks(cg))
446 return streamres(proto.groupchunks(cg))
448
447
449 def heads(repo, proto):
448 def heads(repo, proto):
450 h = repo.heads()
449 h = repo.heads()
451 return encodelist(h) + "\n"
450 return encodelist(h) + "\n"
452
451
453 def hello(repo, proto):
452 def hello(repo, proto):
454 '''the hello command returns a set of lines describing various
453 '''the hello command returns a set of lines describing various
455 interesting things about the server, in an RFC822-like format.
454 interesting things about the server, in an RFC822-like format.
456 Currently the only one defined is "capabilities", which
455 Currently the only one defined is "capabilities", which
457 consists of a line in the form:
456 consists of a line in the form:
458
457
459 capabilities: space separated list of tokens
458 capabilities: space separated list of tokens
460 '''
459 '''
461 return "capabilities: %s\n" % (capabilities(repo, proto))
460 return "capabilities: %s\n" % (capabilities(repo, proto))
462
461
463 def listkeys(repo, proto, namespace):
462 def listkeys(repo, proto, namespace):
464 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
463 d = repo.listkeys(encoding.tolocal(namespace)).items()
465 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
464 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
466 for k, v in d])
465 for k, v in d])
467 return t
466 return t
468
467
469 def lookup(repo, proto, key):
468 def lookup(repo, proto, key):
470 try:
469 try:
471 r = hex(repo.lookup(encoding.tolocal(key)))
470 r = hex(repo.lookup(encoding.tolocal(key)))
472 success = 1
471 success = 1
473 except Exception, inst:
472 except Exception, inst:
474 r = str(inst)
473 r = str(inst)
475 success = 0
474 success = 0
476 return "%s %s\n" % (success, r)
475 return "%s %s\n" % (success, r)
477
476
478 def known(repo, proto, nodes, others):
477 def known(repo, proto, nodes, others):
479 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
478 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
480
479
481 def pushkey(repo, proto, namespace, key, old, new):
480 def pushkey(repo, proto, namespace, key, old, new):
482 # compatibility with pre-1.8 clients which were accidentally
481 # compatibility with pre-1.8 clients which were accidentally
483 # sending raw binary nodes rather than utf-8-encoded hex
482 # sending raw binary nodes rather than utf-8-encoded hex
484 if len(new) == 20 and new.encode('string-escape') != new:
483 if len(new) == 20 and new.encode('string-escape') != new:
485 # looks like it could be a binary node
484 # looks like it could be a binary node
486 try:
485 try:
487 new.decode('utf-8')
486 new.decode('utf-8')
488 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
487 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
489 except UnicodeDecodeError:
488 except UnicodeDecodeError:
490 pass # binary, leave unmodified
489 pass # binary, leave unmodified
491 else:
490 else:
492 new = encoding.tolocal(new) # normal path
491 new = encoding.tolocal(new) # normal path
493
492
494 r = pushkeymod.push(repo,
493 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
495 encoding.tolocal(namespace), encoding.tolocal(key),
494 encoding.tolocal(old), new)
496 encoding.tolocal(old), new)
497 return '%s\n' % int(r)
495 return '%s\n' % int(r)
498
496
499 def _allowstream(ui):
497 def _allowstream(ui):
500 return ui.configbool('server', 'uncompressed', True, untrusted=True)
498 return ui.configbool('server', 'uncompressed', True, untrusted=True)
501
499
502 def stream(repo, proto):
500 def stream(repo, proto):
503 '''If the server supports streaming clone, it advertises the "stream"
501 '''If the server supports streaming clone, it advertises the "stream"
504 capability with a value representing the version and flags of the repo
502 capability with a value representing the version and flags of the repo
505 it is serving. Client checks to see if it understands the format.
503 it is serving. Client checks to see if it understands the format.
506
504
507 The format is simple: the server writes out a line with the amount
505 The format is simple: the server writes out a line with the amount
508 of files, then the total amount of bytes to be transfered (separated
506 of files, then the total amount of bytes to be transfered (separated
509 by a space). Then, for each file, the server first writes the filename
507 by a space). Then, for each file, the server first writes the filename
510 and filesize (separated by the null character), then the file contents.
508 and filesize (separated by the null character), then the file contents.
511 '''
509 '''
512
510
513 if not _allowstream(repo.ui):
511 if not _allowstream(repo.ui):
514 return '1\n'
512 return '1\n'
515
513
516 entries = []
514 entries = []
517 total_bytes = 0
515 total_bytes = 0
518 try:
516 try:
519 # get consistent snapshot of repo, lock during scan
517 # get consistent snapshot of repo, lock during scan
520 lock = repo.lock()
518 lock = repo.lock()
521 try:
519 try:
522 repo.ui.debug('scanning\n')
520 repo.ui.debug('scanning\n')
523 for name, ename, size in repo.store.walk():
521 for name, ename, size in repo.store.walk():
524 entries.append((name, size))
522 entries.append((name, size))
525 total_bytes += size
523 total_bytes += size
526 finally:
524 finally:
527 lock.release()
525 lock.release()
528 except error.LockError:
526 except error.LockError:
529 return '2\n' # error: 2
527 return '2\n' # error: 2
530
528
531 def streamer(repo, entries, total):
529 def streamer(repo, entries, total):
532 '''stream out all metadata files in repository.'''
530 '''stream out all metadata files in repository.'''
533 yield '0\n' # success
531 yield '0\n' # success
534 repo.ui.debug('%d files, %d bytes to transfer\n' %
532 repo.ui.debug('%d files, %d bytes to transfer\n' %
535 (len(entries), total_bytes))
533 (len(entries), total_bytes))
536 yield '%d %d\n' % (len(entries), total_bytes)
534 yield '%d %d\n' % (len(entries), total_bytes)
537 for name, size in entries:
535 for name, size in entries:
538 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
536 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
539 # partially encode name over the wire for backwards compat
537 # partially encode name over the wire for backwards compat
540 yield '%s\0%d\n' % (store.encodedir(name), size)
538 yield '%s\0%d\n' % (store.encodedir(name), size)
541 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
539 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
542 yield chunk
540 yield chunk
543
541
544 return streamres(streamer(repo, entries, total_bytes))
542 return streamres(streamer(repo, entries, total_bytes))
545
543
546 def unbundle(repo, proto, heads):
544 def unbundle(repo, proto, heads):
547 their_heads = decodelist(heads)
545 their_heads = decodelist(heads)
548
546
549 def check_heads():
547 def check_heads():
550 heads = repo.heads()
548 heads = repo.heads()
551 heads_hash = util.sha1(''.join(sorted(heads))).digest()
549 heads_hash = util.sha1(''.join(sorted(heads))).digest()
552 return (their_heads == ['force'] or their_heads == heads or
550 return (their_heads == ['force'] or their_heads == heads or
553 their_heads == ['hashed', heads_hash])
551 their_heads == ['hashed', heads_hash])
554
552
555 proto.redirect()
553 proto.redirect()
556
554
557 # fail early if possible
555 # fail early if possible
558 if not check_heads():
556 if not check_heads():
559 return pusherr('unsynced changes')
557 return pusherr('unsynced changes')
560
558
561 # write bundle data to temporary file because it can be big
559 # write bundle data to temporary file because it can be big
562 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
560 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
563 fp = os.fdopen(fd, 'wb+')
561 fp = os.fdopen(fd, 'wb+')
564 r = 0
562 r = 0
565 try:
563 try:
566 proto.getfile(fp)
564 proto.getfile(fp)
567 lock = repo.lock()
565 lock = repo.lock()
568 try:
566 try:
569 if not check_heads():
567 if not check_heads():
570 # someone else committed/pushed/unbundled while we
568 # someone else committed/pushed/unbundled while we
571 # were transferring data
569 # were transferring data
572 return pusherr('unsynced changes')
570 return pusherr('unsynced changes')
573
571
574 # push can proceed
572 # push can proceed
575 fp.seek(0)
573 fp.seek(0)
576 gen = changegroupmod.readbundle(fp, None)
574 gen = changegroupmod.readbundle(fp, None)
577
575
578 try:
576 try:
579 r = repo.addchangegroup(gen, 'serve', proto._client(),
577 r = repo.addchangegroup(gen, 'serve', proto._client(),
580 lock=lock)
578 lock=lock)
581 except util.Abort, inst:
579 except util.Abort, inst:
582 sys.stderr.write("abort: %s\n" % inst)
580 sys.stderr.write("abort: %s\n" % inst)
583 finally:
581 finally:
584 lock.release()
582 lock.release()
585 return pushres(r)
583 return pushres(r)
586
584
587 finally:
585 finally:
588 fp.close()
586 fp.close()
589 os.unlink(tempname)
587 os.unlink(tempname)
590
588
591 commands = {
589 commands = {
592 'batch': (batch, 'cmds *'),
590 'batch': (batch, 'cmds *'),
593 'between': (between, 'pairs'),
591 'between': (between, 'pairs'),
594 'branchmap': (branchmap, ''),
592 'branchmap': (branchmap, ''),
595 'branches': (branches, 'nodes'),
593 'branches': (branches, 'nodes'),
596 'capabilities': (capabilities, ''),
594 'capabilities': (capabilities, ''),
597 'changegroup': (changegroup, 'roots'),
595 'changegroup': (changegroup, 'roots'),
598 'changegroupsubset': (changegroupsubset, 'bases heads'),
596 'changegroupsubset': (changegroupsubset, 'bases heads'),
599 'debugwireargs': (debugwireargs, 'one two *'),
597 'debugwireargs': (debugwireargs, 'one two *'),
600 'getbundle': (getbundle, '*'),
598 'getbundle': (getbundle, '*'),
601 'heads': (heads, ''),
599 'heads': (heads, ''),
602 'hello': (hello, ''),
600 'hello': (hello, ''),
603 'known': (known, 'nodes *'),
601 'known': (known, 'nodes *'),
604 'listkeys': (listkeys, 'namespace'),
602 'listkeys': (listkeys, 'namespace'),
605 'lookup': (lookup, 'key'),
603 'lookup': (lookup, 'key'),
606 'pushkey': (pushkey, 'namespace key old new'),
604 'pushkey': (pushkey, 'namespace key old new'),
607 'stream_out': (stream, ''),
605 'stream_out': (stream, ''),
608 'unbundle': (unbundle, 'heads'),
606 'unbundle': (unbundle, 'heads'),
609 }
607 }
General Comments 0
You need to be logged in to leave comments. Login now