##// END OF EJS Templates
wireproto: introduce pusherr() to deal with "unsynced changes" error...
Benoit Boissinot -
r12703:40bb5853 default
parent child Browse files
Show More
@@ -1,68 +1,73
1 1 #
2 2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import cStringIO, zlib, sys, urllib
9 9 from mercurial import util, wireproto
10 10 from common import HTTP_OK
11 11
12 12 HGTYPE = 'application/mercurial-0.1'
13 13
14 14 class webproto(object):
15 15 def __init__(self, req):
16 16 self.req = req
17 17 self.response = ''
18 18 def getargs(self, args):
19 19 data = {}
20 20 keys = args.split()
21 21 for k in keys:
22 22 if k == '*':
23 23 star = {}
24 24 for key in self.req.form.keys():
25 25 if key not in keys:
26 26 star[key] = self.req.form[key][0]
27 27 data['*'] = star
28 28 else:
29 29 data[k] = self.req.form[k][0]
30 30 return [data[k] for k in keys]
31 31 def getfile(self, fp):
32 32 length = int(self.req.env['CONTENT_LENGTH'])
33 33 for s in util.filechunkiter(self.req, limit=length):
34 34 fp.write(s)
35 35 def redirect(self):
36 36 self.oldio = sys.stdout, sys.stderr
37 37 sys.stderr = sys.stdout = cStringIO.StringIO()
38 38 def groupchunks(self, cg):
39 39 z = zlib.compressobj()
40 40 while 1:
41 41 chunk = cg.read(4096)
42 42 if not chunk:
43 43 break
44 44 yield z.compress(chunk)
45 45 yield z.flush()
46 46 def _client(self):
47 47 return 'remote:%s:%s:%s' % (
48 48 self.req.env.get('wsgi.url_scheme') or 'http',
49 49 urllib.quote(self.req.env.get('REMOTE_HOST', '')),
50 50 urllib.quote(self.req.env.get('REMOTE_USER', '')))
51 51
52 52 def iscmd(cmd):
53 53 return cmd in wireproto.commands
54 54
55 55 def call(repo, req, cmd):
56 56 p = webproto(req)
57 57 rsp = wireproto.dispatch(repo, p, cmd)
58 58 if isinstance(rsp, str):
59 59 req.respond(HTTP_OK, HGTYPE, length=len(rsp))
60 60 return [rsp]
61 61 elif isinstance(rsp, wireproto.streamres):
62 62 req.respond(HTTP_OK, HGTYPE)
63 63 return rsp.gen
64 64 elif isinstance(rsp, wireproto.pushres):
65 65 val = sys.stdout.getvalue()
66 66 sys.stdout, sys.stderr = p.oldio
67 67 req.respond(HTTP_OK, HGTYPE)
68 68 return ['%d\n%s' % (rsp.res, val)]
69 elif isinstance(rsp, wireproto.pusherr):
70 sys.stdout, sys.stderr = p.oldio
71 rsp = '0\n%s\n' % rsp.res
72 req.respond(HTTP_OK, HGTYPE, length=len(rsp))
73 return [rsp]
@@ -1,140 +1,144
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 import util, hook, wireproto, changegroup
10 10 import os, sys
11 11
12 12 class sshserver(object):
13 13 def __init__(self, ui, repo):
14 14 self.ui = ui
15 15 self.repo = repo
16 16 self.lock = None
17 17 self.fin = sys.stdin
18 18 self.fout = sys.stdout
19 19
20 20 hook.redirect(True)
21 21 sys.stdout = sys.stderr
22 22
23 23 # Prevent insertion/deletion of CRs
24 24 util.set_binary(self.fin)
25 25 util.set_binary(self.fout)
26 26
27 27 def getargs(self, args):
28 28 data = {}
29 29 keys = args.split()
30 30 count = len(keys)
31 31 for n in xrange(len(keys)):
32 32 argline = self.fin.readline()[:-1]
33 33 arg, l = argline.split()
34 34 val = self.fin.read(int(l))
35 35 if arg not in keys:
36 36 raise util.Abort("unexpected parameter %r" % arg)
37 37 if arg == '*':
38 38 star = {}
39 39 for n in xrange(int(l)):
40 40 arg, l = argline.split()
41 41 val = self.fin.read(int(l))
42 42 star[arg] = val
43 43 data['*'] = star
44 44 else:
45 45 data[arg] = val
46 46 return [data[k] for k in keys]
47 47
48 48 def getarg(self, name):
49 49 return self.getargs(name)[0]
50 50
51 51 def getfile(self, fpout):
52 52 self.sendresponse('')
53 53 count = int(self.fin.readline())
54 54 while count:
55 55 fpout.write(self.fin.read(count))
56 56 count = int(self.fin.readline())
57 57
58 58 def redirect(self):
59 59 pass
60 60
61 61 def groupchunks(self, changegroup):
62 62 while True:
63 63 d = changegroup.read(4096)
64 64 if not d:
65 65 break
66 66 yield d
67 67
68 68 def sendresponse(self, v):
69 69 self.fout.write("%d\n" % len(v))
70 70 self.fout.write(v)
71 71 self.fout.flush()
72 72
73 73 def sendstream(self, source):
74 74 for chunk in source.gen:
75 75 self.fout.write(chunk)
76 76 self.fout.flush()
77 77
78 78 def sendpushresponse(self, rsp):
79 79 self.sendresponse('')
80 80 self.sendresponse(str(rsp.res))
81 81
82 def sendpusherror(self, rsp):
83 self.sendresponse(rsp.res)
84
82 85 def serve_forever(self):
83 86 try:
84 87 while self.serve_one():
85 88 pass
86 89 finally:
87 90 if self.lock is not None:
88 91 self.lock.release()
89 92 sys.exit(0)
90 93
91 94 handlers = {
92 95 str: sendresponse,
93 96 wireproto.streamres: sendstream,
94 97 wireproto.pushres: sendpushresponse,
98 wireproto.pusherr: sendpusherror,
95 99 }
96 100
97 101 def serve_one(self):
98 102 cmd = self.fin.readline()[:-1]
99 103 if cmd and cmd in wireproto.commands:
100 104 rsp = wireproto.dispatch(self.repo, self, cmd)
101 105 self.handlers[rsp.__class__](self, rsp)
102 106 elif cmd:
103 107 impl = getattr(self, 'do_' + cmd, None)
104 108 if impl:
105 109 r = impl()
106 110 if r is not None:
107 111 self.sendresponse(r)
108 112 else: self.sendresponse("")
109 113 return cmd != ''
110 114
111 115 def do_lock(self):
112 116 '''DEPRECATED - allowing remote client to lock repo is not safe'''
113 117
114 118 self.lock = self.repo.lock()
115 119 return ""
116 120
117 121 def do_unlock(self):
118 122 '''DEPRECATED'''
119 123
120 124 if self.lock:
121 125 self.lock.release()
122 126 self.lock = None
123 127 return ""
124 128
125 129 def do_addchangegroup(self):
126 130 '''DEPRECATED'''
127 131
128 132 if not self.lock:
129 133 self.sendresponse("not locked")
130 134 return
131 135
132 136 self.sendresponse("")
133 137 cg = changegroup.unbundle10(self.fin, "UN")
134 138 r = self.repo.addchangegroup(cg, 'serve', self._client(),
135 139 lock=self.lock)
136 140 return str(r)
137 141
138 142 def _client(self):
139 143 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
140 144 return 'remote:ssh:' + client
@@ -1,334 +1,338
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import repo, error, encoding, util, store
13 13 import pushkey as pushkeymod
14 14
15 15 # list of nodes encoding / decoding
16 16
17 17 def decodelist(l, sep=' '):
18 18 return map(bin, l.split(sep))
19 19
20 20 def encodelist(l, sep=' '):
21 21 return sep.join(map(hex, l))
22 22
23 23 # client side
24 24
25 25 class wirerepository(repo.repository):
26 26 def lookup(self, key):
27 27 self.requirecap('lookup', _('look up remote revision'))
28 28 d = self._call("lookup", key=key)
29 29 success, data = d[:-1].split(" ", 1)
30 30 if int(success):
31 31 return bin(data)
32 32 self._abort(error.RepoError(data))
33 33
34 34 def heads(self):
35 35 d = self._call("heads")
36 36 try:
37 37 return decodelist(d[:-1])
38 38 except:
39 39 self._abort(error.ResponseError(_("unexpected response:"), d))
40 40
41 41 def branchmap(self):
42 42 d = self._call("branchmap")
43 43 try:
44 44 branchmap = {}
45 45 for branchpart in d.splitlines():
46 46 branchname, branchheads = branchpart.split(' ', 1)
47 47 branchname = urllib.unquote(branchname)
48 48 # Earlier servers (1.3.x) send branch names in (their) local
49 49 # charset. The best we can do is assume it's identical to our
50 50 # own local charset, in case it's not utf-8.
51 51 try:
52 52 branchname.decode('utf-8')
53 53 except UnicodeDecodeError:
54 54 branchname = encoding.fromlocal(branchname)
55 55 branchheads = decodelist(branchheads)
56 56 branchmap[branchname] = branchheads
57 57 return branchmap
58 58 except TypeError:
59 59 self._abort(error.ResponseError(_("unexpected response:"), d))
60 60
61 61 def branches(self, nodes):
62 62 n = encodelist(nodes)
63 63 d = self._call("branches", nodes=n)
64 64 try:
65 65 br = [tuple(decodelist(b)) for b in d.splitlines()]
66 66 return br
67 67 except:
68 68 self._abort(error.ResponseError(_("unexpected response:"), d))
69 69
70 70 def between(self, pairs):
71 71 batch = 8 # avoid giant requests
72 72 r = []
73 73 for i in xrange(0, len(pairs), batch):
74 74 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
75 75 d = self._call("between", pairs=n)
76 76 try:
77 77 r.extend(l and decodelist(l) or [] for l in d.splitlines())
78 78 except:
79 79 self._abort(error.ResponseError(_("unexpected response:"), d))
80 80 return r
81 81
82 82 def pushkey(self, namespace, key, old, new):
83 83 if not self.capable('pushkey'):
84 84 return False
85 85 d = self._call("pushkey",
86 86 namespace=namespace, key=key, old=old, new=new)
87 87 return bool(int(d))
88 88
89 89 def listkeys(self, namespace):
90 90 if not self.capable('pushkey'):
91 91 return {}
92 92 d = self._call("listkeys", namespace=namespace)
93 93 r = {}
94 94 for l in d.splitlines():
95 95 k, v = l.split('\t')
96 96 r[k.decode('string-escape')] = v.decode('string-escape')
97 97 return r
98 98
99 99 def stream_out(self):
100 100 return self._callstream('stream_out')
101 101
102 102 def changegroup(self, nodes, kind):
103 103 n = encodelist(nodes)
104 104 f = self._callstream("changegroup", roots=n)
105 105 return changegroupmod.unbundle10(self._decompress(f), 'UN')
106 106
107 107 def changegroupsubset(self, bases, heads, kind):
108 108 self.requirecap('changegroupsubset', _('look up remote changes'))
109 109 bases = encodelist(bases)
110 110 heads = encodelist(heads)
111 111 f = self._callstream("changegroupsubset",
112 112 bases=bases, heads=heads)
113 113 return changegroupmod.unbundle10(self._decompress(f), 'UN')
114 114
115 115 def unbundle(self, cg, heads, source):
116 116 '''Send cg (a readable file-like object representing the
117 117 changegroup to push, typically a chunkbuffer object) to the
118 118 remote server as a bundle. Return an integer indicating the
119 119 result of the push (see localrepository.addchangegroup()).'''
120 120
121 121 ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
122 122 if ret == "":
123 123 raise error.ResponseError(
124 124 _('push failed:'), output)
125 125 try:
126 126 ret = int(ret)
127 127 except ValueError:
128 128 raise error.ResponseError(
129 129 _('push failed (unexpected response):'), ret)
130 130
131 131 for l in output.splitlines(True):
132 132 self.ui.status(_('remote: '), l)
133 133 return ret
134 134
135 135 # server side
136 136
137 137 class streamres(object):
138 138 def __init__(self, gen):
139 139 self.gen = gen
140 140
141 141 class pushres(object):
142 142 def __init__(self, res):
143 143 self.res = res
144 144
145 class pusherr(object):
146 def __init__(self, res):
147 self.res = res
148
145 149 def dispatch(repo, proto, command):
146 150 func, spec = commands[command]
147 151 args = proto.getargs(spec)
148 152 return func(repo, proto, *args)
149 153
150 154 def between(repo, proto, pairs):
151 155 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
152 156 r = []
153 157 for b in repo.between(pairs):
154 158 r.append(encodelist(b) + "\n")
155 159 return "".join(r)
156 160
157 161 def branchmap(repo, proto):
158 162 branchmap = repo.branchmap()
159 163 heads = []
160 164 for branch, nodes in branchmap.iteritems():
161 165 branchname = urllib.quote(branch)
162 166 branchnodes = encodelist(nodes)
163 167 heads.append('%s %s' % (branchname, branchnodes))
164 168 return '\n'.join(heads)
165 169
166 170 def branches(repo, proto, nodes):
167 171 nodes = decodelist(nodes)
168 172 r = []
169 173 for b in repo.branches(nodes):
170 174 r.append(encodelist(b) + "\n")
171 175 return "".join(r)
172 176
173 177 def capabilities(repo, proto):
174 178 caps = 'lookup changegroupsubset branchmap pushkey'.split()
175 179 if _allowstream(repo.ui):
176 180 requiredformats = repo.requirements & repo.supportedformats
177 181 # if our local revlogs are just revlogv1, add 'stream' cap
178 182 if not requiredformats - set(('revlogv1',)):
179 183 caps.append('stream')
180 184 # otherwise, add 'streamreqs' detailing our local revlog format
181 185 else:
182 186 caps.append('streamreqs=%s' % ','.join(requiredformats))
183 187 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
184 188 return ' '.join(caps)
185 189
186 190 def changegroup(repo, proto, roots):
187 191 nodes = decodelist(roots)
188 192 cg = repo.changegroup(nodes, 'serve')
189 193 return streamres(proto.groupchunks(cg))
190 194
191 195 def changegroupsubset(repo, proto, bases, heads):
192 196 bases = decodelist(bases)
193 197 heads = decodelist(heads)
194 198 cg = repo.changegroupsubset(bases, heads, 'serve')
195 199 return streamres(proto.groupchunks(cg))
196 200
197 201 def heads(repo, proto):
198 202 h = repo.heads()
199 203 return encodelist(h) + "\n"
200 204
201 205 def hello(repo, proto):
202 206 '''the hello command returns a set of lines describing various
203 207 interesting things about the server, in an RFC822-like format.
204 208 Currently the only one defined is "capabilities", which
205 209 consists of a line in the form:
206 210
207 211 capabilities: space separated list of tokens
208 212 '''
209 213 return "capabilities: %s\n" % (capabilities(repo, proto))
210 214
211 215 def listkeys(repo, proto, namespace):
212 216 d = pushkeymod.list(repo, namespace).items()
213 217 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
214 218 v.encode('string-escape')) for k, v in d])
215 219 return t
216 220
217 221 def lookup(repo, proto, key):
218 222 try:
219 223 r = hex(repo.lookup(key))
220 224 success = 1
221 225 except Exception, inst:
222 226 r = str(inst)
223 227 success = 0
224 228 return "%s %s\n" % (success, r)
225 229
226 230 def pushkey(repo, proto, namespace, key, old, new):
227 231 r = pushkeymod.push(repo, namespace, key, old, new)
228 232 return '%s\n' % int(r)
229 233
230 234 def _allowstream(ui):
231 235 return ui.configbool('server', 'uncompressed', True, untrusted=True)
232 236
233 237 def stream(repo, proto):
234 238 '''If the server supports streaming clone, it advertises the "stream"
235 239 capability with a value representing the version and flags of the repo
236 240 it is serving. Client checks to see if it understands the format.
237 241
238 242 The format is simple: the server writes out a line with the amount
239 243 of files, then the total amount of bytes to be transfered (separated
240 244 by a space). Then, for each file, the server first writes the filename
241 245 and filesize (separated by the null character), then the file contents.
242 246 '''
243 247
244 248 if not _allowstream(repo.ui):
245 249 return '1\n'
246 250
247 251 entries = []
248 252 total_bytes = 0
249 253 try:
250 254 # get consistent snapshot of repo, lock during scan
251 255 lock = repo.lock()
252 256 try:
253 257 repo.ui.debug('scanning\n')
254 258 for name, ename, size in repo.store.walk():
255 259 entries.append((name, size))
256 260 total_bytes += size
257 261 finally:
258 262 lock.release()
259 263 except error.LockError:
260 264 return '2\n' # error: 2
261 265
262 266 def streamer(repo, entries, total):
263 267 '''stream out all metadata files in repository.'''
264 268 yield '0\n' # success
265 269 repo.ui.debug('%d files, %d bytes to transfer\n' %
266 270 (len(entries), total_bytes))
267 271 yield '%d %d\n' % (len(entries), total_bytes)
268 272 for name, size in entries:
269 273 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
270 274 # partially encode name over the wire for backwards compat
271 275 yield '%s\0%d\n' % (store.encodedir(name), size)
272 276 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
273 277 yield chunk
274 278
275 279 return streamres(streamer(repo, entries, total_bytes))
276 280
277 281 def unbundle(repo, proto, heads):
278 282 their_heads = decodelist(heads)
279 283
280 284 def check_heads():
281 285 heads = repo.heads()
282 286 return their_heads == ['force'] or their_heads == heads
283 287
284 288 proto.redirect()
285 289
286 290 # fail early if possible
287 291 if not check_heads():
288 return 'unsynced changes'
292 return pusherr('unsynced changes')
289 293
290 294 # write bundle data to temporary file because it can be big
291 295 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
292 296 fp = os.fdopen(fd, 'wb+')
293 297 r = 0
294 298 try:
295 299 proto.getfile(fp)
296 300 lock = repo.lock()
297 301 try:
298 302 if not check_heads():
299 303 # someone else committed/pushed/unbundled while we
300 304 # were transferring data
301 return 'unsynced changes'
305 return pusherr('unsynced changes')
302 306
303 307 # push can proceed
304 308 fp.seek(0)
305 309 gen = changegroupmod.readbundle(fp, None)
306 310
307 311 try:
308 312 r = repo.addchangegroup(gen, 'serve', proto._client(),
309 313 lock=lock)
310 314 except util.Abort, inst:
311 315 sys.stderr.write("abort: %s\n" % inst)
312 316 finally:
313 317 lock.release()
314 318 return pushres(r)
315 319
316 320 finally:
317 321 fp.close()
318 322 os.unlink(tempname)
319 323
320 324 commands = {
321 325 'between': (between, 'pairs'),
322 326 'branchmap': (branchmap, ''),
323 327 'branches': (branches, 'nodes'),
324 328 'capabilities': (capabilities, ''),
325 329 'changegroup': (changegroup, 'roots'),
326 330 'changegroupsubset': (changegroupsubset, 'bases heads'),
327 331 'heads': (heads, ''),
328 332 'hello': (hello, ''),
329 333 'listkeys': (listkeys, 'namespace'),
330 334 'lookup': (lookup, 'key'),
331 335 'pushkey': (pushkey, 'namespace key old new'),
332 336 'stream_out': (stream, ''),
333 337 'unbundle': (unbundle, 'heads'),
334 338 }
General Comments 0
You need to be logged in to leave comments. Login now