##// END OF EJS Templates
protocol: unify server-side capabilities functions
Matt Mackall -
r11594:67863f9d default
parent child Browse files
Show More
@@ -1,34 +1,23
1 1 #
2 2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import cStringIO, zlib, tempfile, errno, os, sys, urllib, copy
9 9 from mercurial import util, streamclone, pushkey
10 10 from mercurial.node import bin, hex
11 11 from mercurial import changegroup as changegroupmod
12 12 from common import ErrorResponse, HTTP_OK, HTTP_NOT_FOUND, HTTP_SERVER_ERROR
13 13
14 14 # __all__ is populated with the allowed commands. Be sure to add to it if
15 15 # you're adding a new command, or the new command won't work.
16 16
17 17 __all__ = [
18 18 'lookup', 'heads', 'branches', 'between', 'changegroup',
19 19 'changegroupsubset', 'capabilities', 'unbundle', 'stream_out',
20 20 'branchmap', 'pushkey', 'listkeys'
21 21 ]
22 22
23 23 HGTYPE = 'application/mercurial-0.1'
24 basecaps = 'lookup changegroupsubset branchmap pushkey'.split()
25
26 def capabilities(repo, req):
27 caps = copy.copy(basecaps)
28 if streamclone.allowed(repo.ui):
29 caps.append('stream=%d' % repo.changelog.version)
30 if changegroupmod.bundlepriority:
31 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
32 rsp = ' '.join(caps)
33 req.respond(HTTP_OK, HGTYPE, length=len(rsp))
34 yield rsp
@@ -1,150 +1,134
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import streamclone, util, hook, pushkey, wireproto
12 12 import os, sys, tempfile, urllib, copy
13 13
14 14 class sshserver(object):
15
16 caps = 'unbundle lookup changegroupsubset branchmap pushkey'.split()
17
18 15 def __init__(self, ui, repo):
19 16 self.ui = ui
20 17 self.repo = repo
21 18 self.lock = None
22 19 self.fin = sys.stdin
23 20 self.fout = sys.stdout
24 21
25 22 hook.redirect(True)
26 23 sys.stdout = sys.stderr
27 24
28 25 # Prevent insertion/deletion of CRs
29 26 util.set_binary(self.fin)
30 27 util.set_binary(self.fout)
31 28
32 29 def getargs(self, args):
33 30 data = {}
34 31 keys = args.split()
35 32 count = len(keys)
36 33 for n in xrange(len(keys)):
37 34 argline = self.fin.readline()[:-1]
38 35 arg, l = argline.split()
39 36 val = self.fin.read(int(l))
40 37 if arg not in keys:
41 38 raise util.Abort("unexpected parameter %r" % arg)
42 39 if arg == '*':
43 40 star = {}
44 41 for n in xrange(int(l)):
45 42 arg, l = argline.split()
46 43 val = self.fin.read(int(l))
47 44 star[arg] = val
48 45 data['*'] = star
49 46 else:
50 47 data[arg] = val
51 48 return [data[k] for k in keys]
52 49
53 50 def getarg(self, name):
54 51 return self.getargs(name)[0]
55 52
56 53 def respond(self, v):
57 54 self.fout.write("%d\n" % len(v))
58 55 self.fout.write(v)
59 56 self.fout.flush()
60 57
61 58 def sendchangegroup(self, changegroup):
62 59 while True:
63 60 d = changegroup.read(4096)
64 61 if not d:
65 62 break
66 63 self.fout.write(d)
67 64
68 65 self.fout.flush()
69 66
70 67 def sendstream(self, source):
71 68 for chunk in source:
72 69 self.fout.write(chunk)
73 70 self.fout.flush()
74 71
75 72 def getfile(self, fpout):
76 73 self.respond('')
77 74 count = int(self.fin.readline())
78 75 while count:
79 76 fpout.write(self.fin.read(count))
80 77 count = int(self.fin.readline())
81 78
82 79 def redirect(self):
83 80 pass
84 81
85 82 def respondpush(self, ret):
86 83 self.respond('')
87 84 self.respond(str(ret))
88 85
89 86 def serve_forever(self):
90 87 try:
91 88 while self.serve_one():
92 89 pass
93 90 finally:
94 91 if self.lock is not None:
95 92 self.lock.release()
96 93 sys.exit(0)
97 94
98 95 def serve_one(self):
99 96 cmd = self.fin.readline()[:-1]
100 97 if cmd and not wireproto.dispatch(self.repo, self, cmd):
101 98 impl = getattr(self, 'do_' + cmd, None)
102 99 if impl:
103 100 r = impl()
104 101 if r is not None:
105 102 self.respond(r)
106 103 else: self.respond("")
107 104 return cmd != ''
108 105
109 def do_hello(self):
110 '''the hello command returns a set of lines describing various
111 interesting things about the server, in an RFC822-like format.
112 Currently the only one defined is "capabilities", which
113 consists of a line in the form:
114
115 capabilities: space separated list of tokens
116 '''
117 caps = copy.copy(self.caps)
118 if streamclone.allowed(self.repo.ui):
119 caps.append('stream=%d' % self.repo.changelog.version)
120 return "capabilities: %s\n" % (' '.join(caps),)
121
122 106 def do_lock(self):
123 107 '''DEPRECATED - allowing remote client to lock repo is not safe'''
124 108
125 109 self.lock = self.repo.lock()
126 110 return ""
127 111
128 112 def do_unlock(self):
129 113 '''DEPRECATED'''
130 114
131 115 if self.lock:
132 116 self.lock.release()
133 117 self.lock = None
134 118 return ""
135 119
136 120 def do_addchangegroup(self):
137 121 '''DEPRECATED'''
138 122
139 123 if not self.lock:
140 124 self.respond("not locked")
141 125 return
142 126
143 127 self.respond("")
144 128 r = self.repo.addchangegroup(self.fin, 'serve', self._client(),
145 129 lock=self.lock)
146 130 return str(r)
147 131
148 132 def _client(self):
149 133 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
150 134 return 'remote:ssh:' + client
@@ -1,264 +1,283
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import streamclone, repo, error, encoding, util
13 13 import pushkey as pushkey_
14 14
15 15 # client side
16 16
17 17 class wirerepository(repo.repository):
18 18 def lookup(self, key):
19 19 self.requirecap('lookup', _('look up remote revision'))
20 20 d = self._call("lookup", key=key)
21 21 success, data = d[:-1].split(" ", 1)
22 22 if int(success):
23 23 return bin(data)
24 24 self._abort(error.RepoError(data))
25 25
26 26 def heads(self):
27 27 d = self._call("heads")
28 28 try:
29 29 return map(bin, d[:-1].split(" "))
30 30 except:
31 31 self.abort(error.ResponseError(_("unexpected response:"), d))
32 32
33 33 def branchmap(self):
34 34 d = self._call("branchmap")
35 35 try:
36 36 branchmap = {}
37 37 for branchpart in d.splitlines():
38 38 branchheads = branchpart.split(' ')
39 39 branchname = urllib.unquote(branchheads[0])
40 40 # Earlier servers (1.3.x) send branch names in (their) local
41 41 # charset. The best we can do is assume it's identical to our
42 42 # own local charset, in case it's not utf-8.
43 43 try:
44 44 branchname.decode('utf-8')
45 45 except UnicodeDecodeError:
46 46 branchname = encoding.fromlocal(branchname)
47 47 branchheads = [bin(x) for x in branchheads[1:]]
48 48 branchmap[branchname] = branchheads
49 49 return branchmap
50 50 except TypeError:
51 51 self._abort(error.ResponseError(_("unexpected response:"), d))
52 52
53 53 def branches(self, nodes):
54 54 n = " ".join(map(hex, nodes))
55 55 d = self._call("branches", nodes=n)
56 56 try:
57 57 br = [tuple(map(bin, b.split(" "))) for b in d.splitlines()]
58 58 return br
59 59 except:
60 60 self._abort(error.ResponseError(_("unexpected response:"), d))
61 61
62 62 def between(self, pairs):
63 63 batch = 8 # avoid giant requests
64 64 r = []
65 65 for i in xrange(0, len(pairs), batch):
66 66 n = " ".join(["-".join(map(hex, p)) for p in pairs[i:i + batch]])
67 67 d = self._call("between", pairs=n)
68 68 try:
69 69 r += [l and map(bin, l.split(" ")) or []
70 70 for l in d.splitlines()]
71 71 except:
72 72 self._abort(error.ResponseError(_("unexpected response:"), d))
73 73 return r
74 74
75 75 def pushkey(self, namespace, key, old, new):
76 76 if not self.capable('pushkey'):
77 77 return False
78 78 d = self._call("pushkey",
79 79 namespace=namespace, key=key, old=old, new=new)
80 80 return bool(int(d))
81 81
82 82 def listkeys(self, namespace):
83 83 if not self.capable('pushkey'):
84 84 return {}
85 85 d = self._call("listkeys", namespace=namespace)
86 86 r = {}
87 87 for l in d.splitlines():
88 88 k, v = l.split('\t')
89 89 r[k.decode('string-escape')] = v.decode('string-escape')
90 90 return r
91 91
92 92 def stream_out(self):
93 93 return self._callstream('stream_out')
94 94
95 95 def changegroup(self, nodes, kind):
96 96 n = " ".join(map(hex, nodes))
97 97 f = self._callstream("changegroup", roots=n)
98 98 return self._decompress(f)
99 99
100 100 def changegroupsubset(self, bases, heads, kind):
101 101 self.requirecap('changegroupsubset', _('look up remote changes'))
102 102 bases = " ".join(map(hex, bases))
103 103 heads = " ".join(map(hex, heads))
104 104 return self._decompress(self._callstream("changegroupsubset",
105 105 bases=bases, heads=heads))
106 106
107 107 def unbundle(self, cg, heads, source):
108 108 '''Send cg (a readable file-like object representing the
109 109 changegroup to push, typically a chunkbuffer object) to the
110 110 remote server as a bundle. Return an integer indicating the
111 111 result of the push (see localrepository.addchangegroup()).'''
112 112
113 113 ret, output = self._callpush("unbundle", cg, heads=' '.join(map(hex, heads)))
114 114 if ret == "":
115 115 raise error.ResponseError(
116 116 _('push failed:'), output)
117 117 try:
118 118 ret = int(ret)
119 119 except ValueError, err:
120 120 raise error.ResponseError(
121 121 _('push failed (unexpected response):'), ret)
122 122
123 123 for l in output.splitlines(True):
124 124 self.ui.status(_('remote: '), l)
125 125 return ret
126 126
127 127 # server side
128 128
129 129 def dispatch(repo, proto, command):
130 130 if command not in commands:
131 131 return False
132 132 func, spec = commands[command]
133 133 args = proto.getargs(spec)
134 134 r = func(repo, proto, *args)
135 135 if r != None:
136 136 proto.respond(r)
137 137 return True
138 138
139 139 def between(repo, proto, pairs):
140 140 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
141 141 r = []
142 142 for b in repo.between(pairs):
143 143 r.append(" ".join(map(hex, b)) + "\n")
144 144 return "".join(r)
145 145
146 146 def branchmap(repo, proto):
147 147 branchmap = repo.branchmap()
148 148 heads = []
149 149 for branch, nodes in branchmap.iteritems():
150 150 branchname = urllib.quote(branch)
151 151 branchnodes = [hex(node) for node in nodes]
152 152 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
153 153 return '\n'.join(heads)
154 154
155 155 def branches(repo, proto, nodes):
156 156 nodes = map(bin, nodes.split(" "))
157 157 r = []
158 158 for b in repo.branches(nodes):
159 159 r.append(" ".join(map(hex, b)) + "\n")
160 160 return "".join(r)
161 161
162 def capabilities(repo, proto):
163 caps = 'lookup changegroupsubset branchmap pushkey'.split()
164 if streamclone.allowed(repo.ui):
165 caps.append('stream=%d' % repo.changelog.version)
166 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
167 return ' '.join(caps)
168
162 169 def changegroup(repo, proto, roots):
163 170 nodes = map(bin, roots.split(" "))
164 171 cg = repo.changegroup(nodes, 'serve')
165 172 proto.sendchangegroup(cg)
166 173
167 174 def changegroupsubset(repo, proto, bases, heads):
168 175 bases = [bin(n) for n in bases.split(' ')]
169 176 heads = [bin(n) for n in heads.split(' ')]
170 177 cg = repo.changegroupsubset(bases, heads, 'serve')
171 178 proto.sendchangegroup(cg)
172 179
173 180 def heads(repo, proto):
174 181 h = repo.heads()
175 182 return " ".join(map(hex, h)) + "\n"
176 183
184 def hello(repo, proto):
185 '''the hello command returns a set of lines describing various
186 interesting things about the server, in an RFC822-like format.
187 Currently the only one defined is "capabilities", which
188 consists of a line in the form:
189
190 capabilities: space separated list of tokens
191 '''
192 return "capabilities: %s\n" % (capabilities(repo, proto))
193
177 194 def listkeys(repo, proto, namespace):
178 195 d = pushkey_.list(repo, namespace).items()
179 196 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
180 197 v.encode('string-escape')) for k, v in d])
181 198 return t
182 199
183 200 def lookup(repo, proto, key):
184 201 try:
185 202 r = hex(repo.lookup(key))
186 203 success = 1
187 204 except Exception, inst:
188 205 r = str(inst)
189 206 success = 0
190 207 return "%s %s\n" % (success, r)
191 208
192 209 def pushkey(repo, proto, namespace, key, old, new):
193 210 r = pushkey_.push(repo, namespace, key, old, new)
194 211 return '%s\n' % int(r)
195 212
196 213 def stream(repo, proto):
197 214 try:
198 215 proto.sendstream(streamclone.stream_out(repo))
199 216 except streamclone.StreamException, inst:
200 217 return str(inst)
201 218
202 219 def unbundle(repo, proto, heads):
203 220 their_heads = heads.split()
204 221
205 222 def check_heads():
206 223 heads = map(hex, repo.heads())
207 224 return their_heads == [hex('force')] or their_heads == heads
208 225
209 226 # fail early if possible
210 227 if not check_heads():
211 228 repo.respond(_('unsynced changes'))
212 229 return
213 230
214 231 # write bundle data to temporary file because it can be big
215 232 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
216 233 fp = os.fdopen(fd, 'wb+')
217 234 r = 0
218 235 proto.redirect()
219 236 try:
220 237 proto.getfile(fp)
221 238 lock = repo.lock()
222 239 try:
223 240 if not check_heads():
224 241 # someone else committed/pushed/unbundled while we
225 242 # were transferring data
226 243 proto.respond(_('unsynced changes'))
227 244 return
228 245
229 246 # push can proceed
230 247 fp.seek(0)
231 248 header = fp.read(6)
232 249 if header.startswith('HG'):
233 250 if not header.startswith('HG10'):
234 251 raise ValueError('unknown bundle version')
235 252 elif header not in changegroupmod.bundletypes:
236 253 raise ValueError('unknown bundle compression type')
237 254 gen = changegroupmod.unbundle(header, fp)
238 255
239 256 try:
240 257 r = repo.addchangegroup(gen, 'serve', proto._client(),
241 258 lock=lock)
242 259 except util.Abort, inst:
243 260 sys.stderr.write("abort: %s\n" % inst)
244 261 finally:
245 262 lock.release()
246 263 proto.respondpush(r)
247 264
248 265 finally:
249 266 fp.close()
250 267 os.unlink(tempname)
251 268
252 269 commands = {
253 270 'between': (between, 'pairs'),
254 271 'branchmap': (branchmap, ''),
255 272 'branches': (branches, 'nodes'),
273 'capabilities': (capabilities, ''),
256 274 'changegroup': (changegroup, 'roots'),
257 275 'changegroupsubset': (changegroupsubset, 'bases heads'),
258 276 'heads': (heads, ''),
277 'hello': (hello, ''),
259 278 'listkeys': (listkeys, 'namespace'),
260 279 'lookup': (lookup, 'key'),
261 280 'pushkey': (pushkey, 'namespace key old new'),
262 281 'stream_out': (stream, ''),
263 282 'unbundle': (unbundle, 'heads'),
264 283 }
General Comments 0
You need to be logged in to leave comments. Login now