##// END OF EJS Templates
protocol: introduce wireproto.py...
Matt Mackall -
r11581:4530b330 default
parent child Browse files
Show More
@@ -0,0 +1,75
1 # wireproto.py - generic wire protocol support functions
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 from i18n import _
9 from node import bin, hex
10 import urllib
11 import pushkey as pushkey_
12
13 def dispatch(repo, proto, command):
14 if command not in commands:
15 return False
16 func, spec = commands[command]
17 args = proto.getargs(spec)
18 proto.respond(func(repo, *args))
19 return True
20
21 def between(repo, pairs):
22 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
23 r = []
24 for b in repo.between(pairs):
25 r.append(" ".join(map(hex, b)) + "\n")
26 return "".join(r)
27
28 def branchmap(repo):
29 branchmap = repo.branchmap()
30 heads = []
31 for branch, nodes in branchmap.iteritems():
32 branchname = urllib.quote(branch)
33 branchnodes = [hex(node) for node in nodes]
34 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
35 return '\n'.join(heads)
36
37 def branches(repo, nodes):
38 nodes = map(bin, nodes.split(" "))
39 r = []
40 for b in repo.branches(nodes):
41 r.append(" ".join(map(hex, b)) + "\n")
42 return "".join(r)
43
44 def heads(repo):
45 h = repo.heads()
46 return " ".join(map(hex, h)) + "\n"
47
48 def listkeys(repo, namespace):
49 d = pushkey_.list(repo, namespace).items()
50 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
51 v.encode('string-escape')) for k, v in d])
52 return t
53
54 def lookup(repo, key):
55 try:
56 r = hex(repo.lookup(key))
57 success = 1
58 except Exception, inst:
59 r = str(inst)
60 success = 0
61 return "%s %s\n" % (success, r)
62
63 def pushkey(repo, namespace, key, old, new):
64 r = pushkey_.push(repo, namespace, key, old, new)
65 return '%s\n' % int(r)
66
67 commands = {
68 'between': (between, 'pairs'),
69 'branchmap': (branchmap, ''),
70 'branches': (branches, 'nodes'),
71 'heads': (heads, ''),
72 'listkeys': (listkeys, 'namespace'),
73 'lookup': (lookup, 'key'),
74 'pushkey': (pushkey, 'namespace key old new'),
75 }
@@ -1,258 +1,207
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from i18n import _
10 10 from node import bin, hex
11 import streamclone, util, hook, pushkey
11 import streamclone, util, hook, pushkey, wireproto
12 12 import os, sys, tempfile, urllib, copy
13 13
14 14 class sshserver(object):
15 15
16 16 caps = 'unbundle lookup changegroupsubset branchmap pushkey'.split()
17 17
18 18 def __init__(self, ui, repo):
19 19 self.ui = ui
20 20 self.repo = repo
21 21 self.lock = None
22 22 self.fin = sys.stdin
23 23 self.fout = sys.stdout
24 24
25 25 hook.redirect(True)
26 26 sys.stdout = sys.stderr
27 27
28 28 # Prevent insertion/deletion of CRs
29 29 util.set_binary(self.fin)
30 30 util.set_binary(self.fout)
31 31
32 32 def getargs(self, args):
33 33 data = {}
34 34 keys = args.split()
35 35 count = len(keys)
36 36 for n in xrange(len(keys)):
37 37 argline = self.fin.readline()[:-1]
38 38 arg, l = argline.split()
39 39 val = self.fin.read(int(l))
40 40 if arg not in keys:
41 41 raise util.Abort("unexpected parameter %r" % arg)
42 42 if arg == '*':
43 43 star = {}
44 44 for n in xrange(int(l)):
45 45 arg, l = argline.split()
46 46 val = self.fin.read(int(l))
47 47 star[arg] = val
48 48 data['*'] = star
49 49 else:
50 50 data[arg] = val
51 51 return [data[k] for k in keys]
52 52
53 53 def getarg(self, name):
54 54 return self.getargs(name)[0]
55 55
56 56 def respond(self, v):
57 57 self.fout.write("%d\n" % len(v))
58 58 self.fout.write(v)
59 59 self.fout.flush()
60 60
61 61 def serve_forever(self):
62 62 try:
63 63 while self.serve_one():
64 64 pass
65 65 finally:
66 66 if self.lock is not None:
67 67 self.lock.release()
68 68 sys.exit(0)
69 69
70 70 def serve_one(self):
71 71 cmd = self.fin.readline()[:-1]
72 if cmd:
72 if cmd and not wireproto.dispatch(self.repo, self, cmd):
73 73 impl = getattr(self, 'do_' + cmd, None)
74 74 if impl:
75 75 r = impl()
76 76 if r is not None:
77 77 self.respond(r)
78 78 else: self.respond("")
79 79 return cmd != ''
80 80
81 def do_lookup(self):
82 key = self.getarg('key')
83 try:
84 r = hex(self.repo.lookup(key))
85 success = 1
86 except Exception, inst:
87 r = str(inst)
88 success = 0
89 return "%s %s\n" % (success, r)
90
91 def do_branchmap(self):
92 branchmap = self.repo.branchmap()
93 heads = []
94 for branch, nodes in branchmap.iteritems():
95 branchname = urllib.quote(branch)
96 branchnodes = [hex(node) for node in nodes]
97 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
98 return '\n'.join(heads)
99
100 def do_heads(self):
101 h = self.repo.heads()
102 return " ".join(map(hex, h)) + "\n"
103
104 81 def do_hello(self):
105 82 '''the hello command returns a set of lines describing various
106 83 interesting things about the server, in an RFC822-like format.
107 84 Currently the only one defined is "capabilities", which
108 85 consists of a line in the form:
109 86
110 87 capabilities: space separated list of tokens
111 88 '''
112 89 caps = copy.copy(self.caps)
113 90 if streamclone.allowed(self.repo.ui):
114 91 caps.append('stream=%d' % self.repo.changelog.version)
115 92 return "capabilities: %s\n" % (' '.join(caps),)
116 93
117 94 def do_lock(self):
118 95 '''DEPRECATED - allowing remote client to lock repo is not safe'''
119 96
120 97 self.lock = self.repo.lock()
121 98 return ""
122 99
123 100 def do_unlock(self):
124 101 '''DEPRECATED'''
125 102
126 103 if self.lock:
127 104 self.lock.release()
128 105 self.lock = None
129 106 return ""
130 107
131 def do_branches(self):
132 nodes = self.getarg('nodes')
133 nodes = map(bin, nodes.split(" "))
134 r = []
135 for b in self.repo.branches(nodes):
136 r.append(" ".join(map(hex, b)) + "\n")
137 return "".join(r)
138
139 def do_between(self):
140 pairs = self.getarg('pairs')
141 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
142 r = []
143 for b in self.repo.between(pairs):
144 r.append(" ".join(map(hex, b)) + "\n")
145 return "".join(r)
146
147 108 def do_changegroup(self):
148 109 nodes = []
149 110 roots = self.getarg('roots')
150 111 nodes = map(bin, roots.split(" "))
151 112
152 113 cg = self.repo.changegroup(nodes, 'serve')
153 114 while True:
154 115 d = cg.read(4096)
155 116 if not d:
156 117 break
157 118 self.fout.write(d)
158 119
159 120 self.fout.flush()
160 121
161 122 def do_changegroupsubset(self):
162 123 bases, heads = self.getargs('bases heads')
163 124 bases = [bin(n) for n in bases.split(' ')]
164 125 heads = [bin(n) for n in heads.split(' ')]
165 126
166 127 cg = self.repo.changegroupsubset(bases, heads, 'serve')
167 128 while True:
168 129 d = cg.read(4096)
169 130 if not d:
170 131 break
171 132 self.fout.write(d)
172 133
173 134 self.fout.flush()
174 135
175 136 def do_addchangegroup(self):
176 137 '''DEPRECATED'''
177 138
178 139 if not self.lock:
179 140 self.respond("not locked")
180 141 return
181 142
182 143 self.respond("")
183 144 r = self.repo.addchangegroup(self.fin, 'serve', self.client_url(),
184 145 lock=self.lock)
185 146 return str(r)
186 147
187 148 def client_url(self):
188 149 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
189 150 return 'remote:ssh:' + client
190 151
191 152 def do_unbundle(self):
192 153 their_heads = self.getarg('heads').split()
193 154
194 155 def check_heads():
195 156 heads = map(hex, self.repo.heads())
196 157 return their_heads == [hex('force')] or their_heads == heads
197 158
198 159 # fail early if possible
199 160 if not check_heads():
200 161 self.respond(_('unsynced changes'))
201 162 return
202 163
203 164 self.respond('')
204 165
205 166 # write bundle data to temporary file because it can be big
206 167 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
207 168 fp = os.fdopen(fd, 'wb+')
208 169 try:
209 170 count = int(self.fin.readline())
210 171 while count:
211 172 fp.write(self.fin.read(count))
212 173 count = int(self.fin.readline())
213 174
214 175 was_locked = self.lock is not None
215 176 if not was_locked:
216 177 self.lock = self.repo.lock()
217 178 try:
218 179 if not check_heads():
219 180 # someone else committed/pushed/unbundled while we
220 181 # were transferring data
221 182 self.respond(_('unsynced changes'))
222 183 return
223 184 self.respond('')
224 185
225 186 # push can proceed
226 187
227 188 fp.seek(0)
228 189 r = self.repo.addchangegroup(fp, 'serve', self.client_url(),
229 190 lock=self.lock)
230 191 self.respond(str(r))
231 192 finally:
232 193 if not was_locked:
233 194 self.lock.release()
234 195 self.lock = None
235 196 finally:
236 197 fp.close()
237 198 os.unlink(tempname)
238 199
239 200 def do_stream_out(self):
240 201 try:
241 202 for chunk in streamclone.stream_out(self.repo):
242 203 self.fout.write(chunk)
243 204 self.fout.flush()
244 205 except streamclone.StreamException, inst:
245 206 self.fout.write(str(inst))
246 207 self.fout.flush()
247
248 def do_pushkey(self):
249 namespace, key, old, new = self.getargs('namespace key old new')
250 r = pushkey.push(self.repo, namespace, key, old, new)
251 return '%s\n' % int(r)
252
253 def do_listkeys(self):
254 namespace = self.getarg('namespace')
255 d = pushkey.list(self.repo, namespace).items()
256 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
257 v.encode('string-escape')) for k, v in d])
258 return t
General Comments 0
You need to be logged in to leave comments. Login now