##// END OF EJS Templates
protocol: unify unbundle on the server side
Matt Mackall -
r11593:d054cc5c default
parent child Browse files
Show More
@@ -6,8 +6,8 b''
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 import os, zlib
9 import os, zlib, sys, cStringIO, urllib
10 from mercurial import ui, hg, hook, error, encoding, templater, wireproto
10 from mercurial import ui, hg, hook, error, encoding, templater, wireproto, util
11 from common import get_mtime, ErrorResponse, permhooks
11 from common import get_mtime, ErrorResponse, permhooks
12 from common import HTTP_OK, HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVER_ERROR
12 from common import HTTP_OK, HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVER_ERROR
13 from request import wsgirequest
13 from request import wsgirequest
@@ -56,6 +56,23 b' class webproto(object):'
56 def respond(self, s):
56 def respond(self, s):
57 self.req.respond(HTTP_OK, HGTYPE, length=len(s))
57 self.req.respond(HTTP_OK, HGTYPE, length=len(s))
58 self.response = s
58 self.response = s
59 def getfile(self, fp):
60 length = int(self.req.env['CONTENT_LENGTH'])
61 for s in util.filechunkiter(self.req, limit=length):
62 fp.write(s)
63 def redirect(self):
64 self.oldio = sys.stdout, sys.stderr
65 sys.stderr = sys.stdout = cStringIO.StringIO()
66 def respondpush(self, ret):
67 val = sys.stdout.getvalue()
68 sys.stdout, sys.stderr = self.oldio
69 self.req.respond(HTTP_OK, HGTYPE)
70 self.response = '%d\n%s' % (ret, val)
71 def _client(self):
72 return 'remote:%s:%s:%s' % (
73 self.req.env.get('wsgi.url_scheme') or 'http',
74 urllib.quote(self.req.env.get('REMOTE_HOST', '')),
75 urllib.quote(self.req.env.get('REMOTE_USER', '')))
59
76
60 def callproto(repo, req, cmd):
77 def callproto(repo, req, cmd):
61 p = webproto(req)
78 p = webproto(req)
@@ -32,85 +32,3 b' def capabilities(repo, req):'
32 rsp = ' '.join(caps)
32 rsp = ' '.join(caps)
33 req.respond(HTTP_OK, HGTYPE, length=len(rsp))
33 req.respond(HTTP_OK, HGTYPE, length=len(rsp))
34 yield rsp
34 yield rsp
35
36 def unbundle(repo, req):
37
38 proto = req.env.get('wsgi.url_scheme') or 'http'
39 their_heads = req.form['heads'][0].split(' ')
40
41 def check_heads():
42 heads = map(hex, repo.heads())
43 return their_heads == [hex('force')] or their_heads == heads
44
45 # fail early if possible
46 if not check_heads():
47 req.drain()
48 raise ErrorResponse(HTTP_OK, 'unsynced changes')
49
50 # do not lock repo until all changegroup data is
51 # streamed. save to temporary file.
52
53 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
54 fp = os.fdopen(fd, 'wb+')
55 try:
56 length = int(req.env['CONTENT_LENGTH'])
57 for s in util.filechunkiter(req, limit=length):
58 fp.write(s)
59
60 try:
61 lock = repo.lock()
62 try:
63 if not check_heads():
64 raise ErrorResponse(HTTP_OK, 'unsynced changes')
65
66 fp.seek(0)
67 header = fp.read(6)
68 if header.startswith('HG') and not header.startswith('HG10'):
69 raise ValueError('unknown bundle version')
70 elif header not in changegroupmod.bundletypes:
71 raise ValueError('unknown bundle compression type')
72 gen = changegroupmod.unbundle(header, fp)
73
74 # send addchangegroup output to client
75
76 oldio = sys.stdout, sys.stderr
77 sys.stderr = sys.stdout = cStringIO.StringIO()
78
79 try:
80 url = 'remote:%s:%s:%s' % (
81 proto,
82 urllib.quote(req.env.get('REMOTE_HOST', '')),
83 urllib.quote(req.env.get('REMOTE_USER', '')))
84 try:
85 ret = repo.addchangegroup(gen, 'serve', url, lock=lock)
86 except util.Abort, inst:
87 sys.stdout.write("abort: %s\n" % inst)
88 ret = 0
89 finally:
90 val = sys.stdout.getvalue()
91 sys.stdout, sys.stderr = oldio
92 req.respond(HTTP_OK, HGTYPE)
93 return '%d\n%s' % (ret, val),
94 finally:
95 lock.release()
96 except ValueError, inst:
97 raise ErrorResponse(HTTP_OK, inst)
98 except (OSError, IOError), inst:
99 error = getattr(inst, 'strerror', 'Unknown error')
100 if not isinstance(error, str):
101 error = 'Error: %s' % str(error)
102 if inst.errno == errno.ENOENT:
103 code = HTTP_NOT_FOUND
104 else:
105 code = HTTP_SERVER_ERROR
106 filename = getattr(inst, 'filename', '')
107 # Don't send our filesystem layout to the client
108 if filename and filename.startswith(repo.root):
109 filename = filename[len(repo.root)+1:]
110 text = '%s: %s' % (error, filename)
111 else:
112 text = error.replace(repo.root + os.path.sep, '')
113 raise ErrorResponse(code, text)
114 finally:
115 fp.close()
116 os.unlink(tempname)
@@ -72,6 +72,20 b' class sshserver(object):'
72 self.fout.write(chunk)
72 self.fout.write(chunk)
73 self.fout.flush()
73 self.fout.flush()
74
74
75 def getfile(self, fpout):
76 self.respond('')
77 count = int(self.fin.readline())
78 while count:
79 fpout.write(self.fin.read(count))
80 count = int(self.fin.readline())
81
82 def redirect(self):
83 pass
84
85 def respondpush(self, ret):
86 self.respond('')
87 self.respond(str(ret))
88
75 def serve_forever(self):
89 def serve_forever(self):
76 try:
90 try:
77 while self.serve_one():
91 while self.serve_one():
@@ -127,58 +141,10 b' class sshserver(object):'
127 return
141 return
128
142
129 self.respond("")
143 self.respond("")
130 r = self.repo.addchangegroup(self.fin, 'serve', self.client_url(),
144 r = self.repo.addchangegroup(self.fin, 'serve', self._client(),
131 lock=self.lock)
145 lock=self.lock)
132 return str(r)
146 return str(r)
133
147
134 def client_url(self):
148 def _client(self):
135 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
149 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
136 return 'remote:ssh:' + client
150 return 'remote:ssh:' + client
137
138 def do_unbundle(self):
139 their_heads = self.getarg('heads').split()
140
141 def check_heads():
142 heads = map(hex, self.repo.heads())
143 return their_heads == [hex('force')] or their_heads == heads
144
145 # fail early if possible
146 if not check_heads():
147 self.respond(_('unsynced changes'))
148 return
149
150 self.respond('')
151
152 # write bundle data to temporary file because it can be big
153 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
154 fp = os.fdopen(fd, 'wb+')
155 try:
156 count = int(self.fin.readline())
157 while count:
158 fp.write(self.fin.read(count))
159 count = int(self.fin.readline())
160
161 was_locked = self.lock is not None
162 if not was_locked:
163 self.lock = self.repo.lock()
164 try:
165 if not check_heads():
166 # someone else committed/pushed/unbundled while we
167 # were transferring data
168 self.respond(_('unsynced changes'))
169 return
170 self.respond('')
171
172 # push can proceed
173
174 fp.seek(0)
175 r = self.repo.addchangegroup(fp, 'serve', self.client_url(),
176 lock=self.lock)
177 self.respond(str(r))
178 finally:
179 if not was_locked:
180 self.lock.release()
181 self.lock = None
182 finally:
183 fp.close()
184 os.unlink(tempname)
@@ -5,10 +5,11 b''
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os
8 from i18n import _
9 from i18n import _
9 from node import bin, hex
10 from node import bin, hex
10 import urllib
11 import changegroup as changegroupmod
11 import streamclone, repo, error, encoding
12 import streamclone, repo, error, encoding, util
12 import pushkey as pushkey_
13 import pushkey as pushkey_
13
14
14 # client side
15 # client side
@@ -198,6 +199,56 b' def stream(repo, proto):'
198 except streamclone.StreamException, inst:
199 except streamclone.StreamException, inst:
199 return str(inst)
200 return str(inst)
200
201
202 def unbundle(repo, proto, heads):
203 their_heads = heads.split()
204
205 def check_heads():
206 heads = map(hex, repo.heads())
207 return their_heads == [hex('force')] or their_heads == heads
208
209 # fail early if possible
210 if not check_heads():
211 repo.respond(_('unsynced changes'))
212 return
213
214 # write bundle data to temporary file because it can be big
215 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
216 fp = os.fdopen(fd, 'wb+')
217 r = 0
218 proto.redirect()
219 try:
220 proto.getfile(fp)
221 lock = repo.lock()
222 try:
223 if not check_heads():
224 # someone else committed/pushed/unbundled while we
225 # were transferring data
226 proto.respond(_('unsynced changes'))
227 return
228
229 # push can proceed
230 fp.seek(0)
231 header = fp.read(6)
232 if header.startswith('HG'):
233 if not header.startswith('HG10'):
234 raise ValueError('unknown bundle version')
235 elif header not in changegroupmod.bundletypes:
236 raise ValueError('unknown bundle compression type')
237 gen = changegroupmod.unbundle(header, fp)
238
239 try:
240 r = repo.addchangegroup(gen, 'serve', proto._client(),
241 lock=lock)
242 except util.Abort, inst:
243 sys.stderr.write("abort: %s\n" % inst)
244 finally:
245 lock.release()
246 proto.respondpush(r)
247
248 finally:
249 fp.close()
250 os.unlink(tempname)
251
201 commands = {
252 commands = {
202 'between': (between, 'pairs'),
253 'between': (between, 'pairs'),
203 'branchmap': (branchmap, ''),
254 'branchmap': (branchmap, ''),
@@ -209,4 +260,5 b' commands = {'
209 'lookup': (lookup, 'key'),
260 'lookup': (lookup, 'key'),
210 'pushkey': (pushkey, 'namespace key old new'),
261 'pushkey': (pushkey, 'namespace key old new'),
211 'stream_out': (stream, ''),
262 'stream_out': (stream, ''),
263 'unbundle': (unbundle, 'heads'),
212 }
264 }
General Comments 0
You need to be logged in to leave comments. Login now