##// END OF EJS Templates
protocol: wrap non-string protocol responses in classes
Dirkjan Ochtman -
r11625:cdeb8613 default
parent child Browse files
Show More
@@ -1,70 +1,78 b''
1 1 #
2 2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import cStringIO, zlib, sys, urllib
9 9 from mercurial import util, wireproto
10 10 from common import HTTP_OK
11 11
12 12 HGTYPE = 'application/mercurial-0.1'
13 13
14 14 class webproto(object):
15 15 def __init__(self, req):
16 16 self.req = req
17 17 self.response = ''
18 18 def getargs(self, args):
19 19 data = {}
20 20 keys = args.split()
21 21 for k in keys:
22 22 if k == '*':
23 23 star = {}
24 24 for key in self.req.form.keys():
25 25 if key not in keys:
26 26 star[key] = self.req.form[key][0]
27 27 data['*'] = star
28 28 else:
29 29 data[k] = self.req.form[k][0]
30 30 return [data[k] for k in keys]
31 31 def getfile(self, fp):
32 32 length = int(self.req.env['CONTENT_LENGTH'])
33 33 for s in util.filechunkiter(self.req, limit=length):
34 34 fp.write(s)
35 35 def redirect(self):
36 36 self.oldio = sys.stdout, sys.stderr
37 37 sys.stderr = sys.stdout = cStringIO.StringIO()
38 38 def groupchunks(self, cg):
39 39 z = zlib.compressobj()
40 40 while 1:
41 41 chunk = cg.read(4096)
42 42 if not chunk:
43 43 break
44 44 yield z.compress(chunk)
45 45 yield z.flush()
46 46 def sendresponse(self, s):
47 47 self.req.respond(HTTP_OK, HGTYPE, length=len(s))
48 48 self.response = s
49 49 def sendstream(self, source):
50 50 self.req.respond(HTTP_OK, HGTYPE)
51 for chunk in source:
52 self.req.write(str(chunk))
53 def sendpushresponse(self, ret):
51 for chunk in source.gen:
52 self.req.write(chunk)
53 def sendpushresponse(self, rsp):
54 54 val = sys.stdout.getvalue()
55 55 sys.stdout, sys.stderr = self.oldio
56 56 self.req.respond(HTTP_OK, HGTYPE)
57 self.response = '%d\n%s' % (ret, val)
57 self.response = '%d\n%s' % (rsp.res, val)
58
59 handlers = {
60 str: sendresponse,
61 wireproto.streamres: sendstream,
62 wireproto.pushres: sendpushresponse,
63 }
64
58 65 def _client(self):
59 66 return 'remote:%s:%s:%s' % (
60 67 self.req.env.get('wsgi.url_scheme') or 'http',
61 68 urllib.quote(self.req.env.get('REMOTE_HOST', '')),
62 69 urllib.quote(self.req.env.get('REMOTE_USER', '')))
63 70
64 71 def iscmd(cmd):
65 72 return cmd in wireproto.commands
66 73
67 74 def call(repo, req, cmd):
68 75 p = webproto(req)
69 wireproto.dispatch(repo, p, cmd)
70 yield p.response
76 rsp = wireproto.dispatch(repo, p, cmd)
77 webproto.handlers[rsp.__class__](p, rsp)
78 return [p.response]
@@ -1,133 +1,140 b''
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from i18n import _
10 10 import util, hook, wireproto
11 11 import os, sys
12 12
13 13 class sshserver(object):
14 14 def __init__(self, ui, repo):
15 15 self.ui = ui
16 16 self.repo = repo
17 17 self.lock = None
18 18 self.fin = sys.stdin
19 19 self.fout = sys.stdout
20 20
21 21 hook.redirect(True)
22 22 sys.stdout = sys.stderr
23 23
24 24 # Prevent insertion/deletion of CRs
25 25 util.set_binary(self.fin)
26 26 util.set_binary(self.fout)
27 27
28 28 def getargs(self, args):
29 29 data = {}
30 30 keys = args.split()
31 31 count = len(keys)
32 32 for n in xrange(len(keys)):
33 33 argline = self.fin.readline()[:-1]
34 34 arg, l = argline.split()
35 35 val = self.fin.read(int(l))
36 36 if arg not in keys:
37 37 raise util.Abort("unexpected parameter %r" % arg)
38 38 if arg == '*':
39 39 star = {}
40 40 for n in xrange(int(l)):
41 41 arg, l = argline.split()
42 42 val = self.fin.read(int(l))
43 43 star[arg] = val
44 44 data['*'] = star
45 45 else:
46 46 data[arg] = val
47 47 return [data[k] for k in keys]
48 48
49 49 def getarg(self, name):
50 50 return self.getargs(name)[0]
51 51
52 52 def getfile(self, fpout):
53 53 self.sendresponse('')
54 54 count = int(self.fin.readline())
55 55 while count:
56 56 fpout.write(self.fin.read(count))
57 57 count = int(self.fin.readline())
58 58
59 59 def redirect(self):
60 60 pass
61 61
62 62 def groupchunks(self, changegroup):
63 63 while True:
64 64 d = changegroup.read(4096)
65 65 if not d:
66 66 break
67 67 yield d
68 68
69 69 def sendresponse(self, v):
70 70 self.fout.write("%d\n" % len(v))
71 71 self.fout.write(v)
72 72 self.fout.flush()
73 73
74 74 def sendstream(self, source):
75 for chunk in source:
75 for chunk in source.gen:
76 76 self.fout.write(chunk)
77 77 self.fout.flush()
78 78
79 def sendpushresponse(self, ret):
79 def sendpushresponse(self, rsp):
80 80 self.sendresponse('')
81 self.sendresponse(str(ret))
81 self.sendresponse(str(rsp.res))
82 82
83 83 def serve_forever(self):
84 84 try:
85 85 while self.serve_one():
86 86 pass
87 87 finally:
88 88 if self.lock is not None:
89 89 self.lock.release()
90 90 sys.exit(0)
91 91
92 handlers = {
93 str: sendresponse,
94 wireproto.streamres: sendstream,
95 wireproto.pushres: sendpushresponse,
96 }
97
92 98 def serve_one(self):
93 99 cmd = self.fin.readline()[:-1]
94 100 if cmd and cmd in wireproto.commands:
95 wireproto.dispatch(self.repo, self, cmd)
101 rsp = wireproto.dispatch(self.repo, self, cmd)
102 self.handlers[rsp.__class__](self, rsp)
96 103 elif cmd:
97 104 impl = getattr(self, 'do_' + cmd, None)
98 105 if impl:
99 106 r = impl()
100 107 if r is not None:
101 108 self.sendresponse(r)
102 109 else: self.sendresponse("")
103 110 return cmd != ''
104 111
105 112 def do_lock(self):
106 113 '''DEPRECATED - allowing remote client to lock repo is not safe'''
107 114
108 115 self.lock = self.repo.lock()
109 116 return ""
110 117
111 118 def do_unlock(self):
112 119 '''DEPRECATED'''
113 120
114 121 if self.lock:
115 122 self.lock.release()
116 123 self.lock = None
117 124 return ""
118 125
119 126 def do_addchangegroup(self):
120 127 '''DEPRECATED'''
121 128
122 129 if not self.lock:
123 130 self.sendresponse("not locked")
124 131 return
125 132
126 133 self.sendresponse("")
127 134 r = self.repo.addchangegroup(self.fin, 'serve', self._client(),
128 135 lock=self.lock)
129 136 return str(r)
130 137
131 138 def _client(self):
132 139 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
133 140 return 'remote:ssh:' + client
@@ -1,282 +1,288 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import streamclone, repo, error, encoding, util
13 13 import pushkey as pushkey_
14 14
15 15 # list of nodes encoding / decoding
16 16
17 17 def decodelist(l, sep=' '):
18 18 return map(bin, l.split(sep))
19 19
20 20 def encodelist(l, sep=' '):
21 21 return sep.join(map(hex, l))
22 22
23 23 # client side
24 24
25 25 class wirerepository(repo.repository):
26 26 def lookup(self, key):
27 27 self.requirecap('lookup', _('look up remote revision'))
28 28 d = self._call("lookup", key=key)
29 29 success, data = d[:-1].split(" ", 1)
30 30 if int(success):
31 31 return bin(data)
32 32 self._abort(error.RepoError(data))
33 33
34 34 def heads(self):
35 35 d = self._call("heads")
36 36 try:
37 37 return decodelist(d[:-1])
38 38 except:
39 39 self.abort(error.ResponseError(_("unexpected response:"), d))
40 40
41 41 def branchmap(self):
42 42 d = self._call("branchmap")
43 43 try:
44 44 branchmap = {}
45 45 for branchpart in d.splitlines():
46 46 branchname, branchheads = branchpart.split(' ', 1)
47 47 branchname = urllib.unquote(branchname)
48 48 # Earlier servers (1.3.x) send branch names in (their) local
49 49 # charset. The best we can do is assume it's identical to our
50 50 # own local charset, in case it's not utf-8.
51 51 try:
52 52 branchname.decode('utf-8')
53 53 except UnicodeDecodeError:
54 54 branchname = encoding.fromlocal(branchname)
55 55 branchheads = decodelist(branchheads)
56 56 branchmap[branchname] = branchheads
57 57 return branchmap
58 58 except TypeError:
59 59 self._abort(error.ResponseError(_("unexpected response:"), d))
60 60
61 61 def branches(self, nodes):
62 62 n = encodelist(nodes)
63 63 d = self._call("branches", nodes=n)
64 64 try:
65 65 br = [tuple(decodelist(b)) for b in d.splitlines()]
66 66 return br
67 67 except:
68 68 self._abort(error.ResponseError(_("unexpected response:"), d))
69 69
70 70 def between(self, pairs):
71 71 batch = 8 # avoid giant requests
72 72 r = []
73 73 for i in xrange(0, len(pairs), batch):
74 74 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
75 75 d = self._call("between", pairs=n)
76 76 try:
77 77 r.extend(l and decodelist(l) or [] for l in d.splitlines())
78 78 except:
79 79 self._abort(error.ResponseError(_("unexpected response:"), d))
80 80 return r
81 81
82 82 def pushkey(self, namespace, key, old, new):
83 83 if not self.capable('pushkey'):
84 84 return False
85 85 d = self._call("pushkey",
86 86 namespace=namespace, key=key, old=old, new=new)
87 87 return bool(int(d))
88 88
89 89 def listkeys(self, namespace):
90 90 if not self.capable('pushkey'):
91 91 return {}
92 92 d = self._call("listkeys", namespace=namespace)
93 93 r = {}
94 94 for l in d.splitlines():
95 95 k, v = l.split('\t')
96 96 r[k.decode('string-escape')] = v.decode('string-escape')
97 97 return r
98 98
99 99 def stream_out(self):
100 100 return self._callstream('stream_out')
101 101
102 102 def changegroup(self, nodes, kind):
103 103 n = encodelist(nodes)
104 104 f = self._callstream("changegroup", roots=n)
105 105 return self._decompress(f)
106 106
107 107 def changegroupsubset(self, bases, heads, kind):
108 108 self.requirecap('changegroupsubset', _('look up remote changes'))
109 109 bases = encodelist(bases)
110 110 heads = encodelist(heads)
111 111 return self._decompress(self._callstream("changegroupsubset",
112 112 bases=bases, heads=heads))
113 113
114 114 def unbundle(self, cg, heads, source):
115 115 '''Send cg (a readable file-like object representing the
116 116 changegroup to push, typically a chunkbuffer object) to the
117 117 remote server as a bundle. Return an integer indicating the
118 118 result of the push (see localrepository.addchangegroup()).'''
119 119
120 120 ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
121 121 if ret == "":
122 122 raise error.ResponseError(
123 123 _('push failed:'), output)
124 124 try:
125 125 ret = int(ret)
126 126 except ValueError, err:
127 127 raise error.ResponseError(
128 128 _('push failed (unexpected response):'), ret)
129 129
130 130 for l in output.splitlines(True):
131 131 self.ui.status(_('remote: '), l)
132 132 return ret
133 133
134 134 # server side
135 135
136 class streamres(object):
137 def __init__(self, gen):
138 self.gen = gen
139
140 class pushres(object):
141 def __init__(self, res):
142 self.res = res
143
136 144 def dispatch(repo, proto, command):
137 145 func, spec = commands[command]
138 146 args = proto.getargs(spec)
139 r = func(repo, proto, *args)
140 if r != None:
141 proto.sendresponse(r)
147 return func(repo, proto, *args)
142 148
143 149 def between(repo, proto, pairs):
144 150 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
145 151 r = []
146 152 for b in repo.between(pairs):
147 153 r.append(encodelist(b) + "\n")
148 154 return "".join(r)
149 155
150 156 def branchmap(repo, proto):
151 157 branchmap = repo.branchmap()
152 158 heads = []
153 159 for branch, nodes in branchmap.iteritems():
154 160 branchname = urllib.quote(branch)
155 161 branchnodes = encodelist(nodes)
156 162 heads.append('%s %s' % (branchname, branchnodes))
157 163 return '\n'.join(heads)
158 164
159 165 def branches(repo, proto, nodes):
160 166 nodes = decodelist(nodes)
161 167 r = []
162 168 for b in repo.branches(nodes):
163 169 r.append(encodelist(b) + "\n")
164 170 return "".join(r)
165 171
166 172 def capabilities(repo, proto):
167 173 caps = 'lookup changegroupsubset branchmap pushkey'.split()
168 174 if streamclone.allowed(repo.ui):
169 175 caps.append('stream=%d' % repo.changelog.version)
170 176 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
171 177 return ' '.join(caps)
172 178
173 179 def changegroup(repo, proto, roots):
174 180 nodes = decodelist(roots)
175 181 cg = repo.changegroup(nodes, 'serve')
176 proto.sendstream(proto.groupchunks(cg))
182 return streamres(proto.groupchunks(cg))
177 183
178 184 def changegroupsubset(repo, proto, bases, heads):
179 185 bases = decodelist(bases)
180 186 heads = decodelist(heads)
181 187 cg = repo.changegroupsubset(bases, heads, 'serve')
182 proto.sendstream(proto.groupchunks(cg))
188 return streamres(proto.groupchunks(cg))
183 189
184 190 def heads(repo, proto):
185 191 h = repo.heads()
186 192 return encodelist(h) + "\n"
187 193
188 194 def hello(repo, proto):
189 195 '''the hello command returns a set of lines describing various
190 196 interesting things about the server, in an RFC822-like format.
191 197 Currently the only one defined is "capabilities", which
192 198 consists of a line in the form:
193 199
194 200 capabilities: space separated list of tokens
195 201 '''
196 202 return "capabilities: %s\n" % (capabilities(repo, proto))
197 203
198 204 def listkeys(repo, proto, namespace):
199 205 d = pushkey_.list(repo, namespace).items()
200 206 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
201 207 v.encode('string-escape')) for k, v in d])
202 208 return t
203 209
204 210 def lookup(repo, proto, key):
205 211 try:
206 212 r = hex(repo.lookup(key))
207 213 success = 1
208 214 except Exception, inst:
209 215 r = str(inst)
210 216 success = 0
211 217 return "%s %s\n" % (success, r)
212 218
213 219 def pushkey(repo, proto, namespace, key, old, new):
214 220 r = pushkey_.push(repo, namespace, key, old, new)
215 221 return '%s\n' % int(r)
216 222
217 223 def stream(repo, proto):
218 proto.sendstream(streamclone.stream_out(repo))
224 return streamres(streamclone.stream_out(repo))
219 225
220 226 def unbundle(repo, proto, heads):
221 227 their_heads = decodelist(heads)
222 228
223 229 def check_heads():
224 230 heads = repo.heads()
225 231 return their_heads == ['force'] or their_heads == heads
226 232
227 233 # fail early if possible
228 234 if not check_heads():
229 235 return 'unsynced changes'
230 236
231 237 # write bundle data to temporary file because it can be big
232 238 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
233 239 fp = os.fdopen(fd, 'wb+')
234 240 r = 0
235 241 proto.redirect()
236 242 try:
237 243 proto.getfile(fp)
238 244 lock = repo.lock()
239 245 try:
240 246 if not check_heads():
241 247 # someone else committed/pushed/unbundled while we
242 248 # were transferring data
243 249 return 'unsynced changes'
244 250
245 251 # push can proceed
246 252 fp.seek(0)
247 253 header = fp.read(6)
248 254 if header.startswith('HG'):
249 255 if not header.startswith('HG10'):
250 256 raise ValueError('unknown bundle version')
251 257 elif header not in changegroupmod.bundletypes:
252 258 raise ValueError('unknown bundle compression type')
253 259 gen = changegroupmod.unbundle(header, fp)
254 260
255 261 try:
256 262 r = repo.addchangegroup(gen, 'serve', proto._client(),
257 263 lock=lock)
258 264 except util.Abort, inst:
259 265 sys.stderr.write("abort: %s\n" % inst)
260 266 finally:
261 267 lock.release()
262 proto.sendpushresponse(r)
268 return pushres(r)
263 269
264 270 finally:
265 271 fp.close()
266 272 os.unlink(tempname)
267 273
268 274 commands = {
269 275 'between': (between, 'pairs'),
270 276 'branchmap': (branchmap, ''),
271 277 'branches': (branches, 'nodes'),
272 278 'capabilities': (capabilities, ''),
273 279 'changegroup': (changegroup, 'roots'),
274 280 'changegroupsubset': (changegroupsubset, 'bases heads'),
275 281 'heads': (heads, ''),
276 282 'hello': (hello, ''),
277 283 'listkeys': (listkeys, 'namespace'),
278 284 'lookup': (lookup, 'key'),
279 285 'pushkey': (pushkey, 'namespace key old new'),
280 286 'stream_out': (stream, ''),
281 287 'unbundle': (unbundle, 'heads'),
282 288 }
General Comments 0
You need to be logged in to leave comments. Login now