##// END OF EJS Templates
wireproto: rename argument to groupchunks()...
Gregory Szorc -
r30014:d34cf260 default
parent child Browse files
Show More
@@ -1,124 +1,124 b''
1 1 #
2 2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import cgi
11 11 import zlib
12 12
13 13 from .common import (
14 14 HTTP_OK,
15 15 )
16 16
17 17 from .. import (
18 18 util,
19 19 wireproto,
20 20 )
21 21 stringio = util.stringio
22 22
23 23 urlerr = util.urlerr
24 24 urlreq = util.urlreq
25 25
26 26 HGTYPE = 'application/mercurial-0.1'
27 27 HGERRTYPE = 'application/hg-error'
28 28
29 29 class webproto(wireproto.abstractserverproto):
30 30 def __init__(self, req, ui):
31 31 self.req = req
32 32 self.response = ''
33 33 self.ui = ui
34 34 def getargs(self, args):
35 35 knownargs = self._args()
36 36 data = {}
37 37 keys = args.split()
38 38 for k in keys:
39 39 if k == '*':
40 40 star = {}
41 41 for key in knownargs.keys():
42 42 if key != 'cmd' and key not in keys:
43 43 star[key] = knownargs[key][0]
44 44 data['*'] = star
45 45 else:
46 46 data[k] = knownargs[k][0]
47 47 return [data[k] for k in keys]
48 48 def _args(self):
49 49 args = self.req.form.copy()
50 50 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
51 51 if postlen:
52 52 args.update(cgi.parse_qs(
53 53 self.req.read(postlen), keep_blank_values=True))
54 54 return args
55 55 chunks = []
56 56 i = 1
57 57 while True:
58 58 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
59 59 if h is None:
60 60 break
61 61 chunks += [h]
62 62 i += 1
63 63 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
64 64 return args
65 65 def getfile(self, fp):
66 66 length = int(self.req.env['CONTENT_LENGTH'])
67 67 for s in util.filechunkiter(self.req, limit=length):
68 68 fp.write(s)
69 69 def redirect(self):
70 70 self.oldio = self.ui.fout, self.ui.ferr
71 71 self.ui.ferr = self.ui.fout = stringio()
72 72 def restore(self):
73 73 val = self.ui.fout.getvalue()
74 74 self.ui.ferr, self.ui.fout = self.oldio
75 75 return val
76 def groupchunks(self, cg):
76 def groupchunks(self, fh):
77 77 # Don't allow untrusted settings because disabling compression or
78 78 # setting a very high compression level could lead to flooding
79 79 # the server's network or CPU.
80 80 z = zlib.compressobj(self.ui.configint('server', 'zliblevel', -1))
81 81 while True:
82 chunk = cg.read(32768)
82 chunk = fh.read(32768)
83 83 if not chunk:
84 84 break
85 85 data = z.compress(chunk)
86 86 # Not all calls to compress() emit data. It is cheaper to inspect
87 87 # that here than to send it via the generator.
88 88 if data:
89 89 yield data
90 90 yield z.flush()
91 91 def _client(self):
92 92 return 'remote:%s:%s:%s' % (
93 93 self.req.env.get('wsgi.url_scheme') or 'http',
94 94 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
95 95 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
96 96
97 97 def iscmd(cmd):
98 98 return cmd in wireproto.commands
99 99
100 100 def call(repo, req, cmd):
101 101 p = webproto(req, repo.ui)
102 102 rsp = wireproto.dispatch(repo, p, cmd)
103 103 if isinstance(rsp, str):
104 104 req.respond(HTTP_OK, HGTYPE, body=rsp)
105 105 return []
106 106 elif isinstance(rsp, wireproto.streamres):
107 107 req.respond(HTTP_OK, HGTYPE)
108 108 return rsp.gen
109 109 elif isinstance(rsp, wireproto.pushres):
110 110 val = p.restore()
111 111 rsp = '%d\n%s' % (rsp.res, val)
112 112 req.respond(HTTP_OK, HGTYPE, body=rsp)
113 113 return []
114 114 elif isinstance(rsp, wireproto.pusherr):
115 115 # drain the incoming bundle
116 116 req.drain()
117 117 p.restore()
118 118 rsp = '0\n%s\n' % rsp.res
119 119 req.respond(HTTP_OK, HGTYPE, body=rsp)
120 120 return []
121 121 elif isinstance(rsp, wireproto.ooberror):
122 122 rsp = rsp.message
123 123 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
124 124 return []
@@ -1,131 +1,131 b''
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import os
12 12 import sys
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 error,
17 17 hook,
18 18 util,
19 19 wireproto,
20 20 )
21 21
22 22 class sshserver(wireproto.abstractserverproto):
23 23 def __init__(self, ui, repo):
24 24 self.ui = ui
25 25 self.repo = repo
26 26 self.lock = None
27 27 self.fin = ui.fin
28 28 self.fout = ui.fout
29 29
30 30 hook.redirect(True)
31 31 ui.fout = repo.ui.fout = ui.ferr
32 32
33 33 # Prevent insertion/deletion of CRs
34 34 util.setbinary(self.fin)
35 35 util.setbinary(self.fout)
36 36
37 37 def getargs(self, args):
38 38 data = {}
39 39 keys = args.split()
40 40 for n in xrange(len(keys)):
41 41 argline = self.fin.readline()[:-1]
42 42 arg, l = argline.split()
43 43 if arg not in keys:
44 44 raise error.Abort(_("unexpected parameter %r") % arg)
45 45 if arg == '*':
46 46 star = {}
47 47 for k in xrange(int(l)):
48 48 argline = self.fin.readline()[:-1]
49 49 arg, l = argline.split()
50 50 val = self.fin.read(int(l))
51 51 star[arg] = val
52 52 data['*'] = star
53 53 else:
54 54 val = self.fin.read(int(l))
55 55 data[arg] = val
56 56 return [data[k] for k in keys]
57 57
58 58 def getarg(self, name):
59 59 return self.getargs(name)[0]
60 60
61 61 def getfile(self, fpout):
62 62 self.sendresponse('')
63 63 count = int(self.fin.readline())
64 64 while count:
65 65 fpout.write(self.fin.read(count))
66 66 count = int(self.fin.readline())
67 67
68 68 def redirect(self):
69 69 pass
70 70
71 def groupchunks(self, changegroup):
72 return iter(lambda: changegroup.read(4096), '')
71 def groupchunks(self, fh):
72 return iter(lambda: fh.read(4096), '')
73 73
74 74 def sendresponse(self, v):
75 75 self.fout.write("%d\n" % len(v))
76 76 self.fout.write(v)
77 77 self.fout.flush()
78 78
79 79 def sendstream(self, source):
80 80 write = self.fout.write
81 81 for chunk in source.gen:
82 82 write(chunk)
83 83 self.fout.flush()
84 84
85 85 def sendpushresponse(self, rsp):
86 86 self.sendresponse('')
87 87 self.sendresponse(str(rsp.res))
88 88
89 89 def sendpusherror(self, rsp):
90 90 self.sendresponse(rsp.res)
91 91
92 92 def sendooberror(self, rsp):
93 93 self.ui.ferr.write('%s\n-\n' % rsp.message)
94 94 self.ui.ferr.flush()
95 95 self.fout.write('\n')
96 96 self.fout.flush()
97 97
98 98 def serve_forever(self):
99 99 try:
100 100 while self.serve_one():
101 101 pass
102 102 finally:
103 103 if self.lock is not None:
104 104 self.lock.release()
105 105 sys.exit(0)
106 106
107 107 handlers = {
108 108 str: sendresponse,
109 109 wireproto.streamres: sendstream,
110 110 wireproto.pushres: sendpushresponse,
111 111 wireproto.pusherr: sendpusherror,
112 112 wireproto.ooberror: sendooberror,
113 113 }
114 114
115 115 def serve_one(self):
116 116 cmd = self.fin.readline()[:-1]
117 117 if cmd and cmd in wireproto.commands:
118 118 rsp = wireproto.dispatch(self.repo, self, cmd)
119 119 self.handlers[rsp.__class__](self, rsp)
120 120 elif cmd:
121 121 impl = getattr(self, 'do_' + cmd, None)
122 122 if impl:
123 123 r = impl()
124 124 if r is not None:
125 125 self.sendresponse(r)
126 126 else: self.sendresponse("")
127 127 return cmd != ''
128 128
129 129 def _client(self):
130 130 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
131 131 return 'remote:ssh:' + client
@@ -1,956 +1,957 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import sys
14 14 import tempfile
15 15
16 16 from .i18n import _
17 17 from .node import (
18 18 bin,
19 19 hex,
20 20 )
21 21
22 22 from . import (
23 23 bundle2,
24 24 changegroup as changegroupmod,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 streamclone,
31 31 util,
32 32 )
33 33
34 34 urlerr = util.urlerr
35 35 urlreq = util.urlreq
36 36
37 37 bundle2required = _(
38 38 'incompatible Mercurial client; bundle2 required\n'
39 39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40 40
41 41 class abstractserverproto(object):
42 42 """abstract class that summarizes the protocol API
43 43
44 44 Used as reference and documentation.
45 45 """
46 46
47 47 def getargs(self, args):
48 48 """return the value for arguments in <args>
49 49
50 50 returns a list of values (same order as <args>)"""
51 51 raise NotImplementedError()
52 52
53 53 def getfile(self, fp):
54 54 """write the whole content of a file into a file like object
55 55
56 56 The file is in the form::
57 57
58 58 (<chunk-size>\n<chunk>)+0\n
59 59
60 60 chunk size is the ascii version of the int.
61 61 """
62 62 raise NotImplementedError()
63 63
64 64 def redirect(self):
65 65 """may setup interception for stdout and stderr
66 66
67 67 See also the `restore` method."""
68 68 raise NotImplementedError()
69 69
70 70 # If the `redirect` function does install interception, the `restore`
71 71 # function MUST be defined. If interception is not used, this function
72 72 # MUST NOT be defined.
73 73 #
74 74 # left commented here on purpose
75 75 #
76 76 #def restore(self):
77 77 # """reinstall previous stdout and stderr and return intercepted stdout
78 78 # """
79 79 # raise NotImplementedError()
80 80
81 def groupchunks(self, cg):
82 """return 4096 chunks from a changegroup object
81 def groupchunks(self, fh):
82 """Generator of chunks to send to the client.
83 83
84 Some protocols may have compressed the contents."""
84 Some protocols may have compressed the contents.
85 """
85 86 raise NotImplementedError()
86 87
87 88 class remotebatch(peer.batcher):
88 89 '''batches the queued calls; uses as few roundtrips as possible'''
89 90 def __init__(self, remote):
90 91 '''remote must support _submitbatch(encbatch) and
91 92 _submitone(op, encargs)'''
92 93 peer.batcher.__init__(self)
93 94 self.remote = remote
94 95 def submit(self):
95 96 req, rsp = [], []
96 97 for name, args, opts, resref in self.calls:
97 98 mtd = getattr(self.remote, name)
98 99 batchablefn = getattr(mtd, 'batchable', None)
99 100 if batchablefn is not None:
100 101 batchable = batchablefn(mtd.im_self, *args, **opts)
101 102 encargsorres, encresref = next(batchable)
102 103 if encresref:
103 104 req.append((name, encargsorres,))
104 105 rsp.append((batchable, encresref, resref,))
105 106 else:
106 107 resref.set(encargsorres)
107 108 else:
108 109 if req:
109 110 self._submitreq(req, rsp)
110 111 req, rsp = [], []
111 112 resref.set(mtd(*args, **opts))
112 113 if req:
113 114 self._submitreq(req, rsp)
114 115 def _submitreq(self, req, rsp):
115 116 encresults = self.remote._submitbatch(req)
116 117 for encres, r in zip(encresults, rsp):
117 118 batchable, encresref, resref = r
118 119 encresref.set(encres)
119 120 resref.set(next(batchable))
120 121
121 122 class remoteiterbatcher(peer.iterbatcher):
122 123 def __init__(self, remote):
123 124 super(remoteiterbatcher, self).__init__()
124 125 self._remote = remote
125 126
126 127 def __getattr__(self, name):
127 128 if not getattr(self._remote, name, False):
128 129 raise AttributeError(
129 130 'Attempted to iterbatch non-batchable call to %r' % name)
130 131 return super(remoteiterbatcher, self).__getattr__(name)
131 132
132 133 def submit(self):
133 134 """Break the batch request into many patch calls and pipeline them.
134 135
135 136 This is mostly valuable over http where request sizes can be
136 137 limited, but can be used in other places as well.
137 138 """
138 139 req, rsp = [], []
139 140 for name, args, opts, resref in self.calls:
140 141 mtd = getattr(self._remote, name)
141 142 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 143 encargsorres, encresref = next(batchable)
143 144 assert encresref
144 145 req.append((name, encargsorres))
145 146 rsp.append((batchable, encresref))
146 147 if req:
147 148 self._resultiter = self._remote._submitbatch(req)
148 149 self._rsp = rsp
149 150
150 151 def results(self):
151 152 for (batchable, encresref), encres in itertools.izip(
152 153 self._rsp, self._resultiter):
153 154 encresref.set(encres)
154 155 yield next(batchable)
155 156
156 157 # Forward a couple of names from peer to make wireproto interactions
157 158 # slightly more sensible.
158 159 batchable = peer.batchable
159 160 future = peer.future
160 161
161 162 # list of nodes encoding / decoding
162 163
163 164 def decodelist(l, sep=' '):
164 165 if l:
165 166 return map(bin, l.split(sep))
166 167 return []
167 168
168 169 def encodelist(l, sep=' '):
169 170 try:
170 171 return sep.join(map(hex, l))
171 172 except TypeError:
172 173 raise
173 174
174 175 # batched call argument encoding
175 176
176 177 def escapearg(plain):
177 178 return (plain
178 179 .replace(':', ':c')
179 180 .replace(',', ':o')
180 181 .replace(';', ':s')
181 182 .replace('=', ':e'))
182 183
183 184 def unescapearg(escaped):
184 185 return (escaped
185 186 .replace(':e', '=')
186 187 .replace(':s', ';')
187 188 .replace(':o', ',')
188 189 .replace(':c', ':'))
189 190
190 191 def encodebatchcmds(req):
191 192 """Return a ``cmds`` argument value for the ``batch`` command."""
192 193 cmds = []
193 194 for op, argsdict in req:
194 195 # Old servers didn't properly unescape argument names. So prevent
195 196 # the sending of argument names that may not be decoded properly by
196 197 # servers.
197 198 assert all(escapearg(k) == k for k in argsdict)
198 199
199 200 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
200 201 for k, v in argsdict.iteritems())
201 202 cmds.append('%s %s' % (op, args))
202 203
203 204 return ';'.join(cmds)
204 205
205 206 # mapping of options accepted by getbundle and their types
206 207 #
207 208 # Meant to be extended by extensions. It is extensions responsibility to ensure
208 209 # such options are properly processed in exchange.getbundle.
209 210 #
210 211 # supported types are:
211 212 #
212 213 # :nodes: list of binary nodes
213 214 # :csv: list of comma-separated values
214 215 # :scsv: list of comma-separated values return as set
215 216 # :plain: string with no transformation needed.
216 217 gboptsmap = {'heads': 'nodes',
217 218 'common': 'nodes',
218 219 'obsmarkers': 'boolean',
219 220 'bundlecaps': 'scsv',
220 221 'listkeys': 'csv',
221 222 'cg': 'boolean',
222 223 'cbattempted': 'boolean'}
223 224
224 225 # client side
225 226
226 227 class wirepeer(peer.peerrepository):
227 228 """Client-side interface for communicating with a peer repository.
228 229
229 230 Methods commonly call wire protocol commands of the same name.
230 231
231 232 See also httppeer.py and sshpeer.py for protocol-specific
232 233 implementations of this interface.
233 234 """
234 235 def batch(self):
235 236 if self.capable('batch'):
236 237 return remotebatch(self)
237 238 else:
238 239 return peer.localbatch(self)
239 240 def _submitbatch(self, req):
240 241 """run batch request <req> on the server
241 242
242 243 Returns an iterator of the raw responses from the server.
243 244 """
244 245 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
245 246 chunk = rsp.read(1024)
246 247 work = [chunk]
247 248 while chunk:
248 249 while ';' not in chunk and chunk:
249 250 chunk = rsp.read(1024)
250 251 work.append(chunk)
251 252 merged = ''.join(work)
252 253 while ';' in merged:
253 254 one, merged = merged.split(';', 1)
254 255 yield unescapearg(one)
255 256 chunk = rsp.read(1024)
256 257 work = [merged, chunk]
257 258 yield unescapearg(''.join(work))
258 259
259 260 def _submitone(self, op, args):
260 261 return self._call(op, **args)
261 262
262 263 def iterbatch(self):
263 264 return remoteiterbatcher(self)
264 265
265 266 @batchable
266 267 def lookup(self, key):
267 268 self.requirecap('lookup', _('look up remote revision'))
268 269 f = future()
269 270 yield {'key': encoding.fromlocal(key)}, f
270 271 d = f.value
271 272 success, data = d[:-1].split(" ", 1)
272 273 if int(success):
273 274 yield bin(data)
274 275 self._abort(error.RepoError(data))
275 276
276 277 @batchable
277 278 def heads(self):
278 279 f = future()
279 280 yield {}, f
280 281 d = f.value
281 282 try:
282 283 yield decodelist(d[:-1])
283 284 except ValueError:
284 285 self._abort(error.ResponseError(_("unexpected response:"), d))
285 286
286 287 @batchable
287 288 def known(self, nodes):
288 289 f = future()
289 290 yield {'nodes': encodelist(nodes)}, f
290 291 d = f.value
291 292 try:
292 293 yield [bool(int(b)) for b in d]
293 294 except ValueError:
294 295 self._abort(error.ResponseError(_("unexpected response:"), d))
295 296
296 297 @batchable
297 298 def branchmap(self):
298 299 f = future()
299 300 yield {}, f
300 301 d = f.value
301 302 try:
302 303 branchmap = {}
303 304 for branchpart in d.splitlines():
304 305 branchname, branchheads = branchpart.split(' ', 1)
305 306 branchname = encoding.tolocal(urlreq.unquote(branchname))
306 307 branchheads = decodelist(branchheads)
307 308 branchmap[branchname] = branchheads
308 309 yield branchmap
309 310 except TypeError:
310 311 self._abort(error.ResponseError(_("unexpected response:"), d))
311 312
312 313 def branches(self, nodes):
313 314 n = encodelist(nodes)
314 315 d = self._call("branches", nodes=n)
315 316 try:
316 317 br = [tuple(decodelist(b)) for b in d.splitlines()]
317 318 return br
318 319 except ValueError:
319 320 self._abort(error.ResponseError(_("unexpected response:"), d))
320 321
321 322 def between(self, pairs):
322 323 batch = 8 # avoid giant requests
323 324 r = []
324 325 for i in xrange(0, len(pairs), batch):
325 326 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
326 327 d = self._call("between", pairs=n)
327 328 try:
328 329 r.extend(l and decodelist(l) or [] for l in d.splitlines())
329 330 except ValueError:
330 331 self._abort(error.ResponseError(_("unexpected response:"), d))
331 332 return r
332 333
333 334 @batchable
334 335 def pushkey(self, namespace, key, old, new):
335 336 if not self.capable('pushkey'):
336 337 yield False, None
337 338 f = future()
338 339 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
339 340 yield {'namespace': encoding.fromlocal(namespace),
340 341 'key': encoding.fromlocal(key),
341 342 'old': encoding.fromlocal(old),
342 343 'new': encoding.fromlocal(new)}, f
343 344 d = f.value
344 345 d, output = d.split('\n', 1)
345 346 try:
346 347 d = bool(int(d))
347 348 except ValueError:
348 349 raise error.ResponseError(
349 350 _('push failed (unexpected response):'), d)
350 351 for l in output.splitlines(True):
351 352 self.ui.status(_('remote: '), l)
352 353 yield d
353 354
354 355 @batchable
355 356 def listkeys(self, namespace):
356 357 if not self.capable('pushkey'):
357 358 yield {}, None
358 359 f = future()
359 360 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
360 361 yield {'namespace': encoding.fromlocal(namespace)}, f
361 362 d = f.value
362 363 self.ui.debug('received listkey for "%s": %i bytes\n'
363 364 % (namespace, len(d)))
364 365 yield pushkeymod.decodekeys(d)
365 366
366 367 def stream_out(self):
367 368 return self._callstream('stream_out')
368 369
369 370 def changegroup(self, nodes, kind):
370 371 n = encodelist(nodes)
371 372 f = self._callcompressable("changegroup", roots=n)
372 373 return changegroupmod.cg1unpacker(f, 'UN')
373 374
374 375 def changegroupsubset(self, bases, heads, kind):
375 376 self.requirecap('changegroupsubset', _('look up remote changes'))
376 377 bases = encodelist(bases)
377 378 heads = encodelist(heads)
378 379 f = self._callcompressable("changegroupsubset",
379 380 bases=bases, heads=heads)
380 381 return changegroupmod.cg1unpacker(f, 'UN')
381 382
382 383 def getbundle(self, source, **kwargs):
383 384 self.requirecap('getbundle', _('look up remote changes'))
384 385 opts = {}
385 386 bundlecaps = kwargs.get('bundlecaps')
386 387 if bundlecaps is not None:
387 388 kwargs['bundlecaps'] = sorted(bundlecaps)
388 389 else:
389 390 bundlecaps = () # kwargs could have it to None
390 391 for key, value in kwargs.iteritems():
391 392 if value is None:
392 393 continue
393 394 keytype = gboptsmap.get(key)
394 395 if keytype is None:
395 396 assert False, 'unexpected'
396 397 elif keytype == 'nodes':
397 398 value = encodelist(value)
398 399 elif keytype in ('csv', 'scsv'):
399 400 value = ','.join(value)
400 401 elif keytype == 'boolean':
401 402 value = '%i' % bool(value)
402 403 elif keytype != 'plain':
403 404 raise KeyError('unknown getbundle option type %s'
404 405 % keytype)
405 406 opts[key] = value
406 407 f = self._callcompressable("getbundle", **opts)
407 408 if any((cap.startswith('HG2') for cap in bundlecaps)):
408 409 return bundle2.getunbundler(self.ui, f)
409 410 else:
410 411 return changegroupmod.cg1unpacker(f, 'UN')
411 412
412 413 def unbundle(self, cg, heads, url):
413 414 '''Send cg (a readable file-like object representing the
414 415 changegroup to push, typically a chunkbuffer object) to the
415 416 remote server as a bundle.
416 417
417 418 When pushing a bundle10 stream, return an integer indicating the
418 419 result of the push (see localrepository.addchangegroup()).
419 420
420 421 When pushing a bundle20 stream, return a bundle20 stream.
421 422
422 423 `url` is the url the client thinks it's pushing to, which is
423 424 visible to hooks.
424 425 '''
425 426
426 427 if heads != ['force'] and self.capable('unbundlehash'):
427 428 heads = encodelist(['hashed',
428 429 hashlib.sha1(''.join(sorted(heads))).digest()])
429 430 else:
430 431 heads = encodelist(heads)
431 432
432 433 if util.safehasattr(cg, 'deltaheader'):
433 434 # this a bundle10, do the old style call sequence
434 435 ret, output = self._callpush("unbundle", cg, heads=heads)
435 436 if ret == "":
436 437 raise error.ResponseError(
437 438 _('push failed:'), output)
438 439 try:
439 440 ret = int(ret)
440 441 except ValueError:
441 442 raise error.ResponseError(
442 443 _('push failed (unexpected response):'), ret)
443 444
444 445 for l in output.splitlines(True):
445 446 self.ui.status(_('remote: '), l)
446 447 else:
447 448 # bundle2 push. Send a stream, fetch a stream.
448 449 stream = self._calltwowaystream('unbundle', cg, heads=heads)
449 450 ret = bundle2.getunbundler(self.ui, stream)
450 451 return ret
451 452
452 453 def debugwireargs(self, one, two, three=None, four=None, five=None):
453 454 # don't pass optional arguments left at their default value
454 455 opts = {}
455 456 if three is not None:
456 457 opts['three'] = three
457 458 if four is not None:
458 459 opts['four'] = four
459 460 return self._call('debugwireargs', one=one, two=two, **opts)
460 461
461 462 def _call(self, cmd, **args):
462 463 """execute <cmd> on the server
463 464
464 465 The command is expected to return a simple string.
465 466
466 467 returns the server reply as a string."""
467 468 raise NotImplementedError()
468 469
469 470 def _callstream(self, cmd, **args):
470 471 """execute <cmd> on the server
471 472
472 473 The command is expected to return a stream. Note that if the
473 474 command doesn't return a stream, _callstream behaves
474 475 differently for ssh and http peers.
475 476
476 477 returns the server reply as a file like object.
477 478 """
478 479 raise NotImplementedError()
479 480
480 481 def _callcompressable(self, cmd, **args):
481 482 """execute <cmd> on the server
482 483
483 484 The command is expected to return a stream.
484 485
485 486 The stream may have been compressed in some implementations. This
486 487 function takes care of the decompression. This is the only difference
487 488 with _callstream.
488 489
489 490 returns the server reply as a file like object.
490 491 """
491 492 raise NotImplementedError()
492 493
493 494 def _callpush(self, cmd, fp, **args):
494 495 """execute a <cmd> on server
495 496
496 497 The command is expected to be related to a push. Push has a special
497 498 return method.
498 499
499 500 returns the server reply as a (ret, output) tuple. ret is either
500 501 empty (error) or a stringified int.
501 502 """
502 503 raise NotImplementedError()
503 504
504 505 def _calltwowaystream(self, cmd, fp, **args):
505 506 """execute <cmd> on server
506 507
507 508 The command will send a stream to the server and get a stream in reply.
508 509 """
509 510 raise NotImplementedError()
510 511
511 512 def _abort(self, exception):
512 513 """clearly abort the wire protocol connection and raise the exception
513 514 """
514 515 raise NotImplementedError()
515 516
516 517 # server side
517 518
518 519 # wire protocol command can either return a string or one of these classes.
519 520 class streamres(object):
520 521 """wireproto reply: binary stream
521 522
522 523 The call was successful and the result is a stream.
523 524 Iterate on the `self.gen` attribute to retrieve chunks.
524 525 """
525 526 def __init__(self, gen):
526 527 self.gen = gen
527 528
528 529 class pushres(object):
529 530 """wireproto reply: success with simple integer return
530 531
531 532 The call was successful and returned an integer contained in `self.res`.
532 533 """
533 534 def __init__(self, res):
534 535 self.res = res
535 536
536 537 class pusherr(object):
537 538 """wireproto reply: failure
538 539
539 540 The call failed. The `self.res` attribute contains the error message.
540 541 """
541 542 def __init__(self, res):
542 543 self.res = res
543 544
544 545 class ooberror(object):
545 546 """wireproto reply: failure of a batch of operation
546 547
547 548 Something failed during a batch call. The error message is stored in
548 549 `self.message`.
549 550 """
550 551 def __init__(self, message):
551 552 self.message = message
552 553
553 554 def getdispatchrepo(repo, proto, command):
554 555 """Obtain the repo used for processing wire protocol commands.
555 556
556 557 The intent of this function is to serve as a monkeypatch point for
557 558 extensions that need commands to operate on different repo views under
558 559 specialized circumstances.
559 560 """
560 561 return repo.filtered('served')
561 562
562 563 def dispatch(repo, proto, command):
563 564 repo = getdispatchrepo(repo, proto, command)
564 565 func, spec = commands[command]
565 566 args = proto.getargs(spec)
566 567 return func(repo, proto, *args)
567 568
568 569 def options(cmd, keys, others):
569 570 opts = {}
570 571 for k in keys:
571 572 if k in others:
572 573 opts[k] = others[k]
573 574 del others[k]
574 575 if others:
575 576 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
576 577 % (cmd, ",".join(others)))
577 578 return opts
578 579
579 580 def bundle1allowed(repo, action):
580 581 """Whether a bundle1 operation is allowed from the server.
581 582
582 583 Priority is:
583 584
584 585 1. server.bundle1gd.<action> (if generaldelta active)
585 586 2. server.bundle1.<action>
586 587 3. server.bundle1gd (if generaldelta active)
587 588 4. server.bundle1
588 589 """
589 590 ui = repo.ui
590 591 gd = 'generaldelta' in repo.requirements
591 592
592 593 if gd:
593 594 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
594 595 if v is not None:
595 596 return v
596 597
597 598 v = ui.configbool('server', 'bundle1.%s' % action, None)
598 599 if v is not None:
599 600 return v
600 601
601 602 if gd:
602 603 v = ui.configbool('server', 'bundle1gd', None)
603 604 if v is not None:
604 605 return v
605 606
606 607 return ui.configbool('server', 'bundle1', True)
607 608
608 609 # list of commands
609 610 commands = {}
610 611
611 612 def wireprotocommand(name, args=''):
612 613 """decorator for wire protocol command"""
613 614 def register(func):
614 615 commands[name] = (func, args)
615 616 return func
616 617 return register
617 618
618 619 @wireprotocommand('batch', 'cmds *')
619 620 def batch(repo, proto, cmds, others):
620 621 repo = repo.filtered("served")
621 622 res = []
622 623 for pair in cmds.split(';'):
623 624 op, args = pair.split(' ', 1)
624 625 vals = {}
625 626 for a in args.split(','):
626 627 if a:
627 628 n, v = a.split('=')
628 629 vals[unescapearg(n)] = unescapearg(v)
629 630 func, spec = commands[op]
630 631 if spec:
631 632 keys = spec.split()
632 633 data = {}
633 634 for k in keys:
634 635 if k == '*':
635 636 star = {}
636 637 for key in vals.keys():
637 638 if key not in keys:
638 639 star[key] = vals[key]
639 640 data['*'] = star
640 641 else:
641 642 data[k] = vals[k]
642 643 result = func(repo, proto, *[data[k] for k in keys])
643 644 else:
644 645 result = func(repo, proto)
645 646 if isinstance(result, ooberror):
646 647 return result
647 648 res.append(escapearg(result))
648 649 return ';'.join(res)
649 650
650 651 @wireprotocommand('between', 'pairs')
651 652 def between(repo, proto, pairs):
652 653 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
653 654 r = []
654 655 for b in repo.between(pairs):
655 656 r.append(encodelist(b) + "\n")
656 657 return "".join(r)
657 658
658 659 @wireprotocommand('branchmap')
659 660 def branchmap(repo, proto):
660 661 branchmap = repo.branchmap()
661 662 heads = []
662 663 for branch, nodes in branchmap.iteritems():
663 664 branchname = urlreq.quote(encoding.fromlocal(branch))
664 665 branchnodes = encodelist(nodes)
665 666 heads.append('%s %s' % (branchname, branchnodes))
666 667 return '\n'.join(heads)
667 668
668 669 @wireprotocommand('branches', 'nodes')
669 670 def branches(repo, proto, nodes):
670 671 nodes = decodelist(nodes)
671 672 r = []
672 673 for b in repo.branches(nodes):
673 674 r.append(encodelist(b) + "\n")
674 675 return "".join(r)
675 676
676 677 @wireprotocommand('clonebundles', '')
677 678 def clonebundles(repo, proto):
678 679 """Server command for returning info for available bundles to seed clones.
679 680
680 681 Clients will parse this response and determine what bundle to fetch.
681 682
682 683 Extensions may wrap this command to filter or dynamically emit data
683 684 depending on the request. e.g. you could advertise URLs for the closest
684 685 data center given the client's IP address.
685 686 """
686 687 return repo.opener.tryread('clonebundles.manifest')
687 688
688 689 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
689 690 'known', 'getbundle', 'unbundlehash', 'batch']
690 691
691 692 def _capabilities(repo, proto):
692 693 """return a list of capabilities for a repo
693 694
694 695 This function exists to allow extensions to easily wrap capabilities
695 696 computation
696 697
697 698 - returns a lists: easy to alter
698 699 - change done here will be propagated to both `capabilities` and `hello`
699 700 command without any other action needed.
700 701 """
701 702 # copy to prevent modification of the global list
702 703 caps = list(wireprotocaps)
703 704 if streamclone.allowservergeneration(repo.ui):
704 705 if repo.ui.configbool('server', 'preferuncompressed', False):
705 706 caps.append('stream-preferred')
706 707 requiredformats = repo.requirements & repo.supportedformats
707 708 # if our local revlogs are just revlogv1, add 'stream' cap
708 709 if not requiredformats - set(('revlogv1',)):
709 710 caps.append('stream')
710 711 # otherwise, add 'streamreqs' detailing our local revlog format
711 712 else:
712 713 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
713 714 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
714 715 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
715 716 caps.append('bundle2=' + urlreq.quote(capsblob))
716 717 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
717 718 caps.append(
718 719 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
719 720 if repo.ui.configbool('experimental', 'httppostargs', False):
720 721 caps.append('httppostargs')
721 722 return caps
722 723
723 724 # If you are writing an extension and consider wrapping this function. Wrap
724 725 # `_capabilities` instead.
725 726 @wireprotocommand('capabilities')
726 727 def capabilities(repo, proto):
727 728 return ' '.join(_capabilities(repo, proto))
728 729
729 730 @wireprotocommand('changegroup', 'roots')
730 731 def changegroup(repo, proto, roots):
731 732 nodes = decodelist(roots)
732 733 cg = changegroupmod.changegroup(repo, nodes, 'serve')
733 734 return streamres(proto.groupchunks(cg))
734 735
735 736 @wireprotocommand('changegroupsubset', 'bases heads')
736 737 def changegroupsubset(repo, proto, bases, heads):
737 738 bases = decodelist(bases)
738 739 heads = decodelist(heads)
739 740 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
740 741 return streamres(proto.groupchunks(cg))
741 742
742 743 @wireprotocommand('debugwireargs', 'one two *')
743 744 def debugwireargs(repo, proto, one, two, others):
744 745 # only accept optional args from the known set
745 746 opts = options('debugwireargs', ['three', 'four'], others)
746 747 return repo.debugwireargs(one, two, **opts)
747 748
748 749 @wireprotocommand('getbundle', '*')
749 750 def getbundle(repo, proto, others):
750 751 opts = options('getbundle', gboptsmap.keys(), others)
751 752 for k, v in opts.iteritems():
752 753 keytype = gboptsmap[k]
753 754 if keytype == 'nodes':
754 755 opts[k] = decodelist(v)
755 756 elif keytype == 'csv':
756 757 opts[k] = list(v.split(','))
757 758 elif keytype == 'scsv':
758 759 opts[k] = set(v.split(','))
759 760 elif keytype == 'boolean':
760 761 # Client should serialize False as '0', which is a non-empty string
761 762 # so it evaluates as a True bool.
762 763 if v == '0':
763 764 opts[k] = False
764 765 else:
765 766 opts[k] = bool(v)
766 767 elif keytype != 'plain':
767 768 raise KeyError('unknown getbundle option type %s'
768 769 % keytype)
769 770
770 771 if not bundle1allowed(repo, 'pull'):
771 772 if not exchange.bundle2requested(opts.get('bundlecaps')):
772 773 return ooberror(bundle2required)
773 774
774 775 cg = exchange.getbundle(repo, 'serve', **opts)
775 776 return streamres(proto.groupchunks(cg))
776 777
777 778 @wireprotocommand('heads')
778 779 def heads(repo, proto):
779 780 h = repo.heads()
780 781 return encodelist(h) + "\n"
781 782
782 783 @wireprotocommand('hello')
783 784 def hello(repo, proto):
784 785 '''the hello command returns a set of lines describing various
785 786 interesting things about the server, in an RFC822-like format.
786 787 Currently the only one defined is "capabilities", which
787 788 consists of a line in the form:
788 789
789 790 capabilities: space separated list of tokens
790 791 '''
791 792 return "capabilities: %s\n" % (capabilities(repo, proto))
792 793
793 794 @wireprotocommand('listkeys', 'namespace')
794 795 def listkeys(repo, proto, namespace):
795 796 d = repo.listkeys(encoding.tolocal(namespace)).items()
796 797 return pushkeymod.encodekeys(d)
797 798
798 799 @wireprotocommand('lookup', 'key')
799 800 def lookup(repo, proto, key):
800 801 try:
801 802 k = encoding.tolocal(key)
802 803 c = repo[k]
803 804 r = c.hex()
804 805 success = 1
805 806 except Exception as inst:
806 807 r = str(inst)
807 808 success = 0
808 809 return "%s %s\n" % (success, r)
809 810
810 811 @wireprotocommand('known', 'nodes *')
811 812 def known(repo, proto, nodes, others):
812 813 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
813 814
814 815 @wireprotocommand('pushkey', 'namespace key old new')
815 816 def pushkey(repo, proto, namespace, key, old, new):
816 817 # compatibility with pre-1.8 clients which were accidentally
817 818 # sending raw binary nodes rather than utf-8-encoded hex
818 819 if len(new) == 20 and new.encode('string-escape') != new:
819 820 # looks like it could be a binary node
820 821 try:
821 822 new.decode('utf-8')
822 823 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
823 824 except UnicodeDecodeError:
824 825 pass # binary, leave unmodified
825 826 else:
826 827 new = encoding.tolocal(new) # normal path
827 828
828 829 if util.safehasattr(proto, 'restore'):
829 830
830 831 proto.redirect()
831 832
832 833 try:
833 834 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
834 835 encoding.tolocal(old), new) or False
835 836 except error.Abort:
836 837 r = False
837 838
838 839 output = proto.restore()
839 840
840 841 return '%s\n%s' % (int(r), output)
841 842
842 843 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
843 844 encoding.tolocal(old), new)
844 845 return '%s\n' % int(r)
845 846
846 847 @wireprotocommand('stream_out')
847 848 def stream(repo, proto):
848 849 '''If the server supports streaming clone, it advertises the "stream"
849 850 capability with a value representing the version and flags of the repo
850 851 it is serving. Client checks to see if it understands the format.
851 852 '''
852 853 if not streamclone.allowservergeneration(repo.ui):
853 854 return '1\n'
854 855
855 856 def getstream(it):
856 857 yield '0\n'
857 858 for chunk in it:
858 859 yield chunk
859 860
860 861 try:
861 862 # LockError may be raised before the first result is yielded. Don't
862 863 # emit output until we're sure we got the lock successfully.
863 864 it = streamclone.generatev1wireproto(repo)
864 865 return streamres(getstream(it))
865 866 except error.LockError:
866 867 return '2\n'
867 868
868 869 @wireprotocommand('unbundle', 'heads')
869 870 def unbundle(repo, proto, heads):
870 871 their_heads = decodelist(heads)
871 872
872 873 try:
873 874 proto.redirect()
874 875
875 876 exchange.check_heads(repo, their_heads, 'preparing changes')
876 877
877 878 # write bundle data to temporary file because it can be big
878 879 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
879 880 fp = os.fdopen(fd, 'wb+')
880 881 r = 0
881 882 try:
882 883 proto.getfile(fp)
883 884 fp.seek(0)
884 885 gen = exchange.readbundle(repo.ui, fp, None)
885 886 if (isinstance(gen, changegroupmod.cg1unpacker)
886 887 and not bundle1allowed(repo, 'push')):
887 888 return ooberror(bundle2required)
888 889
889 890 r = exchange.unbundle(repo, gen, their_heads, 'serve',
890 891 proto._client())
891 892 if util.safehasattr(r, 'addpart'):
892 893 # The return looks streamable, we are in the bundle2 case and
893 894 # should return a stream.
894 895 return streamres(r.getchunks())
895 896 return pushres(r)
896 897
897 898 finally:
898 899 fp.close()
899 900 os.unlink(tempname)
900 901
901 902 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
902 903 # handle non-bundle2 case first
903 904 if not getattr(exc, 'duringunbundle2', False):
904 905 try:
905 906 raise
906 907 except error.Abort:
907 908 # The old code we moved used sys.stderr directly.
908 909 # We did not change it to minimise code change.
909 910 # This need to be moved to something proper.
910 911 # Feel free to do it.
911 912 sys.stderr.write("abort: %s\n" % exc)
912 913 return pushres(0)
913 914 except error.PushRaced:
914 915 return pusherr(str(exc))
915 916
916 917 bundler = bundle2.bundle20(repo.ui)
917 918 for out in getattr(exc, '_bundle2salvagedoutput', ()):
918 919 bundler.addpart(out)
919 920 try:
920 921 try:
921 922 raise
922 923 except error.PushkeyFailed as exc:
923 924 # check client caps
924 925 remotecaps = getattr(exc, '_replycaps', None)
925 926 if (remotecaps is not None
926 927 and 'pushkey' not in remotecaps.get('error', ())):
927 928 # no support remote side, fallback to Abort handler.
928 929 raise
929 930 part = bundler.newpart('error:pushkey')
930 931 part.addparam('in-reply-to', exc.partid)
931 932 if exc.namespace is not None:
932 933 part.addparam('namespace', exc.namespace, mandatory=False)
933 934 if exc.key is not None:
934 935 part.addparam('key', exc.key, mandatory=False)
935 936 if exc.new is not None:
936 937 part.addparam('new', exc.new, mandatory=False)
937 938 if exc.old is not None:
938 939 part.addparam('old', exc.old, mandatory=False)
939 940 if exc.ret is not None:
940 941 part.addparam('ret', exc.ret, mandatory=False)
941 942 except error.BundleValueError as exc:
942 943 errpart = bundler.newpart('error:unsupportedcontent')
943 944 if exc.parttype is not None:
944 945 errpart.addparam('parttype', exc.parttype)
945 946 if exc.params:
946 947 errpart.addparam('params', '\0'.join(exc.params))
947 948 except error.Abort as exc:
948 949 manargs = [('message', str(exc))]
949 950 advargs = []
950 951 if exc.hint is not None:
951 952 advargs.append(('hint', exc.hint))
952 953 bundler.addpart(bundle2.bundlepart('error:abort',
953 954 manargs, advargs))
954 955 except error.PushRaced as exc:
955 956 bundler.newpart('error:pushraced', [('message', str(exc))])
956 957 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now