clone: move file stream walk to a separate function...
Durham Goode
r19176:aae14b3d default
@@ -1,656 +1,660 @@
 # wireproto.py - generic wire protocol support functions
 #
 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 import urllib, tempfile, os, sys
 from i18n import _
 from node import bin, hex
 import changegroup as changegroupmod
 import peer, error, encoding, util, store

 # abstract batching support

 class future(object):
     '''placeholder for a value to be set later'''
     def set(self, value):
         if util.safehasattr(self, 'value'):
             raise error.RepoError("future is already set")
         self.value = value

 class batcher(object):
     '''base class for batches of commands submittable in a single request

     All methods invoked on instances of this class are simply queued and
     return a a future for the result. Once you call submit(), all the queued
     calls are performed and the results set in their respective futures.
     '''
     def __init__(self):
         self.calls = []
     def __getattr__(self, name):
         def call(*args, **opts):
             resref = future()
             self.calls.append((name, args, opts, resref,))
             return resref
         return call
     def submit(self):
         pass

 class localbatch(batcher):
     '''performs the queued calls directly'''
     def __init__(self, local):
         batcher.__init__(self)
         self.local = local
     def submit(self):
         for name, args, opts, resref in self.calls:
             resref.set(getattr(self.local, name)(*args, **opts))

 class remotebatch(batcher):
     '''batches the queued calls; uses as few roundtrips as possible'''
     def __init__(self, remote):
         '''remote must support _submitbatch(encbatch) and
         _submitone(op, encargs)'''
         batcher.__init__(self)
         self.remote = remote
     def submit(self):
         req, rsp = [], []
         for name, args, opts, resref in self.calls:
             mtd = getattr(self.remote, name)
             batchablefn = getattr(mtd, 'batchable', None)
             if batchablefn is not None:
                 batchable = batchablefn(mtd.im_self, *args, **opts)
                 encargsorres, encresref = batchable.next()
                 if encresref:
                     req.append((name, encargsorres,))
                     rsp.append((batchable, encresref, resref,))
                 else:
                     resref.set(encargsorres)
             else:
                 if req:
                     self._submitreq(req, rsp)
                     req, rsp = [], []
                 resref.set(mtd(*args, **opts))
         if req:
             self._submitreq(req, rsp)
     def _submitreq(self, req, rsp):
         encresults = self.remote._submitbatch(req)
         for encres, r in zip(encresults, rsp):
             batchable, encresref, resref = r
             encresref.set(encres)
             resref.set(batchable.next())

 def batchable(f):
     '''annotation for batchable methods

     Such methods must implement a coroutine as follows:

     @batchable
     def sample(self, one, two=None):
         # Handle locally computable results first:
         if not one:
             yield "a local result", None
         # Build list of encoded arguments suitable for your wire protocol:
         encargs = [('one', encode(one),), ('two', encode(two),)]
         # Create future for injection of encoded result:
         encresref = future()
         # Return encoded arguments and future:
         yield encargs, encresref
         # Assuming the future to be filled with the result from the batched
         # request now. Decode it:
         yield decode(encresref.value)

     The decorator returns a function which wraps this coroutine as a plain
     method, but adds the original method as an attribute called "batchable",
     which is used by remotebatch to split the call into separate encoding and
     decoding phases.
     '''
     def plain(*args, **opts):
         batchable = f(*args, **opts)
         encargsorres, encresref = batchable.next()
         if not encresref:
             return encargsorres # a local result in this case
         self = args[0]
         encresref.set(self._submitone(f.func_name, encargsorres))
         return batchable.next()
     setattr(plain, 'batchable', f)
     return plain

 # list of nodes encoding / decoding

 def decodelist(l, sep=' '):
     if l:
         return map(bin, l.split(sep))
     return []

 def encodelist(l, sep=' '):
     return sep.join(map(hex, l))

 # batched call argument encoding

 def escapearg(plain):
     return (plain
             .replace(':', '::')
             .replace(',', ':,')
             .replace(';', ':;')
             .replace('=', ':='))

 def unescapearg(escaped):
     return (escaped
             .replace(':=', '=')
             .replace(':;', ';')
             .replace(':,', ',')
             .replace('::', ':'))

 # client side

 def todict(**args):
     return args

 class wirepeer(peer.peerrepository):

     def batch(self):
         return remotebatch(self)
     def _submitbatch(self, req):
         cmds = []
         for op, argsdict in req:
             args = ','.join('%s=%s' % p for p in argsdict.iteritems())
             cmds.append('%s %s' % (op, args))
         rsp = self._call("batch", cmds=';'.join(cmds))
         return rsp.split(';')
     def _submitone(self, op, args):
         return self._call(op, **args)

     @batchable
     def lookup(self, key):
         self.requirecap('lookup', _('look up remote revision'))
         f = future()
         yield todict(key=encoding.fromlocal(key)), f
         d = f.value
         success, data = d[:-1].split(" ", 1)
         if int(success):
             yield bin(data)
         self._abort(error.RepoError(data))

     @batchable
     def heads(self):
         f = future()
         yield {}, f
         d = f.value
         try:
             yield decodelist(d[:-1])
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), d))

     @batchable
     def known(self, nodes):
         f = future()
         yield todict(nodes=encodelist(nodes)), f
         d = f.value
         try:
             yield [bool(int(f)) for f in d]
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), d))

     @batchable
     def branchmap(self):
         f = future()
         yield {}, f
         d = f.value
         try:
             branchmap = {}
             for branchpart in d.splitlines():
                 branchname, branchheads = branchpart.split(' ', 1)
                 branchname = encoding.tolocal(urllib.unquote(branchname))
                 branchheads = decodelist(branchheads)
                 branchmap[branchname] = branchheads
             yield branchmap
         except TypeError:
             self._abort(error.ResponseError(_("unexpected response:"), d))

     def branches(self, nodes):
         n = encodelist(nodes)
         d = self._call("branches", nodes=n)
         try:
             br = [tuple(decodelist(b)) for b in d.splitlines()]
             return br
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), d))

     def between(self, pairs):
         batch = 8 # avoid giant requests
         r = []
         for i in xrange(0, len(pairs), batch):
             n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
             d = self._call("between", pairs=n)
             try:
                 r.extend(l and decodelist(l) or [] for l in d.splitlines())
             except ValueError:
                 self._abort(error.ResponseError(_("unexpected response:"), d))
         return r

     @batchable
     def pushkey(self, namespace, key, old, new):
         if not self.capable('pushkey'):
             yield False, None
         f = future()
         self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
         yield todict(namespace=encoding.fromlocal(namespace),
                      key=encoding.fromlocal(key),
                      old=encoding.fromlocal(old),
                      new=encoding.fromlocal(new)), f
         d = f.value
         d, output = d.split('\n', 1)
         try:
             d = bool(int(d))
         except ValueError:
             raise error.ResponseError(
                 _('push failed (unexpected response):'), d)
         for l in output.splitlines(True):
             self.ui.status(_('remote: '), l)
         yield d

     @batchable
     def listkeys(self, namespace):
         if not self.capable('pushkey'):
             yield {}, None
         f = future()
         self.ui.debug('preparing listkeys for "%s"\n' % namespace)
         yield todict(namespace=encoding.fromlocal(namespace)), f
         d = f.value
         r = {}
         for l in d.splitlines():
             k, v = l.split('\t')
             r[encoding.tolocal(k)] = encoding.tolocal(v)
         yield r

     def stream_out(self):
         return self._callstream('stream_out')

     def changegroup(self, nodes, kind):
         n = encodelist(nodes)
         f = self._callstream("changegroup", roots=n)
         return changegroupmod.unbundle10(self._decompress(f), 'UN')

     def changegroupsubset(self, bases, heads, kind):
         self.requirecap('changegroupsubset', _('look up remote changes'))
         bases = encodelist(bases)
         heads = encodelist(heads)
         f = self._callstream("changegroupsubset",
                              bases=bases, heads=heads)
         return changegroupmod.unbundle10(self._decompress(f), 'UN')

     def getbundle(self, source, heads=None, common=None):
         self.requirecap('getbundle', _('look up remote changes'))
         opts = {}
         if heads is not None:
             opts['heads'] = encodelist(heads)
         if common is not None:
             opts['common'] = encodelist(common)
         f = self._callstream("getbundle", **opts)
         return changegroupmod.unbundle10(self._decompress(f), 'UN')

     def unbundle(self, cg, heads, source):
         '''Send cg (a readable file-like object representing the
         changegroup to push, typically a chunkbuffer object) to the
         remote server as a bundle. Return an integer indicating the
         result of the push (see localrepository.addchangegroup()).'''

         if heads != ['force'] and self.capable('unbundlehash'):
             heads = encodelist(['hashed',
                                 util.sha1(''.join(sorted(heads))).digest()])
         else:
             heads = encodelist(heads)

         ret, output = self._callpush("unbundle", cg, heads=heads)
         if ret == "":
             raise error.ResponseError(
                 _('push failed:'), output)
         try:
             ret = int(ret)
         except ValueError:
             raise error.ResponseError(
                 _('push failed (unexpected response):'), ret)

         for l in output.splitlines(True):
             self.ui.status(_('remote: '), l)
         return ret

     def debugwireargs(self, one, two, three=None, four=None, five=None):
         # don't pass optional arguments left at their default value
         opts = {}
         if three is not None:
             opts['three'] = three
         if four is not None:
             opts['four'] = four
         return self._call('debugwireargs', one=one, two=two, **opts)

 # server side

 class streamres(object):
     def __init__(self, gen):
         self.gen = gen

 class pushres(object):
     def __init__(self, res):
         self.res = res

 class pusherr(object):
     def __init__(self, res):
         self.res = res

 class ooberror(object):
     def __init__(self, message):
         self.message = message

 def dispatch(repo, proto, command):
     repo = repo.filtered("served")
     func, spec = commands[command]
     args = proto.getargs(spec)
     return func(repo, proto, *args)

 def options(cmd, keys, others):
     opts = {}
     for k in keys:
         if k in others:
             opts[k] = others[k]
             del others[k]
     if others:
         sys.stderr.write("abort: %s got unexpected arguments %s\n"
                          % (cmd, ",".join(others)))
     return opts

 def batch(repo, proto, cmds, others):
     repo = repo.filtered("served")
     res = []
     for pair in cmds.split(';'):
         op, args = pair.split(' ', 1)
         vals = {}
         for a in args.split(','):
             if a:
                 n, v = a.split('=')
                 vals[n] = unescapearg(v)
         func, spec = commands[op]
         if spec:
             keys = spec.split()
             data = {}
             for k in keys:
                 if k == '*':
                     star = {}
                     for key in vals.keys():
                         if key not in keys:
                             star[key] = vals[key]
                     data['*'] = star
                 else:
                     data[k] = vals[k]
             result = func(repo, proto, *[data[k] for k in keys])
         else:
             result = func(repo, proto)
         if isinstance(result, ooberror):
             return result
         res.append(escapearg(result))
     return ';'.join(res)

 def between(repo, proto, pairs):
     pairs = [decodelist(p, '-') for p in pairs.split(" ")]
     r = []
     for b in repo.between(pairs):
         r.append(encodelist(b) + "\n")
     return "".join(r)

 def branchmap(repo, proto):
     branchmap = repo.branchmap()
     heads = []
     for branch, nodes in branchmap.iteritems():
         branchname = urllib.quote(encoding.fromlocal(branch))
         branchnodes = encodelist(nodes)
         heads.append('%s %s' % (branchname, branchnodes))
     return '\n'.join(heads)

 def branches(repo, proto, nodes):
     nodes = decodelist(nodes)
     r = []
     for b in repo.branches(nodes):
         r.append(encodelist(b) + "\n")
     return "".join(r)

 def capabilities(repo, proto):
     caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
             'unbundlehash batch').split()
     if _allowstream(repo.ui):
         if repo.ui.configbool('server', 'preferuncompressed', False):
             caps.append('stream-preferred')
         requiredformats = repo.requirements & repo.supportedformats
         # if our local revlogs are just revlogv1, add 'stream' cap
         if not requiredformats - set(('revlogv1',)):
             caps.append('stream')
         # otherwise, add 'streamreqs' detailing our local revlog format
         else:
             caps.append('streamreqs=%s' % ','.join(requiredformats))
     caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
     caps.append('httpheader=1024')
     return ' '.join(caps)

 def changegroup(repo, proto, roots):
     nodes = decodelist(roots)
     cg = repo.changegroup(nodes, 'serve')
     return streamres(proto.groupchunks(cg))

 def changegroupsubset(repo, proto, bases, heads):
     bases = decodelist(bases)
     heads = decodelist(heads)
     cg = repo.changegroupsubset(bases, heads, 'serve')
     return streamres(proto.groupchunks(cg))

 def debugwireargs(repo, proto, one, two, others):
     # only accept optional args from the known set
     opts = options('debugwireargs', ['three', 'four'], others)
     return repo.debugwireargs(one, two, **opts)

 def getbundle(repo, proto, others):
     opts = options('getbundle', ['heads', 'common'], others)
     for k, v in opts.iteritems():
         opts[k] = decodelist(v)
     cg = repo.getbundle('serve', **opts)
     return streamres(proto.groupchunks(cg))

 def heads(repo, proto):
     h = repo.heads()
     return encodelist(h) + "\n"

 def hello(repo, proto):
     '''the hello command returns a set of lines describing various
     interesting things about the server, in an RFC822-like format.
     Currently the only one defined is "capabilities", which
     consists of a line in the form:

     capabilities: space separated list of tokens
     '''
     return "capabilities: %s\n" % (capabilities(repo, proto))

 def listkeys(repo, proto, namespace):
     d = repo.listkeys(encoding.tolocal(namespace)).items()
     t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
                    for k, v in d])
     return t

 def lookup(repo, proto, key):
     try:
         k = encoding.tolocal(key)
         c = repo[k]
         r = c.hex()
         success = 1
     except Exception, inst:
         r = str(inst)
         success = 0
     return "%s %s\n" % (success, r)

 def known(repo, proto, nodes, others):
     return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))

 def pushkey(repo, proto, namespace, key, old, new):
     # compatibility with pre-1.8 clients which were accidentally
     # sending raw binary nodes rather than utf-8-encoded hex
     if len(new) == 20 and new.encode('string-escape') != new:
         # looks like it could be a binary node
         try:
             new.decode('utf-8')
             new = encoding.tolocal(new) # but cleanly decodes as UTF-8
         except UnicodeDecodeError:
             pass # binary, leave unmodified
     else:
         new = encoding.tolocal(new) # normal path

     if util.safehasattr(proto, 'restore'):

         proto.redirect()

         try:
             r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                              encoding.tolocal(old), new) or False
         except util.Abort:
             r = False

         output = proto.restore()

         return '%s\n%s' % (int(r), output)

     r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
                      encoding.tolocal(old), new)
     return '%s\n' % int(r)

 def _allowstream(ui):
     return ui.configbool('server', 'uncompressed', True, untrusted=True)

+def _walkstreamfiles(repo):
+    # this is it's own function so extensions can override it
+    return repo.store.walk()
+
 def stream(repo, proto):
     '''If the server supports streaming clone, it advertises the "stream"
     capability with a value representing the version and flags of the repo
     it is serving. Client checks to see if it understands the format.

     The format is simple: the server writes out a line with the amount
     of files, then the total amount of bytes to be transferred (separated
     by a space). Then, for each file, the server first writes the filename
     and filesize (separated by the null character), then the file contents.
     '''

     if not _allowstream(repo.ui):
         return '1\n'

     entries = []
     total_bytes = 0
     try:
         # get consistent snapshot of repo, lock during scan
         lock = repo.lock()
         try:
             repo.ui.debug('scanning\n')
-            for name, ename, size in repo.store.walk():
+            for name, ename, size in _walkstreamfiles(repo):
                 if size:
                     entries.append((name, size))
                     total_bytes += size
         finally:
             lock.release()
     except error.LockError:
         return '2\n' # error: 2

     def streamer(repo, entries, total):
         '''stream out all metadata files in repository.'''
         yield '0\n' # success
         repo.ui.debug('%d files, %d bytes to transfer\n' %
                       (len(entries), total_bytes))
         yield '%d %d\n' % (len(entries), total_bytes)

         sopener = repo.sopener
         oldaudit = sopener.mustaudit
         debugflag = repo.ui.debugflag
         sopener.mustaudit = False

         try:
             for name, size in entries:
                 if debugflag:
                     repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                 # partially encode name over the wire for backwards compat
                 yield '%s\0%d\n' % (store.encodedir(name), size)
                 if size <= 65536:
                     fp = sopener(name)
                     try:
                         data = fp.read(size)
                     finally:
                         fp.close()
                     yield data
                 else:
                     for chunk in util.filechunkiter(sopener(name), limit=size):
                         yield chunk
         # replace with "finally:" when support for python 2.4 has been dropped
         except Exception:
             sopener.mustaudit = oldaudit
             raise
         sopener.mustaudit = oldaudit

     return streamres(streamer(repo, entries, total_bytes))

 def unbundle(repo, proto, heads):
     their_heads = decodelist(heads)

     def check_heads():
         heads = repo.heads()
         heads_hash = util.sha1(''.join(sorted(heads))).digest()
         return (their_heads == ['force'] or their_heads == heads or
                 their_heads == ['hashed', heads_hash])

     proto.redirect()

     # fail early if possible
     if not check_heads():
         return pusherr('repository changed while preparing changes - '
                        'please try again')

     # write bundle data to temporary file because it can be big
     fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
     fp = os.fdopen(fd, 'wb+')
     r = 0
     try:
         proto.getfile(fp)
         lock = repo.lock()
         try:
             if not check_heads():
                 # someone else committed/pushed/unbundled while we
                 # were transferring data
                 return pusherr('repository changed while uploading changes - '
                                'please try again')

             # push can proceed
             fp.seek(0)
             gen = changegroupmod.readbundle(fp, None)

             try:
                 r = repo.addchangegroup(gen, 'serve', proto._client())
             except util.Abort, inst:
                 sys.stderr.write("abort: %s\n" % inst)
         finally:
             lock.release()
         return pushres(r)

     finally:
         fp.close()
         os.unlink(tempname)

 commands = {
     'batch': (batch, 'cmds *'),
     'between': (between, 'pairs'),
     'branchmap': (branchmap, ''),
     'branches': (branches, 'nodes'),
     'capabilities': (capabilities, ''),
     'changegroup': (changegroup, 'roots'),
     'changegroupsubset': (changegroupsubset, 'bases heads'),
     'debugwireargs': (debugwireargs, 'one two *'),
     'getbundle': (getbundle, '*'),
     'heads': (heads, ''),
     'hello': (hello, ''),
     'known': (known, 'nodes *'),
     'listkeys': (listkeys, 'namespace'),
     'lookup': (lookup, 'key'),
     'pushkey': (pushkey, 'namespace key old new'),
     'stream_out': (stream, ''),
     'unbundle': (unbundle, 'heads'),
 }
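
The point of the change is the new _walkstreamfiles() hook: stream() now goes through it instead of calling repo.store.walk() directly, so an extension can adjust which store files are sent during a streaming clone. A minimal sketch of such an extension, assuming Mercurial's extensions.wrapfunction API; the extension name and the filtering rule shown are purely illustrative and not part of this commit:

# streamfilter.py - hypothetical extension hooking the new function
from mercurial import extensions, wireproto

def _filteredwalk(orig, repo):
    # pass through everything the default walk would serve, skipping
    # entries that match a made-up predicate
    for unencoded, encoded, size in orig(repo):
        if not unencoded.startswith('data/secret/'):  # illustrative filter
            yield unencoded, encoded, size

def extsetup(ui):
    extensions.wrapfunction(wireproto, '_walkstreamfiles', _filteredwalk)

Because stream() resolves _walkstreamfiles at call time, the wrapped walk is picked up automatically once the extension is loaded.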
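
For reference, streamer() above defines the whole stream_out wire format: one status line ('0\n' on success, '1\n' if streaming is disabled, '2\n' on a lock error), then '<filecount> <totalbytes>\n', then for every file a '<encoded name>\0<size>\n' header followed by exactly size raw bytes. A rough client-side sketch of consuming that format; this is illustrative only (fp stands in for a file-like object over the server response) and is not code from Mercurial:

def readstreamclone(fp):
    status = fp.readline()
    if status != '0\n':
        raise ValueError('stream_out failed: %r' % status)
    filecount, totalbytes = map(int, fp.readline().split(' ', 1))
    for _unused in xrange(filecount):
        name, size = fp.readline()[:-1].split('\0', 1)
        size = int(size)
        data = fp.read(size)  # a real client would loop on short reads
        yield name, size, data

Each yielded tuple corresponds to one store file that the server walked via _walkstreamfiles().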