##// END OF EJS Templates
sshrepo: add passing of lookup exceptions
Eric Hopper -
r3447:ef1032c2 default
parent child Browse files
Show More
@@ -1,223 +1,228 b''
1 1 # sshrepo.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms
6 6 # of the GNU General Public License, incorporated herein by reference.
7 7
8 8 from node import *
9 9 from remoterepo import *
10 10 from i18n import gettext as _
11 11 from demandload import *
12 12 demandload(globals(), "hg os re stat util")
13 13
14 14 class sshrepository(remoterepository):
15 15 def __init__(self, ui, path, create=0):
16 16 self._url = path
17 17 self.ui = ui
18 18
19 19 m = re.match(r'ssh://(([^@]+)@)?([^:/]+)(:(\d+))?(/(.*))?', path)
20 20 if not m:
21 21 self.repoerror(_("couldn't parse location %s") % path)
22 22
23 23 self.user = m.group(2)
24 24 self.host = m.group(3)
25 25 self.port = m.group(5)
26 26 self.path = m.group(7) or "."
27 27
28 28 args = self.user and ("%s@%s" % (self.user, self.host)) or self.host
29 29 args = self.port and ("%s -p %s") % (args, self.port) or args
30 30
31 31 sshcmd = self.ui.config("ui", "ssh", "ssh")
32 32 remotecmd = self.ui.config("ui", "remotecmd", "hg")
33 33
34 34 if create:
35 35 cmd = '%s %s "%s init %s"'
36 36 cmd = cmd % (sshcmd, args, remotecmd, self.path)
37 37
38 38 ui.note('running %s\n' % cmd)
39 39 res = os.system(cmd)
40 40 if res != 0:
41 41 self.repoerror(_("could not create remote repo"))
42 42
43 43 self.validate_repo(ui, sshcmd, args, remotecmd)
44 44
45 45 def url(self):
46 46 return self._url
47 47
48 48 def validate_repo(self, ui, sshcmd, args, remotecmd):
49 49 # cleanup up previous run
50 50 self.cleanup()
51 51
52 52 cmd = '%s %s "%s -R %s serve --stdio"'
53 53 cmd = cmd % (sshcmd, args, remotecmd, self.path)
54 54
55 55 ui.note('running %s\n' % cmd)
56 56 self.pipeo, self.pipei, self.pipee = os.popen3(cmd, 'b')
57 57
58 58 # skip any noise generated by remote shell
59 59 self.do_cmd("hello")
60 60 r = self.do_cmd("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
61 61 lines = ["", "dummy"]
62 62 max_noise = 500
63 63 while lines[-1] and max_noise:
64 64 l = r.readline()
65 65 self.readerr()
66 66 if lines[-1] == "1\n" and l == "\n":
67 67 break
68 68 if l:
69 69 ui.debug(_("remote: "), l)
70 70 lines.append(l)
71 71 max_noise -= 1
72 72 else:
73 73 self.repoerror(_("no suitable response from remote hg"))
74 74
75 75 self.capabilities = ()
76 76 lines.reverse()
77 77 for l in lines:
78 78 if l.startswith("capabilities:"):
79 79 self.capabilities = l[:-1].split(":")[1].split()
80 80 break
81 81
82 82 def readerr(self):
83 83 while 1:
84 84 size = util.fstat(self.pipee).st_size
85 85 if size == 0: break
86 86 l = self.pipee.readline()
87 87 if not l: break
88 88 self.ui.status(_("remote: "), l)
89 89
90 90 def repoerror(self, msg):
91 91 self.cleanup()
92 92 raise hg.RepoError(msg)
93 93
94 94 def cleanup(self):
95 95 try:
96 96 self.pipeo.close()
97 97 self.pipei.close()
98 98 # read the error descriptor until EOF
99 99 for l in self.pipee:
100 100 self.ui.status(_("remote: "), l)
101 101 self.pipee.close()
102 102 except:
103 103 pass
104 104
105 105 __del__ = cleanup
106 106
107 107 def do_cmd(self, cmd, **args):
108 108 self.ui.debug(_("sending %s command\n") % cmd)
109 109 self.pipeo.write("%s\n" % cmd)
110 110 for k, v in args.items():
111 111 self.pipeo.write("%s %d\n" % (k, len(v)))
112 112 self.pipeo.write(v)
113 113 self.pipeo.flush()
114 114
115 115 return self.pipei
116 116
117 117 def call(self, cmd, **args):
118 118 r = self.do_cmd(cmd, **args)
119 119 l = r.readline()
120 120 self.readerr()
121 121 try:
122 122 l = int(l)
123 123 except:
124 124 self.repoerror(_("unexpected response '%s'") % l)
125 125 return r.read(l)
126 126
127 127 def lock(self):
128 128 self.call("lock")
129 129 return remotelock(self)
130 130
131 131 def unlock(self):
132 132 self.call("unlock")
133 133
134 134 def lookup(self, key):
135 135 d = self.call("lookup", key=key)
136 success, data = d[:-1].split(" ", 1)
136 137 try:
137 return bin(d[:-1])
138 if int(success):
139 return bin(data)
140 else:
141 raise data
138 142 except:
143 raise
139 144 raise hg.RepoError("unexpected response '%s'" % (d[:400] + "..."))
140 145
141 146 def heads(self):
142 147 d = self.call("heads")
143 148 try:
144 149 return map(bin, d[:-1].split(" "))
145 150 except:
146 151 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
147 152
148 153 def branches(self, nodes):
149 154 n = " ".join(map(hex, nodes))
150 155 d = self.call("branches", nodes=n)
151 156 try:
152 157 br = [ tuple(map(bin, b.split(" "))) for b in d.splitlines() ]
153 158 return br
154 159 except:
155 160 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
156 161
157 162 def between(self, pairs):
158 163 n = "\n".join(["-".join(map(hex, p)) for p in pairs])
159 164 d = self.call("between", pairs=n)
160 165 try:
161 166 p = [ l and map(bin, l.split(" ")) or [] for l in d.splitlines() ]
162 167 return p
163 168 except:
164 169 self.repoerror(_("unexpected response '%s'") % (d[:400] + "..."))
165 170
166 171 def changegroup(self, nodes, kind):
167 172 n = " ".join(map(hex, nodes))
168 173 return self.do_cmd("changegroup", roots=n)
169 174
170 175 def changegroupsubset(self, bases, heads, kind):
171 176 bases = " ".join(map(hex, bases))
172 177 heads = " ".join(map(hex, heads))
173 178 return self.do_cmd("changegroupsubset", bases=bases, heads=heads)
174 179
175 180 def unbundle(self, cg, heads, source):
176 181 d = self.call("unbundle", heads=' '.join(map(hex, heads)))
177 182 if d:
178 183 self.repoerror(_("push refused: %s") % d)
179 184
180 185 while 1:
181 186 d = cg.read(4096)
182 187 if not d: break
183 188 self.pipeo.write(str(len(d)) + '\n')
184 189 self.pipeo.write(d)
185 190 self.readerr()
186 191
187 192 self.pipeo.write('0\n')
188 193 self.pipeo.flush()
189 194
190 195 self.readerr()
191 196 d = self.pipei.readline()
192 197 if d != '\n':
193 198 return 1
194 199
195 200 l = int(self.pipei.readline())
196 201 r = self.pipei.read(l)
197 202 if not r:
198 203 return 1
199 204 return int(r)
200 205
201 206 def addchangegroup(self, cg, source, url):
202 207 d = self.call("addchangegroup")
203 208 if d:
204 209 self.repoerror(_("push refused: %s") % d)
205 210 while 1:
206 211 d = cg.read(4096)
207 212 if not d: break
208 213 self.pipeo.write(d)
209 214 self.readerr()
210 215
211 216 self.pipeo.flush()
212 217
213 218 self.readerr()
214 219 l = int(self.pipei.readline())
215 220 r = self.pipei.read(l)
216 221 if not r:
217 222 return 1
218 223 return int(r)
219 224
220 225 def stream_out(self):
221 226 return self.do_cmd('stream_out')
222 227
223 228 instance = sshrepository
@@ -1,199 +1,205 b''
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms
7 7 # of the GNU General Public License, incorporated herein by reference.
8 8
9 9 from demandload import demandload
10 10 from i18n import gettext as _
11 11 from node import *
12 12 demandload(globals(), "os streamclone sys tempfile util")
13 13
14 14 class sshserver(object):
15 15 def __init__(self, ui, repo):
16 16 self.ui = ui
17 17 self.repo = repo
18 18 self.lock = None
19 19 self.fin = sys.stdin
20 20 self.fout = sys.stdout
21 21
22 22 sys.stdout = sys.stderr
23 23
24 24 # Prevent insertion/deletion of CRs
25 25 util.set_binary(self.fin)
26 26 util.set_binary(self.fout)
27 27
28 28 def getarg(self):
29 29 argline = self.fin.readline()[:-1]
30 30 arg, l = argline.split()
31 31 val = self.fin.read(int(l))
32 32 return arg, val
33 33
34 34 def respond(self, v):
35 35 self.fout.write("%d\n" % len(v))
36 36 self.fout.write(v)
37 37 self.fout.flush()
38 38
39 39 def serve_forever(self):
40 40 while self.serve_one(): pass
41 41 sys.exit(0)
42 42
43 43 def serve_one(self):
44 44 cmd = self.fin.readline()[:-1]
45 45 if cmd:
46 46 impl = getattr(self, 'do_' + cmd, None)
47 47 if impl: impl()
48 48 else: self.respond("")
49 49 return cmd != ''
50 50
51 51 def do_lookup(self):
52 52 arg, key = self.getarg()
53 53 assert arg == 'key'
54 self.respond(hex(self.repo.lookup(key)) + "\n")
54 try:
55 r = hex(self.repo.lookup(key))
56 success = 1
57 except Exception,inst:
58 r = str(inst)
59 success = 0
60 self.respond("%s %s\n" % (success, r))
55 61
56 62 def do_heads(self):
57 63 h = self.repo.heads()
58 64 self.respond(" ".join(map(hex, h)) + "\n")
59 65
60 66 def do_hello(self):
61 67 '''the hello command returns a set of lines describing various
62 68 interesting things about the server, in an RFC822-like format.
63 69 Currently the only one defined is "capabilities", which
64 70 consists of a line in the form:
65 71
66 72 capabilities: space separated list of tokens
67 73 '''
68 74
69 75 caps = ['unbundle', 'lookup', 'changegroupsubset']
70 76 if self.ui.configbool('server', 'uncompressed'):
71 77 caps.append('stream=%d' % self.repo.revlogversion)
72 78 self.respond("capabilities: %s\n" % (' '.join(caps),))
73 79
74 80 def do_lock(self):
75 81 '''DEPRECATED - allowing remote client to lock repo is not safe'''
76 82
77 83 self.lock = self.repo.lock()
78 84 self.respond("")
79 85
80 86 def do_unlock(self):
81 87 '''DEPRECATED'''
82 88
83 89 if self.lock:
84 90 self.lock.release()
85 91 self.lock = None
86 92 self.respond("")
87 93
88 94 def do_branches(self):
89 95 arg, nodes = self.getarg()
90 96 nodes = map(bin, nodes.split(" "))
91 97 r = []
92 98 for b in self.repo.branches(nodes):
93 99 r.append(" ".join(map(hex, b)) + "\n")
94 100 self.respond("".join(r))
95 101
96 102 def do_between(self):
97 103 arg, pairs = self.getarg()
98 104 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
99 105 r = []
100 106 for b in self.repo.between(pairs):
101 107 r.append(" ".join(map(hex, b)) + "\n")
102 108 self.respond("".join(r))
103 109
104 110 def do_changegroup(self):
105 111 nodes = []
106 112 arg, roots = self.getarg()
107 113 nodes = map(bin, roots.split(" "))
108 114
109 115 cg = self.repo.changegroup(nodes, 'serve')
110 116 while True:
111 117 d = cg.read(4096)
112 118 if not d:
113 119 break
114 120 self.fout.write(d)
115 121
116 122 self.fout.flush()
117 123
118 124 def do_changegroupsubset(self):
119 125 bases = []
120 126 heads = []
121 127 argmap = dict([self.getarg(), self.getarg()])
122 128 bases = [bin(n) for n in argmap['bases'].split(' ')]
123 129 heads = [bin(n) for n in argmap['heads'].split(' ')]
124 130
125 131 cg = self.repo.changegroupsubset(bases, heads, 'serve')
126 132 while True:
127 133 d = cg.read(4096)
128 134 if not d:
129 135 break
130 136 self.fout.write(d)
131 137
132 138 self.fout.flush()
133 139
134 140 def do_addchangegroup(self):
135 141 '''DEPRECATED'''
136 142
137 143 if not self.lock:
138 144 self.respond("not locked")
139 145 return
140 146
141 147 self.respond("")
142 148 r = self.repo.addchangegroup(self.fin, 'serve', self.client_url())
143 149 self.respond(str(r))
144 150
145 151 def client_url(self):
146 152 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
147 153 return 'remote:ssh:' + client
148 154
149 155 def do_unbundle(self):
150 156 their_heads = self.getarg()[1].split()
151 157
152 158 def check_heads():
153 159 heads = map(hex, self.repo.heads())
154 160 return their_heads == [hex('force')] or their_heads == heads
155 161
156 162 # fail early if possible
157 163 if not check_heads():
158 164 self.respond(_('unsynced changes'))
159 165 return
160 166
161 167 self.respond('')
162 168
163 169 # write bundle data to temporary file because it can be big
164 170
165 171 try:
166 172 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
167 173 fp = os.fdopen(fd, 'wb+')
168 174
169 175 count = int(self.fin.readline())
170 176 while count:
171 177 fp.write(self.fin.read(count))
172 178 count = int(self.fin.readline())
173 179
174 180 was_locked = self.lock is not None
175 181 if not was_locked:
176 182 self.lock = self.repo.lock()
177 183 try:
178 184 if not check_heads():
179 185 # someone else committed/pushed/unbundled while we
180 186 # were transferring data
181 187 self.respond(_('unsynced changes'))
182 188 return
183 189 self.respond('')
184 190
185 191 # push can proceed
186 192
187 193 fp.seek(0)
188 194 r = self.repo.addchangegroup(fp, 'serve', self.client_url())
189 195 self.respond(str(r))
190 196 finally:
191 197 if not was_locked:
192 198 self.lock.release()
193 199 self.lock = None
194 200 finally:
195 201 fp.close()
196 202 os.unlink(tempname)
197 203
198 204 def do_stream_out(self):
199 205 streamclone.stream_out(self.repo, self.fout)
General Comments 0
You need to be logged in to leave comments. Login now