##// END OF EJS Templates
protocol: introduce wireproto.py...
Matt Mackall -
r11581:4530b330 default
parent child Browse files
Show More
@@ -0,0 +1,75
1 # wireproto.py - generic wire protocol support functions
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 from i18n import _
9 from node import bin, hex
10 import urllib
11 import pushkey as pushkey_
12
13 def dispatch(repo, proto, command):
14 if command not in commands:
15 return False
16 func, spec = commands[command]
17 args = proto.getargs(spec)
18 proto.respond(func(repo, *args))
19 return True
20
21 def between(repo, pairs):
22 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
23 r = []
24 for b in repo.between(pairs):
25 r.append(" ".join(map(hex, b)) + "\n")
26 return "".join(r)
27
28 def branchmap(repo):
29 branchmap = repo.branchmap()
30 heads = []
31 for branch, nodes in branchmap.iteritems():
32 branchname = urllib.quote(branch)
33 branchnodes = [hex(node) for node in nodes]
34 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
35 return '\n'.join(heads)
36
37 def branches(repo, nodes):
38 nodes = map(bin, nodes.split(" "))
39 r = []
40 for b in repo.branches(nodes):
41 r.append(" ".join(map(hex, b)) + "\n")
42 return "".join(r)
43
44 def heads(repo):
45 h = repo.heads()
46 return " ".join(map(hex, h)) + "\n"
47
48 def listkeys(repo, namespace):
49 d = pushkey_.list(repo, namespace).items()
50 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
51 v.encode('string-escape')) for k, v in d])
52 return t
53
54 def lookup(repo, key):
55 try:
56 r = hex(repo.lookup(key))
57 success = 1
58 except Exception, inst:
59 r = str(inst)
60 success = 0
61 return "%s %s\n" % (success, r)
62
63 def pushkey(repo, namespace, key, old, new):
64 r = pushkey_.push(repo, namespace, key, old, new)
65 return '%s\n' % int(r)
66
67 commands = {
68 'between': (between, 'pairs'),
69 'branchmap': (branchmap, ''),
70 'branches': (branches, 'nodes'),
71 'heads': (heads, ''),
72 'listkeys': (listkeys, 'namespace'),
73 'lookup': (lookup, 'key'),
74 'pushkey': (pushkey, 'namespace key old new'),
75 }
@@ -1,258 +1,207
1 # sshserver.py - ssh protocol server support for mercurial
1 # sshserver.py - ssh protocol server support for mercurial
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import streamclone, util, hook, pushkey
11 import streamclone, util, hook, pushkey, wireproto
12 import os, sys, tempfile, urllib, copy
12 import os, sys, tempfile, urllib, copy
13
13
14 class sshserver(object):
14 class sshserver(object):
15
15
16 caps = 'unbundle lookup changegroupsubset branchmap pushkey'.split()
16 caps = 'unbundle lookup changegroupsubset branchmap pushkey'.split()
17
17
18 def __init__(self, ui, repo):
18 def __init__(self, ui, repo):
19 self.ui = ui
19 self.ui = ui
20 self.repo = repo
20 self.repo = repo
21 self.lock = None
21 self.lock = None
22 self.fin = sys.stdin
22 self.fin = sys.stdin
23 self.fout = sys.stdout
23 self.fout = sys.stdout
24
24
25 hook.redirect(True)
25 hook.redirect(True)
26 sys.stdout = sys.stderr
26 sys.stdout = sys.stderr
27
27
28 # Prevent insertion/deletion of CRs
28 # Prevent insertion/deletion of CRs
29 util.set_binary(self.fin)
29 util.set_binary(self.fin)
30 util.set_binary(self.fout)
30 util.set_binary(self.fout)
31
31
32 def getargs(self, args):
32 def getargs(self, args):
33 data = {}
33 data = {}
34 keys = args.split()
34 keys = args.split()
35 count = len(keys)
35 count = len(keys)
36 for n in xrange(len(keys)):
36 for n in xrange(len(keys)):
37 argline = self.fin.readline()[:-1]
37 argline = self.fin.readline()[:-1]
38 arg, l = argline.split()
38 arg, l = argline.split()
39 val = self.fin.read(int(l))
39 val = self.fin.read(int(l))
40 if arg not in keys:
40 if arg not in keys:
41 raise util.Abort("unexpected parameter %r" % arg)
41 raise util.Abort("unexpected parameter %r" % arg)
42 if arg == '*':
42 if arg == '*':
43 star = {}
43 star = {}
44 for n in xrange(int(l)):
44 for n in xrange(int(l)):
45 arg, l = argline.split()
45 arg, l = argline.split()
46 val = self.fin.read(int(l))
46 val = self.fin.read(int(l))
47 star[arg] = val
47 star[arg] = val
48 data['*'] = star
48 data['*'] = star
49 else:
49 else:
50 data[arg] = val
50 data[arg] = val
51 return [data[k] for k in keys]
51 return [data[k] for k in keys]
52
52
53 def getarg(self, name):
53 def getarg(self, name):
54 return self.getargs(name)[0]
54 return self.getargs(name)[0]
55
55
56 def respond(self, v):
56 def respond(self, v):
57 self.fout.write("%d\n" % len(v))
57 self.fout.write("%d\n" % len(v))
58 self.fout.write(v)
58 self.fout.write(v)
59 self.fout.flush()
59 self.fout.flush()
60
60
61 def serve_forever(self):
61 def serve_forever(self):
62 try:
62 try:
63 while self.serve_one():
63 while self.serve_one():
64 pass
64 pass
65 finally:
65 finally:
66 if self.lock is not None:
66 if self.lock is not None:
67 self.lock.release()
67 self.lock.release()
68 sys.exit(0)
68 sys.exit(0)
69
69
70 def serve_one(self):
70 def serve_one(self):
71 cmd = self.fin.readline()[:-1]
71 cmd = self.fin.readline()[:-1]
72 if cmd:
72 if cmd and not wireproto.dispatch(self.repo, self, cmd):
73 impl = getattr(self, 'do_' + cmd, None)
73 impl = getattr(self, 'do_' + cmd, None)
74 if impl:
74 if impl:
75 r = impl()
75 r = impl()
76 if r is not None:
76 if r is not None:
77 self.respond(r)
77 self.respond(r)
78 else: self.respond("")
78 else: self.respond("")
79 return cmd != ''
79 return cmd != ''
80
80
81 def do_lookup(self):
82 key = self.getarg('key')
83 try:
84 r = hex(self.repo.lookup(key))
85 success = 1
86 except Exception, inst:
87 r = str(inst)
88 success = 0
89 return "%s %s\n" % (success, r)
90
91 def do_branchmap(self):
92 branchmap = self.repo.branchmap()
93 heads = []
94 for branch, nodes in branchmap.iteritems():
95 branchname = urllib.quote(branch)
96 branchnodes = [hex(node) for node in nodes]
97 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
98 return '\n'.join(heads)
99
100 def do_heads(self):
101 h = self.repo.heads()
102 return " ".join(map(hex, h)) + "\n"
103
104 def do_hello(self):
81 def do_hello(self):
105 '''the hello command returns a set of lines describing various
82 '''the hello command returns a set of lines describing various
106 interesting things about the server, in an RFC822-like format.
83 interesting things about the server, in an RFC822-like format.
107 Currently the only one defined is "capabilities", which
84 Currently the only one defined is "capabilities", which
108 consists of a line in the form:
85 consists of a line in the form:
109
86
110 capabilities: space separated list of tokens
87 capabilities: space separated list of tokens
111 '''
88 '''
112 caps = copy.copy(self.caps)
89 caps = copy.copy(self.caps)
113 if streamclone.allowed(self.repo.ui):
90 if streamclone.allowed(self.repo.ui):
114 caps.append('stream=%d' % self.repo.changelog.version)
91 caps.append('stream=%d' % self.repo.changelog.version)
115 return "capabilities: %s\n" % (' '.join(caps),)
92 return "capabilities: %s\n" % (' '.join(caps),)
116
93
117 def do_lock(self):
94 def do_lock(self):
118 '''DEPRECATED - allowing remote client to lock repo is not safe'''
95 '''DEPRECATED - allowing remote client to lock repo is not safe'''
119
96
120 self.lock = self.repo.lock()
97 self.lock = self.repo.lock()
121 return ""
98 return ""
122
99
123 def do_unlock(self):
100 def do_unlock(self):
124 '''DEPRECATED'''
101 '''DEPRECATED'''
125
102
126 if self.lock:
103 if self.lock:
127 self.lock.release()
104 self.lock.release()
128 self.lock = None
105 self.lock = None
129 return ""
106 return ""
130
107
131 def do_branches(self):
132 nodes = self.getarg('nodes')
133 nodes = map(bin, nodes.split(" "))
134 r = []
135 for b in self.repo.branches(nodes):
136 r.append(" ".join(map(hex, b)) + "\n")
137 return "".join(r)
138
139 def do_between(self):
140 pairs = self.getarg('pairs')
141 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
142 r = []
143 for b in self.repo.between(pairs):
144 r.append(" ".join(map(hex, b)) + "\n")
145 return "".join(r)
146
147 def do_changegroup(self):
108 def do_changegroup(self):
148 nodes = []
109 nodes = []
149 roots = self.getarg('roots')
110 roots = self.getarg('roots')
150 nodes = map(bin, roots.split(" "))
111 nodes = map(bin, roots.split(" "))
151
112
152 cg = self.repo.changegroup(nodes, 'serve')
113 cg = self.repo.changegroup(nodes, 'serve')
153 while True:
114 while True:
154 d = cg.read(4096)
115 d = cg.read(4096)
155 if not d:
116 if not d:
156 break
117 break
157 self.fout.write(d)
118 self.fout.write(d)
158
119
159 self.fout.flush()
120 self.fout.flush()
160
121
161 def do_changegroupsubset(self):
122 def do_changegroupsubset(self):
162 bases, heads = self.getargs('bases heads')
123 bases, heads = self.getargs('bases heads')
163 bases = [bin(n) for n in bases.split(' ')]
124 bases = [bin(n) for n in bases.split(' ')]
164 heads = [bin(n) for n in heads.split(' ')]
125 heads = [bin(n) for n in heads.split(' ')]
165
126
166 cg = self.repo.changegroupsubset(bases, heads, 'serve')
127 cg = self.repo.changegroupsubset(bases, heads, 'serve')
167 while True:
128 while True:
168 d = cg.read(4096)
129 d = cg.read(4096)
169 if not d:
130 if not d:
170 break
131 break
171 self.fout.write(d)
132 self.fout.write(d)
172
133
173 self.fout.flush()
134 self.fout.flush()
174
135
175 def do_addchangegroup(self):
136 def do_addchangegroup(self):
176 '''DEPRECATED'''
137 '''DEPRECATED'''
177
138
178 if not self.lock:
139 if not self.lock:
179 self.respond("not locked")
140 self.respond("not locked")
180 return
141 return
181
142
182 self.respond("")
143 self.respond("")
183 r = self.repo.addchangegroup(self.fin, 'serve', self.client_url(),
144 r = self.repo.addchangegroup(self.fin, 'serve', self.client_url(),
184 lock=self.lock)
145 lock=self.lock)
185 return str(r)
146 return str(r)
186
147
187 def client_url(self):
148 def client_url(self):
188 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
149 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
189 return 'remote:ssh:' + client
150 return 'remote:ssh:' + client
190
151
191 def do_unbundle(self):
152 def do_unbundle(self):
192 their_heads = self.getarg('heads').split()
153 their_heads = self.getarg('heads').split()
193
154
194 def check_heads():
155 def check_heads():
195 heads = map(hex, self.repo.heads())
156 heads = map(hex, self.repo.heads())
196 return their_heads == [hex('force')] or their_heads == heads
157 return their_heads == [hex('force')] or their_heads == heads
197
158
198 # fail early if possible
159 # fail early if possible
199 if not check_heads():
160 if not check_heads():
200 self.respond(_('unsynced changes'))
161 self.respond(_('unsynced changes'))
201 return
162 return
202
163
203 self.respond('')
164 self.respond('')
204
165
205 # write bundle data to temporary file because it can be big
166 # write bundle data to temporary file because it can be big
206 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
167 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
207 fp = os.fdopen(fd, 'wb+')
168 fp = os.fdopen(fd, 'wb+')
208 try:
169 try:
209 count = int(self.fin.readline())
170 count = int(self.fin.readline())
210 while count:
171 while count:
211 fp.write(self.fin.read(count))
172 fp.write(self.fin.read(count))
212 count = int(self.fin.readline())
173 count = int(self.fin.readline())
213
174
214 was_locked = self.lock is not None
175 was_locked = self.lock is not None
215 if not was_locked:
176 if not was_locked:
216 self.lock = self.repo.lock()
177 self.lock = self.repo.lock()
217 try:
178 try:
218 if not check_heads():
179 if not check_heads():
219 # someone else committed/pushed/unbundled while we
180 # someone else committed/pushed/unbundled while we
220 # were transferring data
181 # were transferring data
221 self.respond(_('unsynced changes'))
182 self.respond(_('unsynced changes'))
222 return
183 return
223 self.respond('')
184 self.respond('')
224
185
225 # push can proceed
186 # push can proceed
226
187
227 fp.seek(0)
188 fp.seek(0)
228 r = self.repo.addchangegroup(fp, 'serve', self.client_url(),
189 r = self.repo.addchangegroup(fp, 'serve', self.client_url(),
229 lock=self.lock)
190 lock=self.lock)
230 self.respond(str(r))
191 self.respond(str(r))
231 finally:
192 finally:
232 if not was_locked:
193 if not was_locked:
233 self.lock.release()
194 self.lock.release()
234 self.lock = None
195 self.lock = None
235 finally:
196 finally:
236 fp.close()
197 fp.close()
237 os.unlink(tempname)
198 os.unlink(tempname)
238
199
239 def do_stream_out(self):
200 def do_stream_out(self):
240 try:
201 try:
241 for chunk in streamclone.stream_out(self.repo):
202 for chunk in streamclone.stream_out(self.repo):
242 self.fout.write(chunk)
203 self.fout.write(chunk)
243 self.fout.flush()
204 self.fout.flush()
244 except streamclone.StreamException, inst:
205 except streamclone.StreamException, inst:
245 self.fout.write(str(inst))
206 self.fout.write(str(inst))
246 self.fout.flush()
207 self.fout.flush()
247
248 def do_pushkey(self):
249 namespace, key, old, new = self.getargs('namespace key old new')
250 r = pushkey.push(self.repo, namespace, key, old, new)
251 return '%s\n' % int(r)
252
253 def do_listkeys(self):
254 namespace = self.getarg('namespace')
255 d = pushkey.list(self.repo, namespace).items()
256 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
257 v.encode('string-escape')) for k, v in d])
258 return t
General Comments 0
You need to be logged in to leave comments. Login now