##// END OF EJS Templates
Adding changegroupsubset and lookup to ssh protocol so pull -r and...
Eric Hopper -
r3446:0b450267 default
parent child Browse files
Show More
@@ -1,211 +1,223
1 # sshrepo.py - ssh repository proxy class for mercurial
1 # sshrepo.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms
5 # This software may be used and distributed according to the terms
6 # of the GNU General Public License, incorporated herein by reference.
6 # of the GNU General Public License, incorporated herein by reference.
7
7
8 from node import *
8 from node import *
9 from remoterepo import *
9 from remoterepo import *
10 from i18n import gettext as _
10 from i18n import gettext as _
11 from demandload import *
11 from demandload import *
12 demandload(globals(), "hg os re stat util")
12 demandload(globals(), "hg os re stat util")
13
13
14 class sshrepository(remoterepository):
14 class sshrepository(remoterepository):
15 def __init__(self, ui, path, create=0):
15 def __init__(self, ui, path, create=0):
16 self._url = path
16 self._url = path
17 self.ui = ui
17 self.ui = ui
18
18
19 m = re.match(r'ssh://(([^@]+)@)?([^:/]+)(:(\d+))?(/(.*))?', path)
19 m = re.match(r'ssh://(([^@]+)@)?([^:/]+)(:(\d+))?(/(.*))?', path)
20 if not m:
20 if not m:
21 self.repoerror(_("couldn't parse location %s") % path)
21 self.repoerror(_("couldn't parse location %s") % path)
22
22
23 self.user = m.group(2)
23 self.user = m.group(2)
24 self.host = m.group(3)
24 self.host = m.group(3)
25 self.port = m.group(5)
25 self.port = m.group(5)
26 self.path = m.group(7) or "."
26 self.path = m.group(7) or "."
27
27
28 args = self.user and ("%s@%s" % (self.user, self.host)) or self.host
28 args = self.user and ("%s@%s" % (self.user, self.host)) or self.host
29 args = self.port and ("%s -p %s") % (args, self.port) or args
29 args = self.port and ("%s -p %s") % (args, self.port) or args
30
30
31 sshcmd = self.ui.config("ui", "ssh", "ssh")
31 sshcmd = self.ui.config("ui", "ssh", "ssh")
32 remotecmd = self.ui.config("ui", "remotecmd", "hg")
32 remotecmd = self.ui.config("ui", "remotecmd", "hg")
33
33
34 if create:
34 if create:
35 cmd = '%s %s "%s init %s"'
35 cmd = '%s %s "%s init %s"'
36 cmd = cmd % (sshcmd, args, remotecmd, self.path)
36 cmd = cmd % (sshcmd, args, remotecmd, self.path)
37
37
38 ui.note('running %s\n' % cmd)
38 ui.note('running %s\n' % cmd)
39 res = os.system(cmd)
39 res = os.system(cmd)
40 if res != 0:
40 if res != 0:
41 self.repoerror(_("could not create remote repo"))
41 self.repoerror(_("could not create remote repo"))
42
42
43 self.validate_repo(ui, sshcmd, args, remotecmd)
43 self.validate_repo(ui, sshcmd, args, remotecmd)
44
44
45 def url(self):
45 def url(self):
46 return self._url
46 return self._url
47
47
48 def validate_repo(self, ui, sshcmd, args, remotecmd):
48 def validate_repo(self, ui, sshcmd, args, remotecmd):
49 # cleanup up previous run
49 # cleanup up previous run
50 self.cleanup()
50 self.cleanup()
51
51
52 cmd = '%s %s "%s -R %s serve --stdio"'
52 cmd = '%s %s "%s -R %s serve --stdio"'
53 cmd = cmd % (sshcmd, args, remotecmd, self.path)
53 cmd = cmd % (sshcmd, args, remotecmd, self.path)
54
54
55 ui.note('running %s\n' % cmd)
55 ui.note('running %s\n' % cmd)
56 self.pipeo, self.pipei, self.pipee = os.popen3(cmd, 'b')
56 self.pipeo, self.pipei, self.pipee = os.popen3(cmd, 'b')
57
57
58 # skip any noise generated by remote shell
58 # skip any noise generated by remote shell
59 self.do_cmd("hello")
59 self.do_cmd("hello")
60 r = self.do_cmd("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
60 r = self.do_cmd("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
61 lines = ["", "dummy"]
61 lines = ["", "dummy"]
62 max_noise = 500
62 max_noise = 500
63 while lines[-1] and max_noise:
63 while lines[-1] and max_noise:
64 l = r.readline()
64 l = r.readline()
65 self.readerr()
65 self.readerr()
66 if lines[-1] == "1\n" and l == "\n":
66 if lines[-1] == "1\n" and l == "\n":
67 break
67 break
68 if l:
68 if l:
69 ui.debug(_("remote: "), l)
69 ui.debug(_("remote: "), l)
70 lines.append(l)
70 lines.append(l)
71 max_noise -= 1
71 max_noise -= 1
72 else:
72 else:
73 self.repoerror(_("no suitable response from remote hg"))
73 self.repoerror(_("no suitable response from remote hg"))
74
74
75 self.capabilities = ()
75 self.capabilities = ()
76 lines.reverse()
76 lines.reverse()
77 for l in lines:
77 for l in lines:
78 if l.startswith("capabilities:"):
78 if l.startswith("capabilities:"):
79 self.capabilities = l[:-1].split(":")[1].split()
79 self.capabilities = l[:-1].split(":")[1].split()
80 break
80 break
81
81
82 def readerr(self):
82 def readerr(self):
83 while 1:
83 while 1:
84 size = util.fstat(self.pipee).st_size
84 size = util.fstat(self.pipee).st_size
85 if size == 0: break
85 if size == 0: break
86 l = self.pipee.readline()
86 l = self.pipee.readline()
87 if not l: break
87 if not l: break
88 self.ui.status(_("remote: "), l)
88 self.ui.status(_("remote: "), l)
89
89
90 def repoerror(self, msg):
90 def repoerror(self, msg):
91 self.cleanup()
91 self.cleanup()
92 raise hg.RepoError(msg)
92 raise hg.RepoError(msg)
93
93
94 def cleanup(self):
94 def cleanup(self):
95 try:
95 try:
96 self.pipeo.close()
96 self.pipeo.close()
97 self.pipei.close()
97 self.pipei.close()
98 # read the error descriptor until EOF
98 # read the error descriptor until EOF
99 for l in self.pipee:
99 for l in self.pipee:
100 self.ui.status(_("remote: "), l)
100 self.ui.status(_("remote: "), l)
101 self.pipee.close()
101 self.pipee.close()
102 except:
102 except:
103 pass
103 pass
104
104
105 __del__ = cleanup
105 __del__ = cleanup
106
106
107 def do_cmd(self, cmd, **args):
107 def do_cmd(self, cmd, **args):
108 self.ui.debug(_("sending %s command\n") % cmd)
108 self.ui.debug(_("sending %s command\n") % cmd)
109 self.pipeo.write("%s\n" % cmd)
109 self.pipeo.write("%s\n" % cmd)
110 for k, v in args.items():
110 for k, v in args.items():
111 self.pipeo.write("%s %d\n" % (k, len(v)))
111 self.pipeo.write("%s %d\n" % (k, len(v)))
112 self.pipeo.write(v)
112 self.pipeo.write(v)
113 self.pipeo.flush()
113 self.pipeo.flush()
114
114
115 return self.pipei
115 return self.pipei
116
116
117 def call(self, cmd, **args):
117 def call(self, cmd, **args):
118 r = self.do_cmd(cmd, **args)
118 r = self.do_cmd(cmd, **args)
119 l = r.readline()
119 l = r.readline()
120 self.readerr()
120 self.readerr()
121 try:
121 try:
122 l = int(l)
122 l = int(l)
123 except:
123 except:
124 self.repoerror(_("unexpected response '%s'") % l)
124 self.repoerror(_("unexpected response '%s'") % l)
125 return r.read(l)
125 return r.read(l)
126
126
127 def lock(self):
127 def lock(self):
128 self.call("lock")
128 self.call("lock")
129 return remotelock(self)
129 return remotelock(self)
130
130
131 def unlock(self):
131 def unlock(self):
132 self.call("unlock")
132 self.call("unlock")
133
133
134 def lookup(self, key):
135 d = self.call("lookup", key=key)
136 try:
137 return bin(d[:-1])
138 except:
139 raise hg.RepoError("unexpected response '%s'" % (d[:400] + "..."))
140
134 def heads(self):
141 def heads(self):
135 d = self.call("heads")
142 d = self.call("heads")
136 try:
143 try:
137 return map(bin, d[:-1].split(" "))
144 return map(bin, d[:-1].split(" "))
138 except:
145 except:
139 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
146 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
140
147
141 def branches(self, nodes):
148 def branches(self, nodes):
142 n = " ".join(map(hex, nodes))
149 n = " ".join(map(hex, nodes))
143 d = self.call("branches", nodes=n)
150 d = self.call("branches", nodes=n)
144 try:
151 try:
145 br = [ tuple(map(bin, b.split(" "))) for b in d.splitlines() ]
152 br = [ tuple(map(bin, b.split(" "))) for b in d.splitlines() ]
146 return br
153 return br
147 except:
154 except:
148 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
155 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
149
156
150 def between(self, pairs):
157 def between(self, pairs):
151 n = "\n".join(["-".join(map(hex, p)) for p in pairs])
158 n = "\n".join(["-".join(map(hex, p)) for p in pairs])
152 d = self.call("between", pairs=n)
159 d = self.call("between", pairs=n)
153 try:
160 try:
154 p = [ l and map(bin, l.split(" ")) or [] for l in d.splitlines() ]
161 p = [ l and map(bin, l.split(" ")) or [] for l in d.splitlines() ]
155 return p
162 return p
156 except:
163 except:
157 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
164 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
158
165
159 def changegroup(self, nodes, kind):
166 def changegroup(self, nodes, kind):
160 n = " ".join(map(hex, nodes))
167 n = " ".join(map(hex, nodes))
161 return self.do_cmd("changegroup", roots=n)
168 return self.do_cmd("changegroup", roots=n)
162
169
170 def changegroupsubset(self, bases, heads, kind):
171 bases = " ".join(map(hex, bases))
172 heads = " ".join(map(hex, heads))
173 return self.do_cmd("changegroupsubset", bases=bases, heads=heads)
174
163 def unbundle(self, cg, heads, source):
175 def unbundle(self, cg, heads, source):
164 d = self.call("unbundle", heads=' '.join(map(hex, heads)))
176 d = self.call("unbundle", heads=' '.join(map(hex, heads)))
165 if d:
177 if d:
166 self.repoerror(_("push refused: %s") % d)
178 self.repoerror(_("push refused: %s") % d)
167
179
168 while 1:
180 while 1:
169 d = cg.read(4096)
181 d = cg.read(4096)
170 if not d: break
182 if not d: break
171 self.pipeo.write(str(len(d)) + '\n')
183 self.pipeo.write(str(len(d)) + '\n')
172 self.pipeo.write(d)
184 self.pipeo.write(d)
173 self.readerr()
185 self.readerr()
174
186
175 self.pipeo.write('0\n')
187 self.pipeo.write('0\n')
176 self.pipeo.flush()
188 self.pipeo.flush()
177
189
178 self.readerr()
190 self.readerr()
179 d = self.pipei.readline()
191 d = self.pipei.readline()
180 if d != '\n':
192 if d != '\n':
181 return 1
193 return 1
182
194
183 l = int(self.pipei.readline())
195 l = int(self.pipei.readline())
184 r = self.pipei.read(l)
196 r = self.pipei.read(l)
185 if not r:
197 if not r:
186 return 1
198 return 1
187 return int(r)
199 return int(r)
188
200
189 def addchangegroup(self, cg, source, url):
201 def addchangegroup(self, cg, source, url):
190 d = self.call("addchangegroup")
202 d = self.call("addchangegroup")
191 if d:
203 if d:
192 self.repoerror(_("push refused: %s") % d)
204 self.repoerror(_("push refused: %s") % d)
193 while 1:
205 while 1:
194 d = cg.read(4096)
206 d = cg.read(4096)
195 if not d: break
207 if not d: break
196 self.pipeo.write(d)
208 self.pipeo.write(d)
197 self.readerr()
209 self.readerr()
198
210
199 self.pipeo.flush()
211 self.pipeo.flush()
200
212
201 self.readerr()
213 self.readerr()
202 l = int(self.pipei.readline())
214 l = int(self.pipei.readline())
203 r = self.pipei.read(l)
215 r = self.pipei.read(l)
204 if not r:
216 if not r:
205 return 1
217 return 1
206 return int(r)
218 return int(r)
207
219
208 def stream_out(self):
220 def stream_out(self):
209 return self.do_cmd('stream_out')
221 return self.do_cmd('stream_out')
210
222
211 instance = sshrepository
223 instance = sshrepository
@@ -1,178 +1,199
1 # sshserver.py - ssh protocol server support for mercurial
1 # sshserver.py - ssh protocol server support for mercurial
2 #
2 #
3 # Copyright 2005 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms
6 # This software may be used and distributed according to the terms
7 # of the GNU General Public License, incorporated herein by reference.
7 # of the GNU General Public License, incorporated herein by reference.
8
8
9 from demandload import demandload
9 from demandload import demandload
10 from i18n import gettext as _
10 from i18n import gettext as _
11 from node import *
11 from node import *
12 demandload(globals(), "os streamclone sys tempfile util")
12 demandload(globals(), "os streamclone sys tempfile util")
13
13
14 class sshserver(object):
14 class sshserver(object):
15 def __init__(self, ui, repo):
15 def __init__(self, ui, repo):
16 self.ui = ui
16 self.ui = ui
17 self.repo = repo
17 self.repo = repo
18 self.lock = None
18 self.lock = None
19 self.fin = sys.stdin
19 self.fin = sys.stdin
20 self.fout = sys.stdout
20 self.fout = sys.stdout
21
21
22 sys.stdout = sys.stderr
22 sys.stdout = sys.stderr
23
23
24 # Prevent insertion/deletion of CRs
24 # Prevent insertion/deletion of CRs
25 util.set_binary(self.fin)
25 util.set_binary(self.fin)
26 util.set_binary(self.fout)
26 util.set_binary(self.fout)
27
27
28 def getarg(self):
28 def getarg(self):
29 argline = self.fin.readline()[:-1]
29 argline = self.fin.readline()[:-1]
30 arg, l = argline.split()
30 arg, l = argline.split()
31 val = self.fin.read(int(l))
31 val = self.fin.read(int(l))
32 return arg, val
32 return arg, val
33
33
34 def respond(self, v):
34 def respond(self, v):
35 self.fout.write("%d\n" % len(v))
35 self.fout.write("%d\n" % len(v))
36 self.fout.write(v)
36 self.fout.write(v)
37 self.fout.flush()
37 self.fout.flush()
38
38
39 def serve_forever(self):
39 def serve_forever(self):
40 while self.serve_one(): pass
40 while self.serve_one(): pass
41 sys.exit(0)
41 sys.exit(0)
42
42
43 def serve_one(self):
43 def serve_one(self):
44 cmd = self.fin.readline()[:-1]
44 cmd = self.fin.readline()[:-1]
45 if cmd:
45 if cmd:
46 impl = getattr(self, 'do_' + cmd, None)
46 impl = getattr(self, 'do_' + cmd, None)
47 if impl: impl()
47 if impl: impl()
48 else: self.respond("")
48 else: self.respond("")
49 return cmd != ''
49 return cmd != ''
50
50
51 def do_lookup(self):
52 arg, key = self.getarg()
53 assert arg == 'key'
54 self.respond(hex(self.repo.lookup(key)) + "\n")
55
51 def do_heads(self):
56 def do_heads(self):
52 h = self.repo.heads()
57 h = self.repo.heads()
53 self.respond(" ".join(map(hex, h)) + "\n")
58 self.respond(" ".join(map(hex, h)) + "\n")
54
59
55 def do_hello(self):
60 def do_hello(self):
56 '''the hello command returns a set of lines describing various
61 '''the hello command returns a set of lines describing various
57 interesting things about the server, in an RFC822-like format.
62 interesting things about the server, in an RFC822-like format.
58 Currently the only one defined is "capabilities", which
63 Currently the only one defined is "capabilities", which
59 consists of a line in the form:
64 consists of a line in the form:
60
65
61 capabilities: space separated list of tokens
66 capabilities: space separated list of tokens
62 '''
67 '''
63
68
64 caps = ['unbundle']
69 caps = ['unbundle', 'lookup', 'changegroupsubset']
65 if self.ui.configbool('server', 'uncompressed'):
70 if self.ui.configbool('server', 'uncompressed'):
66 caps.append('stream=%d' % self.repo.revlogversion)
71 caps.append('stream=%d' % self.repo.revlogversion)
67 self.respond("capabilities: %s\n" % (' '.join(caps),))
72 self.respond("capabilities: %s\n" % (' '.join(caps),))
68
73
69 def do_lock(self):
74 def do_lock(self):
70 '''DEPRECATED - allowing remote client to lock repo is not safe'''
75 '''DEPRECATED - allowing remote client to lock repo is not safe'''
71
76
72 self.lock = self.repo.lock()
77 self.lock = self.repo.lock()
73 self.respond("")
78 self.respond("")
74
79
75 def do_unlock(self):
80 def do_unlock(self):
76 '''DEPRECATED'''
81 '''DEPRECATED'''
77
82
78 if self.lock:
83 if self.lock:
79 self.lock.release()
84 self.lock.release()
80 self.lock = None
85 self.lock = None
81 self.respond("")
86 self.respond("")
82
87
83 def do_branches(self):
88 def do_branches(self):
84 arg, nodes = self.getarg()
89 arg, nodes = self.getarg()
85 nodes = map(bin, nodes.split(" "))
90 nodes = map(bin, nodes.split(" "))
86 r = []
91 r = []
87 for b in self.repo.branches(nodes):
92 for b in self.repo.branches(nodes):
88 r.append(" ".join(map(hex, b)) + "\n")
93 r.append(" ".join(map(hex, b)) + "\n")
89 self.respond("".join(r))
94 self.respond("".join(r))
90
95
91 def do_between(self):
96 def do_between(self):
92 arg, pairs = self.getarg()
97 arg, pairs = self.getarg()
93 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
98 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
94 r = []
99 r = []
95 for b in self.repo.between(pairs):
100 for b in self.repo.between(pairs):
96 r.append(" ".join(map(hex, b)) + "\n")
101 r.append(" ".join(map(hex, b)) + "\n")
97 self.respond("".join(r))
102 self.respond("".join(r))
98
103
99 def do_changegroup(self):
104 def do_changegroup(self):
100 nodes = []
105 nodes = []
101 arg, roots = self.getarg()
106 arg, roots = self.getarg()
102 nodes = map(bin, roots.split(" "))
107 nodes = map(bin, roots.split(" "))
103
108
104 cg = self.repo.changegroup(nodes, 'serve')
109 cg = self.repo.changegroup(nodes, 'serve')
105 while True:
110 while True:
106 d = cg.read(4096)
111 d = cg.read(4096)
107 if not d:
112 if not d:
108 break
113 break
109 self.fout.write(d)
114 self.fout.write(d)
110
115
111 self.fout.flush()
116 self.fout.flush()
112
117
118 def do_changegroupsubset(self):
119 bases = []
120 heads = []
121 argmap = dict([self.getarg(), self.getarg()])
122 bases = [bin(n) for n in argmap['bases'].split(' ')]
123 heads = [bin(n) for n in argmap['heads'].split(' ')]
124
125 cg = self.repo.changegroupsubset(bases, heads, 'serve')
126 while True:
127 d = cg.read(4096)
128 if not d:
129 break
130 self.fout.write(d)
131
132 self.fout.flush()
133
113 def do_addchangegroup(self):
134 def do_addchangegroup(self):
114 '''DEPRECATED'''
135 '''DEPRECATED'''
115
136
116 if not self.lock:
137 if not self.lock:
117 self.respond("not locked")
138 self.respond("not locked")
118 return
139 return
119
140
120 self.respond("")
141 self.respond("")
121 r = self.repo.addchangegroup(self.fin, 'serve', self.client_url())
142 r = self.repo.addchangegroup(self.fin, 'serve', self.client_url())
122 self.respond(str(r))
143 self.respond(str(r))
123
144
124 def client_url(self):
145 def client_url(self):
125 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
146 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
126 return 'remote:ssh:' + client
147 return 'remote:ssh:' + client
127
148
128 def do_unbundle(self):
149 def do_unbundle(self):
129 their_heads = self.getarg()[1].split()
150 their_heads = self.getarg()[1].split()
130
151
131 def check_heads():
152 def check_heads():
132 heads = map(hex, self.repo.heads())
153 heads = map(hex, self.repo.heads())
133 return their_heads == [hex('force')] or their_heads == heads
154 return their_heads == [hex('force')] or their_heads == heads
134
155
135 # fail early if possible
156 # fail early if possible
136 if not check_heads():
157 if not check_heads():
137 self.respond(_('unsynced changes'))
158 self.respond(_('unsynced changes'))
138 return
159 return
139
160
140 self.respond('')
161 self.respond('')
141
162
142 # write bundle data to temporary file because it can be big
163 # write bundle data to temporary file because it can be big
143
164
144 try:
165 try:
145 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
166 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
146 fp = os.fdopen(fd, 'wb+')
167 fp = os.fdopen(fd, 'wb+')
147
168
148 count = int(self.fin.readline())
169 count = int(self.fin.readline())
149 while count:
170 while count:
150 fp.write(self.fin.read(count))
171 fp.write(self.fin.read(count))
151 count = int(self.fin.readline())
172 count = int(self.fin.readline())
152
173
153 was_locked = self.lock is not None
174 was_locked = self.lock is not None
154 if not was_locked:
175 if not was_locked:
155 self.lock = self.repo.lock()
176 self.lock = self.repo.lock()
156 try:
177 try:
157 if not check_heads():
178 if not check_heads():
158 # someone else committed/pushed/unbundled while we
179 # someone else committed/pushed/unbundled while we
159 # were transferring data
180 # were transferring data
160 self.respond(_('unsynced changes'))
181 self.respond(_('unsynced changes'))
161 return
182 return
162 self.respond('')
183 self.respond('')
163
184
164 # push can proceed
185 # push can proceed
165
186
166 fp.seek(0)
187 fp.seek(0)
167 r = self.repo.addchangegroup(fp, 'serve', self.client_url())
188 r = self.repo.addchangegroup(fp, 'serve', self.client_url())
168 self.respond(str(r))
189 self.respond(str(r))
169 finally:
190 finally:
170 if not was_locked:
191 if not was_locked:
171 self.lock.release()
192 self.lock.release()
172 self.lock = None
193 self.lock = None
173 finally:
194 finally:
174 fp.close()
195 fp.close()
175 os.unlink(tempname)
196 os.unlink(tempname)
176
197
177 def do_stream_out(self):
198 def do_stream_out(self):
178 streamclone.stream_out(self.repo, self.fout)
199 streamclone.stream_out(self.repo, self.fout)
General Comments 0
You need to be logged in to leave comments. Login now